# Source: ai_image_quary/export_author_stats.py
"""
导出作者发文统计数据到CSV
"""
import pandas as pd
import logging
import argparse
from database_config import DatabaseManager
from datetime import datetime, date
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def export_author_review_stats(db_manager, target_date, output_file):
    """
    Export per-author review statistics (all review statuses) to CSV.

    Counts channel-4 articles whose status is one of published /
    published_review / pending_review / failed, grouped by author,
    for articles updated on the given date.

    Args:
        db_manager: database manager exposing ``execute_query(sql, params)``.
        target_date: target date string in ``YYYY-MM-DD`` form, e.g. '2026-01-21'.
        output_file: path of the CSV file to write.

    Returns:
        Number of records exported; 0 when the query yields no rows
        (no file is written in that case).

    Raises:
        Re-raises any exception from the query or the CSV export after logging it.
    """
    # Same singleton as the module-level logger; bound locally so the
    # function does not depend on module globals.
    logger = logging.getLogger(__name__)
    # NOTE: the original query had `HAVING COUNT(*) > 0`, which is redundant —
    # every GROUP BY group contains at least one row — so it was removed.
    sql = """
    SELECT
        a.`author_id`,
        COALESCE(a.`author_name`, auth.`author_name`) as `author_name`,
        COUNT(*) as `today_published_count`
    FROM `ai_articles` a
    LEFT JOIN `ai_authors` auth ON a.`author_id` = auth.`id`
    WHERE
        a.`channel` = 4
        AND a.`status` IN ('published', 'published_review', 'pending_review', 'failed')
        AND DATE(a.`updated_at`) = %s
    GROUP BY a.`author_id`, COALESCE(a.`author_name`, auth.`author_name`)
    ORDER BY `today_published_count` DESC
    """
    try:
        # FIX: the original log message had an unbalanced fullwidth parenthesis.
        logger.info(f"正在查询发文审核统计数据(日期:{target_date})...")
        result = db_manager.execute_query(sql, (target_date,))
        if not result:
            logger.warning("未查询到数据")
            return 0
        df = pd.DataFrame(result, columns=['author_id', 'author_name', 'today_published_count'])
        # utf-8-sig writes a BOM so Excel detects the encoding correctly.
        df.to_csv(output_file, index=False, encoding='utf-8-sig')
        logger.info(f"成功导出 {len(df)} 条记录到: {output_file}")
        logger.info(f"\n数据预览前5条:\n{df.head()}")
        return len(df)
    except Exception as e:
        logger.error(f"导出发文审核统计失败: {e}", exc_info=True)
        raise
def export_author_published_stats(db_manager, target_date, output_file):
    """
    Export per-author published-article statistics ('published' status only) to CSV.

    Counts channel-4 articles in 'published' status per author for the given
    date, alongside each author's daily posting cap and the remaining gap
    (``daily_post_max - count``), sorted by smallest gap first.

    Args:
        db_manager: database manager exposing ``execute_query(sql, params)``.
        target_date: target date string in ``YYYY-MM-DD`` form, e.g. '2026-01-21'.
        output_file: path of the CSV file to write.

    Returns:
        Number of records exported; 0 when the query yields no rows
        (no file is written in that case).

    Raises:
        Re-raises any exception from the query or the CSV export after logging it.
    """
    # Same singleton as the module-level logger; bound locally so the
    # function does not depend on module globals.
    logger = logging.getLogger(__name__)
    # NOTE: the original query had `HAVING COUNT(*) > 0`, which is redundant —
    # every GROUP BY group contains at least one row — so it was removed.
    sql = """
    SELECT
        a.`author_id`,
        COALESCE(a.`author_name`, auth.`author_name`) as `author_name`,
        COUNT(*) as `today_published_count`,
        COALESCE(auth.`daily_post_max`, 0) as `daily_post_max`,
        (COALESCE(auth.`daily_post_max`, 0) - COUNT(*)) as `gap`
    FROM `ai_articles` a
    LEFT JOIN `ai_authors` auth ON a.`author_id` = auth.`id`
    WHERE
        a.`channel` = 4
        AND a.`status` = 'published'
        AND DATE(a.`updated_at`) = %s
    GROUP BY a.`author_id`, COALESCE(a.`author_name`, auth.`author_name`), auth.`daily_post_max`
    ORDER BY `gap` ASC
    """
    try:
        # FIX: the original log message had an unbalanced fullwidth parenthesis.
        logger.info(f"正在查询发文成功统计数据(日期:{target_date})...")
        result = db_manager.execute_query(sql, (target_date,))
        if not result:
            logger.warning("未查询到数据")
            return 0
        df = pd.DataFrame(result, columns=['author_id', 'author_name', 'today_published_count', 'daily_post_max', 'gap'])
        # utf-8-sig writes a BOM so Excel detects the encoding correctly.
        df.to_csv(output_file, index=False, encoding='utf-8-sig')
        logger.info(f"成功导出 {len(df)} 条记录到: {output_file}")
        logger.info(f"\n数据预览前5条:\n{df.head()}")
        return len(df)
    except Exception as e:
        logger.error(f"导出发文成功统计失败: {e}", exc_info=True)
        raise
