# Source: ai_image_quary/export_author_stats.py
"""
导出作者发文统计数据到CSV
"""
import pandas as pd
import logging
import argparse
from database_config import DatabaseManager
from datetime import datetime, date
# Module-wide logging: timestamped, level-tagged messages at INFO level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def export_author_review_stats(db_manager, target_date, output_file):
    """
    Export per-author review statistics (all review states) to CSV.

    Counts channel-4 articles in states 'published', 'published_review',
    'pending_review' or 'failed' whose `updated_at` falls on *target_date*,
    grouped by author, ordered by count descending.

    Args:
        db_manager: database manager exposing execute_query(sql, params).
        target_date: target date string, e.g. '2026-01-21'.
        output_file: path of the CSV file to write.

    Returns:
        int: number of rows exported (0 when the query returns nothing).

    Raises:
        Exception: any query/export error is logged and re-raised.
    """
    sql = """
        SELECT
            a.`author_id`,
            COALESCE(a.`author_name`, auth.`author_name`) as `author_name`,
            COUNT(*) as `today_published_count`
        FROM `ai_articles` a
        LEFT JOIN `ai_authors` auth ON a.`author_id` = auth.`id`
        WHERE
            a.`channel` = 4
            AND a.`status` IN ('published', 'published_review', 'pending_review', 'failed')
            AND DATE(a.`updated_at`) = %s
        GROUP BY a.`author_id`, COALESCE(a.`author_name`, auth.`author_name`)
        HAVING COUNT(*) > 0
        ORDER BY `today_published_count` DESC
    """
    try:
        # Fix: original log message had an unbalanced '(' — closing ')' added.
        logger.info(f"正在查询发文审核统计数据(日期:{target_date})...")
        result = db_manager.execute_query(sql, (target_date,))
        if not result:
            logger.warning("未查询到数据")
            return 0
        # NOTE(review): assumes execute_query yields row tuples in SELECT
        # column order — confirm against DatabaseManager.
        df = pd.DataFrame(result, columns=['author_id', 'author_name', 'today_published_count'])
        # utf-8-sig (BOM) so Excel detects the encoding for CJK content.
        df.to_csv(output_file, index=False, encoding='utf-8-sig')
        logger.info(f"成功导出 {len(df)} 条记录到: {output_file}")
        # Log a preview of the first 5 rows for a quick sanity check.
        logger.info(f"\n数据预览前5条:\n{df.head()}")
        return len(df)
    except Exception as e:
        logger.error(f"导出发文审核统计失败: {e}", exc_info=True)
        raise
def export_author_published_stats(db_manager, target_date, output_file):
    """
    Export per-author successfully-published statistics to CSV.

    Counts only channel-4 articles with status 'published' whose
    `updated_at` falls on *target_date*, and reports each author's daily
    quota (`daily_post_max`) and the remaining gap (quota - published),
    ordered by smallest gap first.

    Args:
        db_manager: database manager exposing execute_query(sql, params).
        target_date: target date string, e.g. '2026-01-21'.
        output_file: path of the CSV file to write.

    Returns:
        int: number of rows exported (0 when the query returns nothing).

    Raises:
        Exception: any query/export error is logged and re-raised.
    """
    sql = """
        SELECT
            a.`author_id`,
            COALESCE(a.`author_name`, auth.`author_name`) as `author_name`,
            COUNT(*) as `today_published_count`,
            COALESCE(auth.`daily_post_max`, 0) as `daily_post_max`,
            (COALESCE(auth.`daily_post_max`, 0) - COUNT(*)) as `gap`
        FROM `ai_articles` a
        LEFT JOIN `ai_authors` auth ON a.`author_id` = auth.`id`
        WHERE
            a.`channel` = 4
            AND a.`status` = 'published'
            AND DATE(a.`updated_at`) = %s
        GROUP BY a.`author_id`, COALESCE(a.`author_name`, auth.`author_name`), auth.`daily_post_max`
        HAVING COUNT(*) > 0
        ORDER BY `gap` ASC
    """
    try:
        # Fix: original log message had an unbalanced '(' — closing ')' added.
        logger.info(f"正在查询发文成功统计数据(日期:{target_date})...")
        result = db_manager.execute_query(sql, (target_date,))
        if not result:
            logger.warning("未查询到数据")
            return 0
        # NOTE(review): assumes execute_query yields row tuples in SELECT
        # column order — confirm against DatabaseManager.
        df = pd.DataFrame(result, columns=['author_id', 'author_name', 'today_published_count', 'daily_post_max', 'gap'])
        # utf-8-sig (BOM) so Excel detects the encoding for CJK content.
        df.to_csv(output_file, index=False, encoding='utf-8-sig')
        logger.info(f"成功导出 {len(df)} 条记录到: {output_file}")
        # Log a preview of the first 5 rows for a quick sanity check.
        logger.info(f"\n数据预览前5条:\n{df.head()}")
        return len(df)
    except Exception as e:
        logger.error(f"导出发文成功统计失败: {e}", exc_info=True)
        raise
