#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Update script for day_revenue.

Reads rows from a CSV file and updates only the day_revenue column of the
ai_statistics_days table.
"""

import os
import sys
import csv
import traceback
from typing import List, Dict
from decimal import Decimal

# Put the script's own directory first on sys.path so that the sibling
# database_config module is importable regardless of the working directory.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from database_config import DatabaseManager


class DayRevenueUpdater:
    """Updates the day_revenue column of ai_statistics_days from a CSV file."""

    def __init__(self):
        """Create the database manager and resolve the CSV path.

        The CSV file is expected to be named ``ai_statistics_days.csv`` and to
        live in the same directory as this script.
        """
        self.db_manager = DatabaseManager()
        self.script_dir = os.path.dirname(os.path.abspath(__file__))
        self.csv_file = os.path.join(self.script_dir, 'ai_statistics_days.csv')

        print(f"[初始化] CSV文件: {self.csv_file}")

    def read_csv_data(self) -> List[Dict]:
        """Read every row of the CSV file.

        The file is opened as ``utf-8-sig`` so that a leading byte-order mark
        does not end up inside the first header name.

        Returns:
            One dict per data row keyed by the CSV header, or an empty list
            when the file does not exist or cannot be read.
        """
        if not os.path.exists(self.csv_file):
            print(f"[X] CSV文件不存在: {self.csv_file}")
            return []

        try:
            with open(self.csv_file, 'r', encoding='utf-8-sig') as f:
                data = list(csv.DictReader(f))
        except Exception as e:
            # Best effort: report the problem and let the caller treat it as
            # "nothing to update".
            print(f"[X] 读取CSV失败: {e}")
            traceback.print_exc()
            return []

        print(f"[OK] 读取到 {len(data)} 条记录")
        return data

    @staticmethod
    def _parse_row(row: Dict) -> tuple:
        """Convert one CSV row into the parameter tuple for the UPDATE.

        Args:
            row: A row dict as produced by ``csv.DictReader``.

        Returns:
            ``(day_revenue, author_id, stat_date, channel)``.

        Raises:
            ValueError: A required field is missing or blank, a numeric field
                cannot be parsed as an integer, or day_revenue is not finite.
            decimal.InvalidOperation: day_revenue is not a valid decimal.
        """
        # A missing value must never be replaced by a default here: defaulting
        # day_revenue to 0.00 would overwrite real revenue with zero.
        missing = [
            name for name in ('author_id', 'stat_date', 'day_revenue')
            if not (row.get(name) or '').strip()
        ]
        if missing:
            raise ValueError(f"缺少必填字段: {', '.join(missing)}")

        author_id = int(row['author_id'])
        stat_date = row['stat_date'].strip()
        # channel keeps its original fallback of 1 when the column is absent.
        channel = int(row.get('channel', 1))
        day_revenue = Decimal(row['day_revenue'])
        if not day_revenue.is_finite():
            # Decimal happily parses 'NaN' / 'Infinity'; neither is a revenue.
            raise ValueError(f"day_revenue 不是有限数值: {day_revenue}")

        return day_revenue, author_id, stat_date, channel

    def update_day_revenue(self, batch_size: int = 50) -> bool:
        """Update day_revenue for every row found in the CSV file.

        Rows are matched on (author_id, stat_date, channel) and are updated
        one statement at a time; a failure on one row does not stop the rest.

        Args:
            batch_size: Currently unused - rows are updated one by one. Kept
                so that existing callers passing it continue to work.

        Returns:
            True when no row failed (rows that matched nothing in the table
            are not counted as failures); False when at least one row failed
            or when there was no CSV data at all.
        """
        csv_data = self.read_csv_data()

        if not csv_data:
            print("[!] 没有数据需要更新")
            return False

        print("\n[开始] 更新day_revenue字段...")

        update_sql = """
            UPDATE ai_statistics_days
            SET day_revenue = %s,
                updated_at = NOW()
            WHERE author_id = %s
              AND stat_date = %s
              AND channel = %s
        """

        total = len(csv_data)
        success_count = 0
        failed_count = 0
        not_found_count = 0

        for idx, row in enumerate(csv_data, 1):
            try:
                day_revenue, author_id, stat_date, channel = self._parse_row(row)

                # NOTE(review): execute_update is presumed to return the number
                # of affected rows - confirm against DatabaseManager.
                affected_rows = self.db_manager.execute_update(
                    update_sql,
                    (day_revenue, author_id, stat_date, channel)
                )

                if affected_rows > 0:
                    success_count += 1
                    print(f" [{idx}/{total}] ✓ 更新成功: author_id={author_id}, stat_date={stat_date}, day_revenue={day_revenue}")
                else:
                    not_found_count += 1
                    print(f" [{idx}/{total}] - 未找到记录: author_id={author_id}, stat_date={stat_date}")

            except Exception as e:
                # Per-row boundary: both parse errors and database errors are
                # recorded so that the remaining rows are still processed.
                failed_count += 1
                print(f" [{idx}/{total}] ✗ 更新失败: {e}")
                print(f" 数据: {row}")

        # Summary
        print(f"\n{'='*70}")
        print("更新完成")
        print(f"{'='*70}")
        print(f"成功更新: {success_count}/{total}")
        print(f"未找到记录: {not_found_count}/{total}")
        print(f"更新失败: {failed_count}/{total}")
        print(f"{'='*70}")

        return failed_count == 0


def main():
    """Run the interactive update.

    Prints a banner, asks for confirmation and then runs the update.

    Returns:
        Process exit code: 0 when the update succeeded or the user declined,
        1 when any row failed or an unexpected error occurred.
    """
    print("\n" + "="*70)
    print("day_revenue字段批量更新工具")
    print("="*70)
    print("功能:从 ai_statistics_days.csv 读取数据,只更新数据库中的 day_revenue 字段")
    print("="*70)

    try:
        updater = DayRevenueUpdater()

        # Ask for explicit confirmation before touching the database.
        confirm = input("\n是否开始更新? (y/n): ").strip().lower()
        if confirm != 'y':
            print("已取消")
            return 0

        success = updater.update_day_revenue()
        return 0 if success else 1

    except Exception as e:
        # Top-level boundary: report and turn the error into an exit code.
        print(f"\n[X] 程序执行出错: {e}")
        traceback.print_exc()
        return 1


if __name__ == '__main__':
    sys.exit(main())