""" 导出作者发文统计数据到CSV """ import pandas as pd import logging import argparse from database_config import DatabaseManager from datetime import datetime, date # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def export_author_review_stats(db_manager, target_date, output_file): """ 导出作者发文审核统计(包含所有审核状态) Args: db_manager: 数据库管理器实例 target_date: 目标日期,格式:'2026-01-21' output_file: 输出CSV文件路径 Returns: 导出的记录数 """ sql = """ SELECT a.`author_id`, COALESCE(a.`author_name`, auth.`author_name`) as `author_name`, COUNT(*) as `today_published_count` FROM `ai_articles` a LEFT JOIN `ai_authors` auth ON a.`author_id` = auth.`id` WHERE a.`channel` = 4 AND a.`status` IN('published', 'published_review', 'pending_review', 'failed') AND DATE(a.`updated_at`) = %s GROUP BY a.`author_id`, COALESCE(a.`author_name`, auth.`author_name`) HAVING COUNT(*) > 0 ORDER BY `today_published_count` DESC """ try: logger.info(f"正在查询发文审核统计数据(日期:{target_date})...") result = db_manager.execute_query(sql, (target_date,)) if not result: logger.warning("未查询到数据") return 0 # 转换为DataFrame df = pd.DataFrame(result, columns=['author_id', 'author_name', 'today_published_count']) # 导出到CSV df.to_csv(output_file, index=False, encoding='utf-8-sig') logger.info(f"成功导出 {len(df)} 条记录到: {output_file}") # 显示前5条预览 logger.info(f"\n数据预览(前5条):\n{df.head()}") return len(df) except Exception as e: logger.error(f"导出发文审核统计失败: {e}", exc_info=True) raise def export_author_published_stats(db_manager, target_date, output_file): """ 导出作者发文成功统计(仅published状态) Args: db_manager: 数据库管理器实例 target_date: 目标日期,格式:'2026-01-21' output_file: 输出CSV文件路径 Returns: 导出的记录数 """ sql = """ SELECT a.`author_id`, COALESCE(a.`author_name`, auth.`author_name`) as `author_name`, COUNT(*) as `today_published_count`, COALESCE(auth.`daily_post_max`, 0) as `daily_post_max`, (COALESCE(auth.`daily_post_max`, 0) - COUNT(*)) as `gap` FROM `ai_articles` a LEFT JOIN `ai_authors` auth ON a.`author_id` = auth.`id` WHERE a.`channel` = 4 AND a.`status` = 'published' AND DATE(a.`updated_at`) = %s GROUP BY a.`author_id`, COALESCE(a.`author_name`, auth.`author_name`), auth.`daily_post_max` HAVING COUNT(*) > 0 ORDER BY `gap` ASC """ try: logger.info(f"正在查询发文成功统计数据(日期:{target_date})...") result = db_manager.execute_query(sql, (target_date,)) if not result: logger.warning("未查询到数据") return 0 # 转换为DataFrame df = pd.DataFrame(result, columns=['author_id', 'author_name', 'today_published_count', 'daily_post_max', 'gap']) # 导出到CSV df.to_csv(output_file, index=False, encoding='utf-8-sig') logger.info(f"成功导出 {len(df)} 条记录到: {output_file}") # 显示前5条预览 logger.info(f"\n数据预览(前5条):\n{df.head()}") return len(df) except Exception as e: logger.error(f"导出发文成功统计失败: {e}", exc_info=True) raise def main(): """主函数""" # 解析命令行参数 parser = argparse.ArgumentParser(description='导出作者发文统计数据到CSV') parser.add_argument('--host', default='8.149.233.36', help='数据库主机') parser.add_argument('--port', type=int, default=3306, help='数据库端口') parser.add_argument('--user', default='ai_articles_read', help='数据库用户名') parser.add_argument('--password', default='7aK_H2yvokVumr84lLNDt8fDBp6P', help='数据库密码') parser.add_argument('--database', default='ai_article', help='数据库名') parser.add_argument('--date', default=date.today().strftime('%Y-%m-%d'), help='目标日期,格式:YYYY-MM-DD') parser.add_argument('--output-dir', default='./exports', help='输出目录') args = parser.parse_args() # 创建输出目录 import os os.makedirs(args.output_dir, exist_ok=True) # 创建数据库连接配置 db_config = { 'host': args.host, 'port': args.port, 'user': args.user, 'password': args.password, 'database': args.database, 'charset': 'utf8mb4' } logger.info("=" * 60) logger.info("开始导出作者发文统计数据") logger.info(f"数据库配置: {args.user}@{args.host}:{args.port}/{args.database}") logger.info(f"目标日期: {args.date}") logger.info(f"输出目录: {args.output_dir}") logger.info("=" * 60) # 创建数据库管理器 db_manager = DatabaseManager(db_config) try: # 生成输出文件名 review_file = f"{args.output_dir}/author_review_stats_{args.date}.csv" published_file = f"{args.output_dir}/author_published_stats_{args.date}.csv" # 1. 导出发文审核统计 logger.info("\n[任务1] 导出发文审核统计(所有状态)") review_count = export_author_review_stats(db_manager, args.date, review_file) # 2. 导出发文成功统计 logger.info("\n[任务2] 导出发文成功统计(published状态)") published_count = export_author_published_stats(db_manager, args.date, published_file) logger.info("\n" + "=" * 60) logger.info("✓ 导出完成!") logger.info(f"发文审核统计: {review_count} 条 -> {review_file}") logger.info(f"发文成功统计: {published_count} 条 -> {published_file}") logger.info("=" * 60) except Exception as e: logger.error(f"✗ 导出过程出错: {e}", exc_info=True) logger.info("=" * 60) logger.info("✗ 导出失败") logger.info("=" * 60) if __name__ == '__main__': main()