def main():
    """CLI entry point: export both author-statistics CSVs for one date.

    Parses connection/date/output options, runs the review-stats export and
    the published-stats export, and exits with status 1 on any failure so
    shell callers and cron can detect errors.
    """
    parser = argparse.ArgumentParser(description='导出作者发文统计数据到CSV')
    # SECURITY(review): live credentials are hardcoded as argparse defaults
    # and committed to source — move them to environment variables or a
    # secrets store, and rotate the exposed password.
    parser.add_argument('--host', default='8.149.233.36', help='数据库主机')
    parser.add_argument('--port', type=int, default=3306, help='数据库端口')
    parser.add_argument('--user', default='ai_articles_read', help='数据库用户名')
    parser.add_argument('--password', default='7aK_H2yvokVumr84lLNDt8fDBp6P', help='数据库密码')
    parser.add_argument('--database', default='ai_article', help='数据库名')
    # Default date is evaluated once, at argument-definition time — fine
    # for a one-shot script, but note it is fixed at process start.
    parser.add_argument('--date', default=date.today().strftime('%Y-%m-%d'), help='目标日期格式YYYY-MM-DD')
    parser.add_argument('--output-dir', default='./exports', help='输出目录')
    args = parser.parse_args()

    # Make sure the export directory exists before any query runs.
    import os
    os.makedirs(args.output_dir, exist_ok=True)

    db_config = {
        'host': args.host,
        'port': args.port,
        'user': args.user,
        'password': args.password,
        'database': args.database,
        'charset': 'utf8mb4',
    }

    logger.info("=" * 60)
    logger.info("开始导出作者发文统计数据")
    logger.info(f"数据库配置: {args.user}@{args.host}:{args.port}/{args.database}")
    logger.info(f"目标日期: {args.date}")
    logger.info(f"输出目录: {args.output_dir}")
    logger.info("=" * 60)

    db_manager = DatabaseManager(db_config)
    try:
        review_file = f"{args.output_dir}/author_review_stats_{args.date}.csv"
        published_file = f"{args.output_dir}/author_published_stats_{args.date}.csv"

        # Task 1: all review states.
        logger.info("\n[任务1] 导出发文审核统计(所有状态)")
        review_count = export_author_review_stats(db_manager, args.date, review_file)

        # Task 2: only successfully published articles.
        logger.info("\n[任务2] 导出发文成功统计published状态")
        published_count = export_author_published_stats(db_manager, args.date, published_file)

        logger.info("\n" + "=" * 60)
        logger.info("✓ 导出完成!")
        logger.info(f"发文审核统计: {review_count} 条 -> {review_file}")
        logger.info(f"发文成功统计: {published_count} 条 -> {published_file}")
        logger.info("=" * 60)
    except Exception as e:
        logger.error(f"✗ 导出过程出错: {e}", exc_info=True)
        logger.info("=" * 60)
        logger.info("✗ 导出失败")
        logger.info("=" * 60)
        # Fix: the script previously exited 0 even on failure, hiding
        # errors from cron/CI. Propagate a non-zero exit status.
        raise SystemExit(1)


if __name__ == '__main__':
    main()