def main():
    """CLI entry point: parse arguments and export both statistics CSVs."""
    import os  # local import kept from the original; only needed here

    parser = argparse.ArgumentParser(description='导出作者发文统计数据到CSV')
    parser.add_argument('--host', default='8.149.233.36', help='数据库主机')
    parser.add_argument('--port', type=int, default=3306, help='数据库端口')
    parser.add_argument('--user', default='ai_articles_read', help='数据库用户名')
    # SECURITY NOTE(review): a real credential is hard-coded here and committed
    # to source control — rotate it and load from an environment variable or a
    # secret store instead. Kept as-is to preserve the existing CLI behavior.
    parser.add_argument('--password', default='7aK_H2yvokVumr84lLNDt8fDBp6P', help='数据库密码')
    parser.add_argument('--database', default='ai_article', help='数据库名')
    # NOTE: date.today() is evaluated once, at argument-definition time —
    # fine for a one-shot CLI run, wrong inside a long-lived process.
    parser.add_argument('--date', default=date.today().strftime('%Y-%m-%d'), help='目标日期格式YYYY-MM-DD')
    parser.add_argument('--output-dir', default='./exports', help='输出目录')
    args = parser.parse_args()

    # Ensure the output directory exists before any export runs.
    os.makedirs(args.output_dir, exist_ok=True)

    db_config = {
        'host': args.host,
        'port': args.port,
        'user': args.user,
        'password': args.password,
        'database': args.database,
        'charset': 'utf8mb4',
    }

    logger.info("=" * 60)
    logger.info("开始导出作者发文统计数据")
    logger.info(f"数据库配置: {args.user}@{args.host}:{args.port}/{args.database}")
    logger.info(f"目标日期: {args.date}")
    logger.info(f"输出目录: {args.output_dir}")
    logger.info("=" * 60)

    # NOTE(review): db_manager is never explicitly closed; confirm that
    # DatabaseManager manages its own connection lifetime.
    db_manager = DatabaseManager(db_config)
    try:
        review_file = f"{args.output_dir}/author_review_stats_{args.date}.csv"
        published_file = f"{args.output_dir}/author_published_stats_{args.date}.csv"

        # 1. Review statistics (all statuses).
        logger.info("\n[任务1] 导出发文审核统计(所有状态)")
        review_count = export_author_review_stats(db_manager, args.date, review_file)

        # 2. Published-only statistics.
        logger.info("\n[任务2] 导出发文成功统计published状态")
        published_count = export_author_published_stats(db_manager, args.date, published_file)

        logger.info("\n" + "=" * 60)
        logger.info("✓ 导出完成!")
        logger.info(f"发文审核统计: {review_count} 条 -> {review_file}")
        logger.info(f"发文成功统计: {published_count} 条 -> {published_file}")
        logger.info("=" * 60)
    except Exception as e:
        logger.error(f"✗ 导出过程出错: {e}", exc_info=True)
        logger.info("=" * 60)
        logger.info("✗ 导出失败")
        logger.info("=" * 60)
        # FIX: the original swallowed the error and exited with status 0, so
        # cron/schedulers could not detect failure; exit non-zero instead.
        raise SystemExit(1)


if __name__ == '__main__':
    main()