初始提交:文字匹配图片项目
This commit is contained in:
137
export_approved_articles.py
Normal file
137
export_approved_articles.py
Normal file
@@ -0,0 +1,137 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
导出审核通过的文章内容和标签到CSV文件
|
||||
此脚本将从ai_articles表中导出status为approved的文章内容和标签
|
||||
"""
|
||||
|
||||
import csv
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime
|
||||
from log_config import setup_logger
|
||||
|
||||
|
||||
def export_approved_articles_to_csv(output_file='approved_articles_export.csv'):
|
||||
"""
|
||||
导出审核通过的文章内容和标签到CSV文件
|
||||
|
||||
Args:
|
||||
output_file: 输出的CSV文件名
|
||||
"""
|
||||
# 设置日志记录器
|
||||
logger = setup_logger('article_export', 'logs/article_export.log', 'logs/article_export_error.log')
|
||||
|
||||
try:
|
||||
# 从数据库获取真实数据
|
||||
from database_config import db_manager
|
||||
|
||||
# 查询审核通过的文章,包含内容和标签
|
||||
sql = """
|
||||
SELECT id, title, content, coze_tag, created_at, updated_at
|
||||
FROM ai_articles
|
||||
WHERE status = 'approved'
|
||||
ORDER BY id
|
||||
"""
|
||||
|
||||
logger.info("开始查询审核通过的文章数据...")
|
||||
results = db_manager.execute_query(sql)
|
||||
|
||||
if not results:
|
||||
logger.warning("没有找到状态为approved的文章")
|
||||
print("没有找到状态为approved的文章")
|
||||
return
|
||||
|
||||
logger.info(f"查询到 {len(results)} 条审核通过的文章")
|
||||
print(f"查询到 {len(results)} 条审核通过的文章")
|
||||
|
||||
# 准备输出目录
|
||||
output_dir = os.path.dirname(output_file)
|
||||
if output_dir and not os.path.exists(output_dir):
|
||||
os.makedirs(output_dir)
|
||||
|
||||
# 写入CSV文件
|
||||
with open(output_file, 'w', newline='', encoding='utf-8-sig') as csvfile:
|
||||
fieldnames = ['ID', '标题', '内容', '标签', '创建时间', '更新时间']
|
||||
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
|
||||
|
||||
# 写入表头
|
||||
writer.writeheader()
|
||||
|
||||
# 写入数据
|
||||
for row in results:
|
||||
id_val, title, content, coze_tag, created_at, updated_at = row
|
||||
|
||||
# 尝试解析标签,如果是JSON格式则转换为字符串
|
||||
parsed_tags = coze_tag
|
||||
if coze_tag:
|
||||
try:
|
||||
# 尝试解析JSON格式的标签
|
||||
tags_data = json.loads(coze_tag)
|
||||
if isinstance(tags_data, list):
|
||||
parsed_tags = ', '.join(tags_data)
|
||||
elif isinstance(tags_data, dict):
|
||||
# 如果是字典格式,提取值
|
||||
parsed_tags = ', '.join(str(v) for v in tags_data.values())
|
||||
except json.JSONDecodeError:
|
||||
# 如果不是JSON格式,保持原样
|
||||
parsed_tags = coze_tag
|
||||
|
||||
writer.writerow({
|
||||
'ID': id_val,
|
||||
'标题': title,
|
||||
'内容': content,
|
||||
'标签': parsed_tags or '',
|
||||
'创建时间': created_at.strftime('%Y-%m-%d %H:%M:%S') if created_at else '',
|
||||
'更新时间': updated_at.strftime('%Y-%m-%d %H:%M:%S') if updated_at else ''
|
||||
})
|
||||
|
||||
logger.info(f"成功导出 {len(results)} 条文章到 {output_file}")
|
||||
print(f"成功导出 {len(results)} 条文章到 {output_file}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"导出文章数据时发生错误: {e}", exc_info=True)
|
||||
print(f"导出文章数据时发生错误: {e}")
|
||||
raise
|
||||
|
||||
|
||||
def test_db_connection():
|
||||
"""
|
||||
测试数据库连接
|
||||
"""
|
||||
try:
|
||||
from database_config import db_manager
|
||||
# 尝试执行一个简单的查询来测试连接
|
||||
test_sql = "SELECT 1 as test"
|
||||
result = db_manager.execute_query(test_sql)
|
||||
print("数据库连接测试成功:", result)
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"数据库连接测试失败: {e}")
|
||||
return False
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 创建logs目录
|
||||
if not os.path.exists('logs'):
|
||||
os.makedirs('logs')
|
||||
|
||||
# 检查命令行参数
|
||||
import sys
|
||||
if len(sys.argv) > 1:
|
||||
output_filename = sys.argv[1]
|
||||
else:
|
||||
output_filename = 'approved_articles_export.csv'
|
||||
|
||||
# 测试数据库连接
|
||||
print("正在测试数据库连接...")
|
||||
if not test_db_connection():
|
||||
print("数据库连接失败,请检查数据库配置。")
|
||||
print("请确认以下信息:")
|
||||
print("- 数据库服务器是否正常运行")
|
||||
print("- 数据库地址、用户名、密码是否正确")
|
||||
print("- 网络连接是否正常")
|
||||
print("- 用户是否有查询ai_articles表的权限")
|
||||
exit(1)
|
||||
|
||||
export_approved_articles_to_csv(output_filename)
|
||||
Reference in New Issue
Block a user