Files
baijiahao_text_to_image/export_approved_articles.py

137 lines
4.8 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
导出审核通过的文章内容和标签到CSV文件
此脚本将从ai_articles表中导出status为approved的文章内容和标签
"""
import csv
import json
import os
from datetime import datetime
from log_config import setup_logger
def export_approved_articles_to_csv(output_file='approved_articles_export.csv'):
"""
导出审核通过的文章内容和标签到CSV文件
Args:
output_file: 输出的CSV文件名
"""
# 设置日志记录器
logger = setup_logger('article_export', 'logs/article_export.log', 'logs/article_export_error.log')
try:
# 从数据库获取真实数据
from database_config import db_manager
# 查询审核通过的文章,包含内容和标签
sql = """
SELECT id, title, content, coze_tag, created_at, updated_at
FROM ai_articles
WHERE status = 'approved'
ORDER BY id
"""
logger.info("开始查询审核通过的文章数据...")
results = db_manager.execute_query(sql)
if not results:
logger.warning("没有找到状态为approved的文章")
print("没有找到状态为approved的文章")
return
logger.info(f"查询到 {len(results)} 条审核通过的文章")
print(f"查询到 {len(results)} 条审核通过的文章")
# 准备输出目录
output_dir = os.path.dirname(output_file)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir)
# 写入CSV文件
with open(output_file, 'w', newline='', encoding='utf-8-sig') as csvfile:
fieldnames = ['ID', '标题', '内容', '标签', '创建时间', '更新时间']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
# 写入表头
writer.writeheader()
# 写入数据
for row in results:
id_val, title, content, coze_tag, created_at, updated_at = row
# 尝试解析标签如果是JSON格式则转换为字符串
parsed_tags = coze_tag
if coze_tag:
try:
# 尝试解析JSON格式的标签
tags_data = json.loads(coze_tag)
if isinstance(tags_data, list):
parsed_tags = ', '.join(tags_data)
elif isinstance(tags_data, dict):
# 如果是字典格式,提取值
parsed_tags = ', '.join(str(v) for v in tags_data.values())
except json.JSONDecodeError:
# 如果不是JSON格式保持原样
parsed_tags = coze_tag
writer.writerow({
'ID': id_val,
'标题': title,
'内容': content,
'标签': parsed_tags or '',
'创建时间': created_at.strftime('%Y-%m-%d %H:%M:%S') if created_at else '',
'更新时间': updated_at.strftime('%Y-%m-%d %H:%M:%S') if updated_at else ''
})
logger.info(f"成功导出 {len(results)} 条文章到 {output_file}")
print(f"成功导出 {len(results)} 条文章到 {output_file}")
except Exception as e:
logger.error(f"导出文章数据时发生错误: {e}", exc_info=True)
print(f"导出文章数据时发生错误: {e}")
raise
def test_db_connection():
"""
测试数据库连接
"""
try:
from database_config import db_manager
# 尝试执行一个简单的查询来测试连接
test_sql = "SELECT 1 as test"
result = db_manager.execute_query(test_sql)
print("数据库连接测试成功:", result)
return True
except Exception as e:
print(f"数据库连接测试失败: {e}")
return False
if __name__ == "__main__":
# 创建logs目录
if not os.path.exists('logs'):
os.makedirs('logs')
# 检查命令行参数
import sys
if len(sys.argv) > 1:
output_filename = sys.argv[1]
else:
output_filename = 'approved_articles_export.csv'
# 测试数据库连接
print("正在测试数据库连接...")
if not test_db_connection():
print("数据库连接失败,请检查数据库配置。")
print("请确认以下信息:")
print("- 数据库服务器是否正常运行")
print("- 数据库地址、用户名、密码是否正确")
print("- 网络连接是否正常")
print("- 用户是否有查询ai_articles表的权限")
exit(1)
export_approved_articles_to_csv(output_filename)