#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 导出审核通过的文章内容和标签到CSV文件 此脚本将从ai_articles表中导出status为approved的文章内容和标签 """ import csv import json import os from datetime import datetime from log_config import setup_logger def export_approved_articles_to_csv(output_file='approved_articles_export.csv'): """ 导出审核通过的文章内容和标签到CSV文件 Args: output_file: 输出的CSV文件名 """ # 设置日志记录器 logger = setup_logger('article_export', 'logs/article_export.log', 'logs/article_export_error.log') try: # 从数据库获取真实数据 from database_config import db_manager # 查询审核通过的文章,包含内容和标签 sql = """ SELECT id, title, content, coze_tag, created_at, updated_at FROM ai_articles WHERE status = 'approved' ORDER BY id """ logger.info("开始查询审核通过的文章数据...") results = db_manager.execute_query(sql) if not results: logger.warning("没有找到状态为approved的文章") print("没有找到状态为approved的文章") return logger.info(f"查询到 {len(results)} 条审核通过的文章") print(f"查询到 {len(results)} 条审核通过的文章") # 准备输出目录 output_dir = os.path.dirname(output_file) if output_dir and not os.path.exists(output_dir): os.makedirs(output_dir) # 写入CSV文件 with open(output_file, 'w', newline='', encoding='utf-8-sig') as csvfile: fieldnames = ['ID', '标题', '内容', '标签', '创建时间', '更新时间'] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) # 写入表头 writer.writeheader() # 写入数据 for row in results: id_val, title, content, coze_tag, created_at, updated_at = row # 尝试解析标签,如果是JSON格式则转换为字符串 parsed_tags = coze_tag if coze_tag: try: # 尝试解析JSON格式的标签 tags_data = json.loads(coze_tag) if isinstance(tags_data, list): parsed_tags = ', '.join(tags_data) elif isinstance(tags_data, dict): # 如果是字典格式,提取值 parsed_tags = ', '.join(str(v) for v in tags_data.values()) except json.JSONDecodeError: # 如果不是JSON格式,保持原样 parsed_tags = coze_tag writer.writerow({ 'ID': id_val, '标题': title, '内容': content, '标签': parsed_tags or '', '创建时间': created_at.strftime('%Y-%m-%d %H:%M:%S') if created_at else '', '更新时间': updated_at.strftime('%Y-%m-%d %H:%M:%S') if updated_at else '' }) logger.info(f"成功导出 {len(results)} 条文章到 {output_file}") print(f"成功导出 {len(results)} 条文章到 {output_file}") except Exception as e: logger.error(f"导出文章数据时发生错误: {e}", exc_info=True) print(f"导出文章数据时发生错误: {e}") raise def test_db_connection(): """ 测试数据库连接 """ try: from database_config import db_manager # 尝试执行一个简单的查询来测试连接 test_sql = "SELECT 1 as test" result = db_manager.execute_query(test_sql) print("数据库连接测试成功:", result) return True except Exception as e: print(f"数据库连接测试失败: {e}") return False if __name__ == "__main__": # 创建logs目录 if not os.path.exists('logs'): os.makedirs('logs') # 检查命令行参数 import sys if len(sys.argv) > 1: output_filename = sys.argv[1] else: output_filename = 'approved_articles_export.csv' # 测试数据库连接 print("正在测试数据库连接...") if not test_db_connection(): print("数据库连接失败,请检查数据库配置。") print("请确认以下信息:") print("- 数据库服务器是否正常运行") print("- 数据库地址、用户名、密码是否正确") print("- 网络连接是否正常") print("- 用户是否有查询ai_articles表的权限") exit(1) export_approved_articles_to_csv(output_filename)