#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 数据整合导出工具 从 bjh_integrated_data.json 中读取数据并导出为三个数据库表对应的CSV文件: 1. ai_statistics.csv - 账号汇总统计表 2. ai_statistics_day.csv - 每日明细统计表 3. ai_statistics_days.csv - 核心指标统计表(含发文量、收益、环比) """ import sys import os import json import csv from datetime import datetime from typing import Dict, List, Optional from decimal import Decimal # 导入数据库配置 try: from database_config import DatabaseManager, DB_CONFIG except ImportError: DatabaseManager = None DB_CONFIG = None # 设置UTF-8编码 if sys.platform == 'win32': import io if not isinstance(sys.stdout, io.TextIOWrapper) or sys.stdout.encoding != 'utf-8': sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') if not isinstance(sys.stderr, io.TextIOWrapper) or sys.stderr.encoding != 'utf-8': sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8') sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) try: from log_config import setup_export_csv_logger except ImportError: # 如果没有日志模块,使用print替代 def setup_export_csv_logger(): class DummyLogger: def info(self, msg): print(msg) def error(self, msg): print(msg) def debug(self, msg): pass return DummyLogger() class DataExporter: """数据导出器""" def __init__(self, use_database: bool = False, db_config: Optional[Dict] = None): """初始化导出器 Args: use_database: 是否使用数据库模式(True=直接插入数据库,False=导出CSV) db_config: 数据库配置,默认使用database_config.DB_CONFIG """ # 初始化日志 self.logger = setup_export_csv_logger() self.script_dir = os.path.dirname(os.path.abspath(__file__)) # 使用新的整合数据文件 self.integrated_file = os.path.join(self.script_dir, "bjh_integrated_data.json") # 输出文件路径(对应三个数据库表) self.output_ai_statistics = os.path.join(self.script_dir, "ai_statistics.csv") self.output_ai_statistics_day = os.path.join(self.script_dir, "ai_statistics_day.csv") self.output_ai_statistics_days = os.path.join(self.script_dir, "ai_statistics_days.csv") # 数据库模式 self.use_database = use_database self.db_manager = None if use_database: if DatabaseManager is None: print("[X] 数据库模块未安装,请检查 database_config.py") self.use_database = False else: self.db_manager = DatabaseManager(db_config) print("[配置] 已启用数据库模式") else: # CSV模式下也需要连接数据库以查询author_id if DatabaseManager is not None: try: self.db_manager = DatabaseManager(db_config) print("[配置] CSV模式 - 已连接数据库以查询author_id") except Exception as e: print(f"[!] 数据库连接失败,author_id将设为0: {e}") self.db_manager = None # 缓存author_id映射(author_name -> author_id) self.author_id_cache = {} def get_author_id(self, author_name: str) -> int: """获取作者ID Args: author_name: 作者名称 Returns: 作者ID,未找到返回0 """ # 先从缓存中查找 if author_name in self.author_id_cache: return self.author_id_cache[author_name] # 如果没有数据库连接,返回0 if not self.db_manager: return 0 # 从数据库查询 try: sql = "SELECT id FROM ai_authors WHERE author_name = %s AND channel = 1 LIMIT 1" result = self.db_manager.execute_query(sql, (author_name,), fetch_one=True) if result: author_id = result.get('id', 0) # 缓存结果 self.author_id_cache[author_name] = author_id return author_id else: # 未找到,缓存为0 self.author_id_cache[author_name] = 0 return 0 except Exception as e: print(f" [!] 查询author_id失败: {e}") return 0 def query_article_count_from_db(self, author_id: int, stat_date: str) -> Dict: """从ai_articles表查询文章相关数据 Args: author_id: 作者ID stat_date: 统计日期 (YYYY-MM-DD) Returns: 包含单日发文量、累计发文量(当月)和投稿量的字典 """ result = { 'daily_published_count': 0, 'cumulative_published_count': 0, 'total_submission_count': 0 } if not self.db_manager or author_id == 0: print(f" [数据库] 未连接或author_id无效,无法查询文章数据") return result try: from datetime import datetime # 解析统计日期 target_date = datetime.strptime(stat_date, '%Y-%m-%d') stat_date_str = target_date.strftime('%Y-%m-%d') # 计算当月第一天 month_first_day = target_date.replace(day=1).strftime('%Y-%m-%d') # 查询单日发文量(当天状态为published的文章数) daily_sql = """ SELECT COUNT(*) as count FROM ai_articles WHERE author_id = %s AND DATE(publish_time) = %s AND status = 'published' AND channel = 1 """ daily_result = self.db_manager.execute_query( daily_sql, (author_id, stat_date_str), fetch_one=True, dict_cursor=True ) if daily_result: result['daily_published_count'] = int(daily_result.get('count', 0) or 0) print(f" [数据库] 单日发文量 ({stat_date_str}): {result['daily_published_count']}") # 查询累计发文量(当月第一天到stat_date的发文量) cumulative_sql = """ SELECT COUNT(*) as count FROM ai_articles WHERE author_id = %s AND DATE(publish_time) >= %s AND DATE(publish_time) <= %s AND status = 'published' AND channel = 1 """ cumulative_result = self.db_manager.execute_query( cumulative_sql, (author_id, month_first_day, stat_date_str), fetch_one=True, dict_cursor=True ) if cumulative_result: result['cumulative_published_count'] = int(cumulative_result.get('count', 0) or 0) print(f" [数据库] 累计发文量 ({month_first_day}至{stat_date_str}): {result['cumulative_published_count']}") # 查询投稿量(当天的所有文章数,包括各种状态) # total_submission_count 用于 ai_statistics_day 表 submission_sql = """ SELECT COUNT(*) as count FROM ai_articles WHERE author_id = %s AND DATE(publish_time) = %s AND status = 'published' AND channel = 1 """ submission_result = self.db_manager.execute_query( submission_sql, (author_id, stat_date_str), fetch_one=True, dict_cursor=True ) if submission_result: result['total_submission_count'] = int(submission_result.get('count', 0) or 0) print(f" [数据库] 投稿量 ({stat_date_str}): {result['total_submission_count']}") return result except Exception as e: print(f" [!] 从数据库查询文章数据失败: {e}") import traceback traceback.print_exc() return result def calculate_weekly_article_count_from_db(self, author_id: int, stat_date: str) -> int: """从ai_articles表计算当周发文量(周一至周日) Args: author_id: 作者ID stat_date: 统计日期 (YYYY-MM-DD) Returns: 当周发文量 """ if not self.db_manager or author_id == 0: print(f" [数据库] 未连接或author_id无效,无法计算当周发文量") return 0 try: from datetime import datetime, timedelta # 解析日期 target_date = datetime.strptime(stat_date, '%Y-%m-%d') # 计算本周一的日期(weekday: 0=周一, 6=周日) weekday = target_date.weekday() monday = target_date - timedelta(days=weekday) sunday = monday + timedelta(days=6) monday_str = monday.strftime('%Y-%m-%d') sunday_str = sunday.strftime('%Y-%m-%d') # 查询当周发文量 sql = """ SELECT COUNT(*) as count FROM ai_articles WHERE author_id = %s AND DATE(publish_time) >= %s AND DATE(publish_time) <= %s AND status = 'published' AND channel = 1 """ result = self.db_manager.execute_query( sql, (author_id, monday_str, sunday_str), fetch_one=True, dict_cursor=True ) if result: weekly_count = int(result.get('count', 0) or 0) print(f" [数据库] 当周发文量 ({monday_str} 至 {sunday_str}): {weekly_count}") return weekly_count else: print(f" [数据库] 未找到当周数据") return 0 except Exception as e: print(f" [!] 从数据库计算当周发文量失败: {e}") return 0 def calculate_weekly_revenue_from_db(self, author_id: int, stat_date: str) -> float: """从ai_statistics_days表汇总计算当周收益(周一至周日) 基于day_revenue字段进行汇总计算 Args: author_id: 作者ID stat_date: 统计日期 (YYYY-MM-DD) Returns: 当周收益总额 """ if not self.db_manager or author_id == 0: print(f" [数据库] 未连接或author_id无效,无法计算当周收益") return 0.0 try: from datetime import datetime, timedelta # 解析日期 target_date = datetime.strptime(stat_date, '%Y-%m-%d') # 计算本周一的日期(weekday: 0=周一, 6=周日) weekday = target_date.weekday() monday = target_date - timedelta(days=weekday) sunday = monday + timedelta(days=6) monday_str = monday.strftime('%Y-%m-%d') sunday_str = sunday.strftime('%Y-%m-%d') print(f" [调试] 目标日期: {stat_date}, 周一: {monday_str}, 周日: {sunday_str}") # 查询数据库中本周的day_revenue总和 sql = """ SELECT SUM(day_revenue) as weekly_total, COUNT(*) as day_count FROM ai_statistics_days WHERE author_id = %s AND stat_date >= %s AND stat_date <= %s AND channel = 1 """ result = self.db_manager.execute_query( sql, (author_id, monday_str, sunday_str), fetch_one=True, dict_cursor=True ) print(f" [调试] 查询结果: {result}") if result and result.get('weekly_total') is not None: weekly_total = float(result['weekly_total'] or 0) day_count = int(result.get('day_count', 0) or 0) print(f" [数据库] 当周收益 ({monday_str} 至 {sunday_str}): ¥{weekly_total:.2f} (基于{day_count}天的数据)") return weekly_total else: print(f" [数据库] 未找到当周数据 ({monday_str} 至 {sunday_str}),返回0") return 0.0 except Exception as e: print(f" [!] 从数据库计算当周收益失败: {e}") return 0.0 def calculate_last_week_revenue_from_db(self, author_id: int, stat_date: str) -> float: """从ai_statistics_days表汇总计算上周收益(上周一至上周日) Args: author_id: 作者ID stat_date: 统计日期 (YYYY-MM-DD) Returns: 上周收益总额 """ if not self.db_manager or author_id == 0: return 0.0 try: from datetime import datetime, timedelta target_date = datetime.strptime(stat_date, '%Y-%m-%d') weekday = target_date.weekday() # 本周一 this_monday = target_date - timedelta(days=weekday) # 上周一 = 本周一 - 7天 last_monday = this_monday - timedelta(days=7) # 上周日 = 上周一 + 6天 last_sunday = last_monday + timedelta(days=6) last_monday_str = last_monday.strftime('%Y-%m-%d') last_sunday_str = last_sunday.strftime('%Y-%m-%d') sql = """ SELECT SUM(day_revenue) as weekly_total FROM ai_statistics_days WHERE author_id = %s AND stat_date >= %s AND stat_date <= %s AND channel = 1 """ result = self.db_manager.execute_query( sql, (author_id, last_monday_str, last_sunday_str), fetch_one=True, dict_cursor=True ) if result and result.get('weekly_total') is not None: last_week_total = float(result['weekly_total'] or 0) print(f" [数据库] 上周收益 ({last_monday_str} 至 {last_sunday_str}): ¥{last_week_total:.2f}") return last_week_total else: print(f" [数据库] 未找到上周数据 ({last_monday_str} 至 {last_sunday_str})") return 0.0 except Exception as e: print(f" [!] 从数据库计算上周收益失败: {e}") return 0.0 def calculate_last_month_revenue_from_db(self, author_id: int, stat_date: str) -> float: """从ai_statistics_days表汇总计算上月收益 Args: author_id: 作者ID stat_date: 统计日期 (YYYY-MM-DD) Returns: 上月收益总额 """ if not self.db_manager or author_id == 0: return 0.0 try: from datetime import datetime from dateutil.relativedelta import relativedelta target_date = datetime.strptime(stat_date, '%Y-%m-%d') # 上个月的第一天 last_month_first = (target_date.replace(day=1) - relativedelta(months=1)) # 上个月的最后一天 last_month_last = target_date.replace(day=1) - relativedelta(days=1) last_month_first_str = last_month_first.strftime('%Y-%m-%d') last_month_last_str = last_month_last.strftime('%Y-%m-%d') sql = """ SELECT SUM(day_revenue) as monthly_total FROM ai_statistics_days WHERE author_id = %s AND stat_date >= %s AND stat_date <= %s AND channel = 1 """ result = self.db_manager.execute_query( sql, (author_id, last_month_first_str, last_month_last_str), fetch_one=True, dict_cursor=True ) if result and result.get('monthly_total') is not None: last_month_total = float(result['monthly_total'] or 0) print(f" [数据库] 上月收益 ({last_month_first_str} 至 {last_month_last_str}): ¥{last_month_total:.2f}") return last_month_total else: print(f" [数据库] 未找到上月数据 ({last_month_first_str} 至 {last_month_last_str})") return 0.0 except Exception as e: print(f" [!] 从数据库计算上月收益失败: {e}") return 0.0 def load_integrated_data(self) -> List[Dict]: """加载整合数据文件 bjh_integrated_data.json""" try: if os.path.exists(self.integrated_file): with open(self.integrated_file, 'r', encoding='utf-8') as f: data = json.load(f) print(f"[OK] 加载整合数据文件: {self.integrated_file}") print(f" 共 {len(data)} 个账号") return data else: print(f"[!] 未找到整合数据文件: {self.integrated_file}") return [] except Exception as e: print(f"[X] 加载整合数据失败: {e}") import traceback traceback.print_exc() return [] def extract_total_metrics(self, account_data: Dict) -> Dict: """从整合数据中提取汇总指标(对应 ai_statistics 表)""" metrics = { 'submission_count': 0, 'read_count': 0, 'comment_count': 0, 'comment_rate': 0.0, 'like_count': 0, 'like_rate': 0.0, 'favorite_count': 0, 'favorite_rate': 0.0, 'share_count': 0, 'share_rate': 0.0, 'slide_ratio': 0.0, 'baidu_search_volume': 0, } try: analytics = account_data.get('analytics', {}) apis = analytics.get('apis', []) if apis and len(apis) > 0: api_data = apis[0].get('data', {}) if api_data.get('errno') == 0: total_info = api_data.get('data', {}).get('total_info', {}) if total_info: metrics['submission_count'] = int(total_info.get('publish_count', 0) or 0) metrics['read_count'] = int(total_info.get('view_count', 0) or 0) metrics['comment_count'] = int(total_info.get('comment_count', 0) or 0) metrics['comment_rate'] = float(total_info.get('comment_rate', 0) or 0) metrics['like_count'] = int(total_info.get('likes_count', 0) or 0) metrics['like_rate'] = float(total_info.get('likes_rate', 0) or 0) metrics['favorite_count'] = int(total_info.get('collect_count', 0) or 0) metrics['favorite_rate'] = float(total_info.get('collect_rate', 0) or 0) metrics['share_count'] = int(total_info.get('share_count', 0) or 0) metrics['share_rate'] = float(total_info.get('share_rate', 0) or 0) metrics['slide_ratio'] = float(total_info.get('pic_slide_rate', 0) or 0) metrics['baidu_search_volume'] = int(total_info.get('disp_pv', 0) or 0) # 修正:使用disp_pv except Exception as e: print(f" [!] 提取汇总指标失败: {e}") return metrics def extract_income_metrics(self, account_data: Dict, account_id: str = '') -> Dict: """从整合数据中提取收入指标 注意: - weekly_revenue: 不再从API获取,在export_ai_statistics_days中从数据库计算 - monthly_revenue: 使用currentMonth(当前自然月收益) - day_revenue: 从yesterday提取昨日收益(当日收益) - revenue_wow_growth_rate: 周环比,从数据库计算(本周 vs 上周) - revenue_mom_growth_rate: 月环比,从数据库计算(当月 vs 上月) Args: account_data: 账号整合数据 account_id: 账号ID(用于获取当周收益) """ metrics = { 'weekly_revenue': 0.0, # 将在导出时从数据库计算 'monthly_revenue': 0.0, 'day_revenue': 0.0, # 当日收益 'revenue_wow_growth_rate': 0.0, # 周环比,将在导出时从数据库计算 'revenue_mom_growth_rate': 0.0, # 月环比,将在导出时从数据库计算 } try: income = account_data.get('income', {}) if income and income.get('errno') == 0: income_data = income.get('data', {}).get('income', {}) # 从yesterday获取当日收益(day_revenue) yesterday = income_data.get('yesterday', {}) if yesterday: metrics['day_revenue'] = float(yesterday.get('income', 0) or 0) print(f" 当日收益: ¥{metrics['day_revenue']:.2f}") # weekly_revenue 不再从API获取,在导出时从数据库的day_revenue汇总计算 print(f" 当周收益: 将从数据库计算") # 周环比和月环比不再从API获取,将在导出时从数据库计算 # 这里保持为0,由export_ai_statistics_days方法计算 print(f" 环比增长率: 将从数据库计算") # 当前自然月收入(currentMonth) current_month = income_data.get('currentMonth', {}) if current_month: metrics['monthly_revenue'] = float(current_month.get('income', 0) or 0) except Exception as e: print(f" [!] 提取收入指标失败: {e}") return metrics def export_ai_statistics(self, integrated_data: List[Dict]) -> bool: """导出 ai_statistics 表数据(账号汇总统计)""" try: print("\n" + "="*70) print("开始导出 ai_statistics 表数据") print("="*70) csv_rows = [] for account_data in integrated_data: account_id = account_data.get('account_id', '') if not account_id: continue print(f"\n处理账号: {account_id}") # 获取作者ID author_id = self.get_author_id(account_id) # 提取统计指标 metrics = self.extract_total_metrics(account_data) # 获取最新日期 latest_date = datetime.now().strftime('%Y-%m-%d') analytics = account_data.get('analytics', {}) apis = analytics.get('apis', []) if apis and len(apis) > 0: api_data = apis[0].get('data', {}) if api_data.get('errno') == 0: latest_event_day = api_data.get('data', {}).get('latest_event_day', '') if latest_event_day: # 格式化:20251216 -> 2025-12-16 date_str = str(latest_event_day) latest_date = f"{date_str[0:4]}-{date_str[4:6]}-{date_str[6:8]}" row = { 'author_id': author_id, # 使用从数据库查询的author_id 'author_name': account_id, 'channel': 1, # 1=baidu 'date': latest_date, 'submission_count': metrics['submission_count'], 'read_count': metrics['read_count'], 'comment_count': metrics['comment_count'], 'comment_rate': f"{metrics['comment_rate']:.4f}", 'like_count': metrics['like_count'], 'like_rate': f"{metrics['like_rate']:.4f}", 'favorite_count': metrics['favorite_count'], 'favorite_rate': f"{metrics['favorite_rate']:.4f}", 'share_count': metrics['share_count'], 'share_rate': f"{metrics['share_rate']:.4f}", 'slide_ratio': f"{metrics['slide_ratio']:.4f}", 'baidu_search_volume': metrics['baidu_search_volume'], } csv_rows.append(row) print(f" 投稿量: {row['submission_count']}") print(f" 阅读量: {row['read_count']}") # 写入CSV if csv_rows: fieldnames = [ 'author_id', 'author_name', 'channel', 'date', 'submission_count', 'read_count', 'comment_count', 'comment_rate', 'like_count', 'like_rate', 'favorite_count', 'favorite_rate', 'share_count', 'share_rate', 'slide_ratio', 'baidu_search_volume' ] with open(self.output_ai_statistics, 'w', encoding='utf-8-sig', newline='') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() writer.writerows(csv_rows) print(f"\n{'='*70}") print(f"[OK] ai_statistics 表数据已导出到: {self.output_ai_statistics}") print(f" 共 {len(csv_rows)} 条记录") print(f"{'='*70}") return True else: print("\n[!] 没有数据可导出") return False except Exception as e: print(f"\n[X] 导出 ai_statistics 失败: {e}") import traceback traceback.print_exc() return False def export_ai_statistics_day(self, integrated_data: List[Dict], use_db_article_count: bool = True) -> bool: """导出 ai_statistics_day 表数据(每日明细统计) Args: integrated_data: 整合数据列表 use_db_article_count: 是否使用数据库查询文章数据(默认True) """ try: print("\n" + "="*70) print("开始导出 ai_statistics_day 表数据") print("="*70) csv_rows = [] for account_data in integrated_data: account_id = account_data.get('account_id', '') if not account_id: continue print(f"\n处理账号: {account_id}") # 获取作者ID author_id = self.get_author_id(account_id) # 获取每日数据列表 analytics = account_data.get('analytics', {}) apis = analytics.get('apis', []) daily_list = [] if apis and len(apis) > 0: api_data = apis[0].get('data', {}) if api_data.get('errno') == 0: daily_list = api_data.get('data', {}).get('list', []) if daily_list: print(f" 找到 {len(daily_list)} 天的数据") # 只取最新一天的数据 # 按日期排序,取最后一条(最新) daily_list_sorted = sorted(daily_list, key=lambda x: x.get('event_day', '')) latest_day_data = daily_list_sorted[-1] if daily_list_sorted else None if latest_day_data: event_day = latest_day_data.get('event_day', '') if event_day: # 格式化日期 date_str = str(event_day) formatted_date = f"{date_str[0:4]}-{date_str[4:6]}-{date_str[6:8]}" # 优先从数据库查询投稿量 total_submission_count = int(latest_day_data.get('publish_count', 0) or 0) if use_db_article_count and author_id > 0: article_data = self.query_article_count_from_db(author_id, formatted_date) total_submission_count = article_data['total_submission_count'] print(f" [使用数据库] 投稿量: {total_submission_count}") else: print(f" [使用API] 投稿量: {total_submission_count}") row = { 'author_id': author_id, 'author_name': account_id, 'channel': 1, 'stat_date': formatted_date, 'total_submission_count': total_submission_count, 'total_read_count': int(latest_day_data.get('view_count', 0) or 0), 'total_comment_count': int(latest_day_data.get('comment_count', 0) or 0), 'total_like_count': int(latest_day_data.get('likes_count', 0) or 0), 'total_favorite_count': int(latest_day_data.get('collect_count', 0) or 0), 'total_share_count': int(latest_day_data.get('share_count', 0) or 0), 'avg_comment_rate': f"{float(latest_day_data.get('comment_rate', 0) or 0):.4f}", 'avg_like_rate': f"{float(latest_day_data.get('likes_rate', 0) or 0):.4f}", 'avg_favorite_rate': f"{float(latest_day_data.get('collect_rate', 0) or 0):.4f}", 'avg_share_rate': f"{float(latest_day_data.get('share_rate', 0) or 0):.4f}", 'avg_slide_ratio': f"{float(latest_day_data.get('pic_slide_rate', 0) or 0):.4f}", 'total_baidu_search_volume': int(latest_day_data.get('disp_pv', 0) or 0), } csv_rows.append(row) print(f" 最新日期: {formatted_date}") else: print(f" 未找到有效的最新数据") else: print(f" 未找到每日数据") # 写入CSV if csv_rows: fieldnames = [ 'author_id', 'author_name', 'channel', 'stat_date', 'total_submission_count', 'total_read_count', 'total_comment_count', 'total_like_count', 'total_favorite_count', 'total_share_count', 'avg_comment_rate', 'avg_like_rate', 'avg_favorite_rate', 'avg_share_rate', 'avg_slide_ratio', 'total_baidu_search_volume' ] with open(self.output_ai_statistics_day, 'w', encoding='utf-8-sig', newline='') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() writer.writerows(csv_rows) print(f"\n{'='*70}") print(f"[OK] ai_statistics_day 表数据已导出到: {self.output_ai_statistics_day}") print(f" 共 {len(csv_rows)} 条记录") print(f"{'='*70}") return True else: print("\n[!] 没有数据可导出") return False except Exception as e: print(f"\n[X] 导出 ai_statistics_day 失败: {e}") import traceback traceback.print_exc() return False def export_ai_statistics_days(self, integrated_data: List[Dict], use_db_article_count: bool = True, use_db_weekly_revenue: bool = True) -> bool: """导出ai_statistics_days表数据(核心指标:发文量、收益、环比) 注意: - daily_published_count: 优先从ai_articles表查询,否则使用API数据 - cumulative_published_count: 优先从ai_articles表查询(从起始日到stat_date的累计发文量) - monthly_revenue: stat_date所在自然月的总收益(使用近30天收益作为近似值) - weekly_revenue: 优先从ai_statistics_days表汇总计算,否则使用API数据 Args: integrated_data: 整合数据列表 use_db_article_count: 是否使用数据库查询文章数据(默认True) use_db_weekly_revenue: 是否使用数据库计算当周收益(默认True) """ try: print("\n" + "="*70) print("开始导出 ai_statistics_days 表数据") print("="*70) csv_rows = [] for account_data in integrated_data: account_id = account_data.get('account_id', '') if not account_id: continue print(f"\n处理账号: {account_id}") # 获取作者ID author_id = self.get_author_id(account_id) # 获取每日数据列表 analytics = account_data.get('analytics', {}) apis = analytics.get('apis', []) daily_list = [] if apis and len(apis) > 0: api_data = apis[0].get('data', {}) if api_data.get('errno') == 0: daily_list = api_data.get('data', {}).get('list', []) # 提取收入指标(传入账号ID以支持当周收益) income_metrics = self.extract_income_metrics(account_data, account_id) # 处理每日数据 if daily_list: print(f" 找到 {len(daily_list)} 天的数据") # 按日期排序(从早到晚) daily_list_sorted = sorted(daily_list, key=lambda x: x.get('event_day', '')) # 逐日累计发文量(API数据,仅当数据库无法查询时使用) cumulative_count = 0 for day_data in daily_list_sorted: daily_published = int(day_data.get('publish_count', 0) or 0) cumulative_count += daily_published # 只取最新一天的数据 latest_day_data = daily_list_sorted[-1] if daily_list_sorted else None if latest_day_data: event_day = latest_day_data.get('event_day', '') if event_day: # 格式化日期 date_str = str(event_day) formatted_date = f"{date_str[0:4]}-{date_str[4:6]}-{date_str[6:8]}" # 优先从数据库查询文章数据 if use_db_article_count and author_id > 0: article_data = self.query_article_count_from_db(author_id, formatted_date) daily_published = article_data['daily_published_count'] cumulative_count = article_data['cumulative_published_count'] print(f" [使用数据库] 文章数据: 单日={daily_published}, 累计={cumulative_count}") else: # 使用API数据 daily_published = int(latest_day_data.get('publish_count', 0) or 0) print(f" [使用API] 文章数据: 单日={daily_published}, 累计={cumulative_count}") # 计算当周收益:数据库中本周已有的收益 + 当日新抓取的收益 if use_db_weekly_revenue and author_id > 0: # 从数据库查询本周已有的收益(不包括今天,因为今天的数据还没导入) weekly_revenue_db = self.calculate_weekly_revenue_from_db(author_id, formatted_date) # 当周收益 = 数据库中的历史收益 + 当日新抓取的收益 day_revenue = income_metrics['day_revenue'] weekly_revenue_total = weekly_revenue_db + day_revenue income_metrics['weekly_revenue'] = weekly_revenue_total print(f" [数据库] 本周已有收益: ¥{weekly_revenue_db:.2f}") print(f" [API] 当日新增收益: ¥{day_revenue:.2f}") print(f" [计算] 当周总收益: ¥{weekly_revenue_total:.2f}") # 计算周环比:本周 vs 上周 last_week_revenue = self.calculate_last_week_revenue_from_db(author_id, formatted_date) if last_week_revenue > 0: income_metrics['revenue_wow_growth_rate'] = (weekly_revenue_total - last_week_revenue) / last_week_revenue print(f" [计算] 周环比: {income_metrics['revenue_wow_growth_rate']:.2%} (本周¥{weekly_revenue_total:.2f} vs 上周¥{last_week_revenue:.2f})") else: print(f" [计算] 周环比: 无法计算(上周没有数据)") # 计算月环比:当月 vs 上月 last_month_revenue = self.calculate_last_month_revenue_from_db(author_id, formatted_date) monthly_revenue = income_metrics['monthly_revenue'] if last_month_revenue > 0: income_metrics['revenue_mom_growth_rate'] = (monthly_revenue - last_month_revenue) / last_month_revenue print(f" [计算] 月环比: {income_metrics['revenue_mom_growth_rate']:.2%} (当月¥{monthly_revenue:.2f} vs 上月¥{last_month_revenue:.2f})") else: print(f" [计算] 月环比: 无法计算(上月没有数据)") else: # 如果不使用数据库,weekly_revenue = 当日收益 income_metrics['weekly_revenue'] = income_metrics['day_revenue'] print(f" [跳过数据库] 当周收益 = 当日收益: ¥{income_metrics['day_revenue']:.2f}") row = { 'author_id': author_id, 'author_name': account_id, 'channel': 1, 'stat_date': formatted_date, 'daily_published_count': daily_published, 'cumulative_published_count': cumulative_count, 'day_revenue': f"{income_metrics['day_revenue']:.2f}", 'monthly_revenue': f"{income_metrics['monthly_revenue']:.2f}", 'weekly_revenue': f"{income_metrics['weekly_revenue']:.2f}", 'revenue_mom_growth_rate': f"{income_metrics['revenue_mom_growth_rate']:.6f}", 'revenue_wow_growth_rate': f"{income_metrics['revenue_wow_growth_rate']:.6f}", } csv_rows.append(row) print(f" 最新日期: {formatted_date}") print(f" 累计发文量: {cumulative_count}") else: print(f" 未找到有效的最新数据") else: # 即使没有每日数据,也导出收入信息 row = { 'author_id': author_id, 'author_name': account_id, 'channel': 1, 'stat_date': datetime.now().strftime('%Y-%m-%d'), 'daily_published_count': 0, 'cumulative_published_count': 0, 'day_revenue': f"{income_metrics['day_revenue']:.2f}", # 新增:每日收益 'monthly_revenue': f"{income_metrics['monthly_revenue']:.2f}", 'weekly_revenue': f"{income_metrics['weekly_revenue']:.2f}", 'revenue_mom_growth_rate': f"{income_metrics['revenue_mom_growth_rate']:.6f}", 'revenue_wow_growth_rate': f"{income_metrics['revenue_wow_growth_rate']:.6f}", } csv_rows.append(row) print(f" 未找到每日数据,使用默认值") # 写入CSV if csv_rows: fieldnames = [ 'author_id', 'author_name', 'channel', 'stat_date', 'daily_published_count', 'cumulative_published_count', 'day_revenue', # 新增:每日收益 'monthly_revenue', 'weekly_revenue', 'revenue_mom_growth_rate', 'revenue_wow_growth_rate' ] with open(self.output_ai_statistics_days, 'w', encoding='utf-8-sig', newline='') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() writer.writerows(csv_rows) print(f"\n{'='*70}") print(f"[OK] ai_statistics_days 表数据已导出到: {self.output_ai_statistics_days}") print(f" 共 {len(csv_rows)} 条记录") print(f"{'='*70}") return True else: print("\n[!] 没有数据可导出") return False except Exception as e: print(f"\n[X] 导出 ai_statistics_days 失败: {e}") import traceback traceback.print_exc() return False def merge_and_export(self, analytics_data: Dict, income_data: Dict) -> bool: """整合数据并导出为CSV Args: analytics_data: 发文统计数据 income_data: 收入数据 Returns: bool: 是否成功导出 """ try: # 准备CSV数据 csv_rows = [] # 收集所有账号ID account_ids = set() # 从发文统计中收集账号 if isinstance(analytics_data, list): for item in analytics_data: account_ids.add(item.get('account_id', '')) elif isinstance(analytics_data, dict): account_ids.update(analytics_data.keys()) # 从收入数据中收集账号 if isinstance(income_data, dict): account_ids.update(income_data.keys()) # 移除空字符串 account_ids.discard('') print(f"\n找到 {len(account_ids)} 个账号") # 为每个账号整合数据 for account_id in sorted(account_ids): print(f"\n处理账号: {account_id}") row = { '账号ID': account_id, '更新时间': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), } # 旧方法已废弃,此部分代码不再使用 analytics_metrics = {} daily_list = [] # 计算单日发文量(平均值)和累计发文量 if daily_list: total_publish = 0 valid_days = 0 for day_data in daily_list: publish_count = day_data.get('publish_count', '') if publish_count != '' and publish_count != '0': try: total_publish += int(publish_count) valid_days += 1 except (ValueError, TypeError): pass # 累计发文量:7天内发文总数(从投稿量字段获取) analytics_metrics['累计发文量'] = analytics_metrics.get('投稿量', '0') # 单日发文量:平均值 if valid_days > 0: avg_daily = total_publish / valid_days analytics_metrics['单日发文量'] = f"{avg_daily:.2f}" else: analytics_metrics['单日发文量'] = "0" else: analytics_metrics['单日发文量'] = "0" analytics_metrics['累计发文量'] = analytics_metrics.get('投稿量', '0') # 提取收入数据 income_metrics = {} if account_id in income_data: income_metrics = self.extract_income_metrics(income_data[account_id]) # 合并指标 row.update(analytics_metrics) row.update(income_metrics) csv_rows.append(row) # 显示数据预览 print(f" 投稿量: {row.get('投稿量', '无')}") print(f" 当周收益: ¥{row.get('当周收益', '0')}") print(f" 当月收益: ¥{row.get('当月收益', '0')}") # 写入CSV文件 if csv_rows: # 定义列顺序 fieldnames = [ '账号ID', '更新时间', # 发文统计 '投稿量', '阅读量', '评论量', '评论率', '点赞量', '点赞率', '收藏量', '收藏率', '分享量', '分享率', '滑图占比', '百度搜索量', '单日发文量', '累计发文量', '曝光量', '点击率', # 收入数据 '当周收益', '当月收益', '收益周环比', '收益月环比', ] # 旧方法已废弃 return False else: print("\n[!] 没有数据可导出") return False except Exception as e: print(f"\n[X] 导出失败: {e}") import traceback traceback.print_exc() return False def export_all_tables(self) -> bool: """导出所有三个表的数据 Returns: bool: 是否成功 """ if self.use_database: print("\n" + "="*70) print("从 bjh_integrated_data.json 导入数据到数据库") print("="*70) else: print("\n" + "="*70) print("从 bjh_integrated_data.json 导出数据到三个表") print("="*70) # 加载整合数据 integrated_data = self.load_integrated_data() if not integrated_data: print("\n[X] 没有可用的数据") return False # 导出三个表 if self.use_database: # 数据库模式:直接插入数据库 result1 = self.insert_ai_statistics(integrated_data) result2 = self.insert_ai_statistics_day(integrated_data) result3 = self.insert_ai_statistics_days(integrated_data) else: # CSV模式:导出CSV文件 result1 = self.export_ai_statistics(integrated_data) result2 = self.export_ai_statistics_day(integrated_data) result3 = self.export_ai_statistics_days(integrated_data) return result1 and result2 and result3 def export_daily_data(self, analytics_data: Dict) -> bool: """导出每日明细数据(对应ai_statistics_day表) Args: analytics_data: 发文统计数据 Returns: bool: 是否成功导出 """ try: print("\n" + "="*70) print("开始导出每日明细数据") print("="*70) # 准备CSV数据 csv_rows = [] # 收集所有账号ID account_ids = set() if isinstance(analytics_data, list): for item in analytics_data: account_ids.add(item.get('account_id', '')) elif isinstance(analytics_data, dict): account_ids.update(analytics_data.keys()) account_ids.discard('') print(f"\n找到 {len(account_ids)} 个账号") # 为每个账号提取每日数据 for account_id in sorted(account_ids): print(f"\n处理账号: {account_id}") # 获取每日数据列表 daily_list = [] if isinstance(analytics_data, list): for item in analytics_data: if item.get('account_id') == account_id: data = item.get('data', {}) api_data = data.get('api_data', {}) if 'apis' in api_data: for api_item in api_data['apis']: api_resp = api_item.get('data', {}) if api_resp.get('errno') == 0: daily_list = api_resp.get('data', {}).get('list', []) break break elif isinstance(analytics_data, dict) and account_id in analytics_data: data = analytics_data[account_id].get('data', {}) api_data = data.get('api_data', {}) if 'apis' in api_data: for api_item in api_data['apis']: api_resp = api_item.get('data', {}) if api_resp.get('errno') == 0: daily_list = api_resp.get('data', {}).get('list', []) break # 处理每日数据 if daily_list: print(f" 找到 {len(daily_list)} 天的数据") for day_data in daily_list: event_day = day_data.get('event_day', '') if not event_day: continue # 格式化日期:20251215 -> 2025-12-15 try: date_str = str(event_day) formatted_date = f"{date_str[0:4]}-{date_str[4:6]}-{date_str[6:8]}" except: formatted_date = event_day row = { '账号ID': account_id, '统计日期': formatted_date, '投稿量': day_data.get('publish_count', '0'), '阅读量': day_data.get('view_count', '0'), '评论量': day_data.get('comment_count', '0'), '评论率': day_data.get('comment_rate', '0'), '点赞量': day_data.get('likes_count', '0'), '点赞率': day_data.get('likes_rate', '0'), '收藏量': day_data.get('collect_count', '0'), '收藏率': day_data.get('collect_rate', '0'), '分享量': day_data.get('share_count', '0'), '分享率': day_data.get('share_rate', '0'), '滑图占比': day_data.get('pic_slide_rate', '0'), '百度搜索量': day_data.get('search_click_pv', '0'), '曝光量': day_data.get('disp_pv', '0'), '点击率': day_data.get('click_rate', '0'), } csv_rows.append(row) else: print(f" 未找到每日数据") # 写入CSV文件 if csv_rows: # 定义列顺序 fieldnames = [ '账号ID', '统计日期', '投稿量', '阅读量', '评论量', '评论率', '点赞量', '点赞率', '收藏量', '收藏率', '分享量', '分享率', '滑图占比', '百度搜索量', '曝光量', '点击率', ] # 旧方法已废弃 return False else: print("\n[!] 没有每日数据可导出") return False except Exception as e: print(f"\n[X] 导出每日数据失败: {e}") import traceback traceback.print_exc() return False def export_core_metrics(self, analytics_data: Dict, income_data: Dict) -> bool: """导出核心指标数据(对应ai_statistics_days表) 包含:单日发文量、累计发文量、当周收益、当月收益、收益环比 Args: analytics_data: 发文统计数据 income_data: 收入数据 Returns: bool: 是否成功导出 """ try: print("\n" + "="*70) print("开始导出核心指标数据(ai_statistics_days表)") print("="*70) # 准备CSV数据 csv_rows = [] # 收集所有账号ID account_ids = set() if isinstance(analytics_data, list): for item in analytics_data: account_ids.add(item.get('account_id', '')) elif isinstance(analytics_data, dict): account_ids.update(analytics_data.keys()) if isinstance(income_data, dict): account_ids.update(income_data.keys()) account_ids.discard('') print(f"\n找到 {len(account_ids)} 个账号") # 为每个账号提取核心指标 for account_id in sorted(account_ids): print(f"\n处理账号: {account_id}") # 获取每日数据列表 daily_list = [] if isinstance(analytics_data, list): for item in analytics_data: if item.get('account_id') == account_id: data = item.get('data', {}) api_data = data.get('api_data', {}) if 'apis' in api_data: for api_item in api_data['apis']: api_resp = api_item.get('data', {}) if api_resp.get('errno') == 0: daily_list = api_resp.get('data', {}).get('list', []) break break elif isinstance(analytics_data, dict) and account_id in analytics_data: data = analytics_data[account_id].get('data', {}) api_data = data.get('api_data', {}) if 'apis' in api_data: for api_item in api_data['apis']: api_resp = api_item.get('data', {}) if api_resp.get('errno') == 0: daily_list = api_resp.get('data', {}).get('list', []) break # 获取收入数据 income_metrics = {} if account_id in income_data: income_metrics = self.extract_income_metrics(income_data[account_id]) # 计算累计发文量(7天内的总发文量) cumulative_count = 0 if daily_list: for day_data in daily_list: publish_count = day_data.get('publish_count', '') if publish_count != '' and publish_count != '0': try: cumulative_count += int(publish_count) except (ValueError, TypeError): pass # 处理每日数据 if daily_list: print(f" 找到 {len(daily_list)} 天的数据") for day_data in daily_list: event_day = day_data.get('event_day', '') if not event_day: continue # 格式化日期:20251215 -> 2025-12-15 try: date_str = str(event_day) formatted_date = f"{date_str[0:4]}-{date_str[4:6]}-{date_str[6:8]}" except: formatted_date = event_day # 单日发文量 daily_published = day_data.get('publish_count', '0') row = { '账号ID': account_id, '统计日期': formatted_date, '单日发文量': daily_published, '累计发文量': str(cumulative_count), '当周收益': income_metrics.get('当周收益', '0'), '当月收益': income_metrics.get('当月收益', '0'), '收益周环比': income_metrics.get('收益周环比', '0.000000'), '收益月环比': income_metrics.get('收益月环比', '0.000000'), } csv_rows.append(row) else: # 即使没有每日数据,也导出收入信息 row = { '账号ID': account_id, '统计日期': datetime.now().strftime('%Y-%m-%d'), '单日发文量': '0', '累计发文量': '0', '当周收益': income_metrics.get('当周收益', '0'), '当月收益': income_metrics.get('当月收益', '0'), '收益周环比': income_metrics.get('收益周环比', '0.000000'), '收益月环比': income_metrics.get('收益月环比', '0.000000'), } csv_rows.append(row) print(f" 未找到每日数据,使用默认值") # 写入CSV文件 if csv_rows: # 定义列顺序(对应ai_statistics_days表) fieldnames = [ '账号ID', '统计日期', '单日发文量', '累计发文量', '当周收益', '当月收益', '收益周环比', '收益月环比', ] # 旧方法已废弃 return False else: print("\n[!] 没有核心指标数据可导出") return False except Exception as e: print(f"\n[X] 导出核心指标数据失败: {e}") import traceback traceback.print_exc() return False def insert_ai_statistics(self, integrated_data: List[Dict]) -> bool: """插入 ai_statistics 表数据(账号汇总统计)到数据库""" try: print("\n" + "="*70) print("开始插入 ai_statistics 表数据") print("="*70) if not self.db_manager: print("[X] 数据库管理器未初始化") return False success_count = 0 for account_data in integrated_data: account_id = account_data.get('account_id', '') if not account_id: continue print(f"\n处理账号: {account_id}") # 获取作者ID author_id = self.get_author_id(account_id) # 提取统计指标 metrics = self.extract_total_metrics(account_data) # 获取最新日期 latest_date = datetime.now().strftime('%Y-%m-%d') analytics = account_data.get('analytics', {}) if analytics: apis = analytics.get('apis', []) if apis and len(apis) > 0: api_data = apis[0].get('data', {}) if api_data.get('errno') == 0: data_info = api_data.get('data', {}) latest_event_day = data_info.get('latest_event_day', '') if latest_event_day: date_str = str(latest_event_day) latest_date = f"{date_str[0:4]}-{date_str[4:6]}-{date_str[6:8]}" # 滑图占比需要限制在decimal(5,4)范围内(0-9.9999) slide_ratio_value = float(metrics['slide_ratio']) # 如果值大于10,说明是百分比格式,需要除以100 if slide_ratio_value > 10: slide_ratio_value = slide_ratio_value / 100 # 确保不超过9.9999 slide_ratio_value = min(slide_ratio_value, 9.9999) # 准备数据库记录 record = { 'author_id': author_id, # 使用从数据库查询的author_id 'author_name': account_id, 'channel': 1, 'date': latest_date, # ai_statistics表使用date字段,不是stat_date 'submission_count': metrics['submission_count'], 'read_count': metrics['read_count'], 'comment_count': metrics['comment_count'], 'comment_rate': metrics['comment_rate'], 'like_count': metrics['like_count'], 'like_rate': metrics['like_rate'], 'favorite_count': metrics['favorite_count'], 'favorite_rate': metrics['favorite_rate'], 'share_count': metrics['share_count'], 'share_rate': metrics['share_rate'], 'slide_ratio': slide_ratio_value, # ai_statistics表使用slide_ratio字段,并限制范围 'baidu_search_volume': metrics['baidu_search_volume'], } # 插入或更新 try: self.db_manager.insert_or_update( table='ai_statistics', data=record, unique_keys=['author_name', 'channel'] # 使用author_name作为唯一键 ) success_count += 1 print(f" [OK] 插入成功") except Exception as e: print(f" [X] 插入失败: {e}") print(f"\n{'='*70}") print(f"[OK] ai_statistics 表数据插入完成") print(f" 成功 {success_count}/{len(integrated_data)} 条记录") print(f"{'='*70}") return success_count > 0 except Exception as e: print(f"\n[X] 插入 ai_statistics 失败: {e}") import traceback traceback.print_exc() return False def insert_ai_statistics_day(self, integrated_data: List[Dict], use_db_article_count: bool = True) -> bool: """插入 ai_statistics_day 表数据(每日明细统计)到数据库 Args: integrated_data: 整合数据列表 use_db_article_count: 是否使用数据库查询文章数据(默认True) """ try: print("\n" + "="*70) print("开始插入 ai_statistics_day 表数据") print("="*70) if not self.db_manager: print("[X] 数据库管理器未初始化") return False success_count = 0 for account_data in integrated_data: account_id = account_data.get('account_id', '') if not account_id: continue print(f"\n处理账号: {account_id}") # 获取作者ID author_id = self.get_author_id(account_id) analytics = account_data.get('analytics', {}) apis = analytics.get('apis', []) if apis and len(apis) > 0: api_data = apis[0].get('data', {}) if api_data.get('errno') == 0: daily_list = api_data.get('data', {}).get('list', []) if daily_list: print(f" 找到 {len(daily_list)} 天的数据") # 只取最新一天的数据 daily_list_sorted = sorted(daily_list, key=lambda x: x.get('event_day', '')) latest_day_data = daily_list_sorted[-1] if daily_list_sorted else None if latest_day_data: event_day = latest_day_data.get('event_day', '') if event_day: # 格式化日期 date_str = str(event_day) formatted_date = f"{date_str[0:4]}-{date_str[4:6]}-{date_str[6:8]}" # 优先从数据库查询投稿量 total_submission_count = int(latest_day_data.get('publish_count', 0) or 0) if use_db_article_count and author_id > 0: article_data = self.query_article_count_from_db(author_id, formatted_date) total_submission_count = article_data['total_submission_count'] print(f" [使用数据库] 投稿量: {total_submission_count}") else: print(f" [使用API] 投稿量: {total_submission_count}") # 滑图占比需要限制在decimal(5,4)范围内(0-9.9999) slide_ratio_value = float(latest_day_data.get('pic_slide_rate', 0) or 0) # 如果值大于10,说明是百分比格式,需要除以100 if slide_ratio_value > 10: slide_ratio_value = slide_ratio_value / 100 # 确保不超过9.9999 slide_ratio_value = min(slide_ratio_value, 9.9999) record = { 'author_id': author_id, 'author_name': account_id, 'channel': 1, 'stat_date': formatted_date, 'total_submission_count': total_submission_count, 'total_read_count': int(latest_day_data.get('view_count', 0) or 0), 'total_comment_count': int(latest_day_data.get('comment_count', 0) or 0), 'total_like_count': int(latest_day_data.get('likes_count', 0) or 0), 'total_favorite_count': int(latest_day_data.get('collect_count', 0) or 0), 'total_share_count': int(latest_day_data.get('share_count', 0) or 0), 'avg_comment_rate': float(latest_day_data.get('comment_rate', 0) or 0), 'avg_like_rate': float(latest_day_data.get('likes_rate', 0) or 0), 'avg_favorite_rate': float(latest_day_data.get('collect_rate', 0) or 0), 'avg_share_rate': float(latest_day_data.get('share_rate', 0) or 0), 'avg_slide_ratio': slide_ratio_value, 'total_baidu_search_volume': int(latest_day_data.get('disp_pv', 0) or 0), } # 插入或更新 try: self.db_manager.insert_or_update( table='ai_statistics_day', data=record, unique_keys=['author_name', 'channel'] # 使用author_name作为唯一键 ) success_count += 1 print(f" [OK] 插入最新一天: {formatted_date}") except Exception as e: print(f" [X] 插入失败: {e}") else: print(f" 未找到每日数据") print(f"\n{'='*70}") print(f"[OK] ai_statistics_day 表数据插入完成") print(f" 成功 {success_count} 条记录") print(f"{'='*70}") return success_count > 0 except Exception as e: print(f"\n[X] 插入 ai_statistics_day 失败: {e}") import traceback traceback.print_exc() return False def insert_ai_statistics_days(self, integrated_data: List[Dict]) -> bool: """插入 ai_statistics_days 表数据(核心指标统计)到数据库""" try: print("\n" + "="*70) print("开始插入 ai_statistics_days 表数据") print("="*70) if not self.db_manager: print("[X] 数据库管理器未初始化") return False success_count = 0 for account_data in integrated_data: account_id = account_data.get('account_id', '') if not account_id: continue print(f"\n处理账号: {account_id}") # 获取作者ID author_id = self.get_author_id(account_id) # 提取收入指标 income_metrics = self.extract_income_metrics(account_data, account_id) analytics = account_data.get('analytics', {}) apis = analytics.get('apis', []) if apis and len(apis) > 0: api_data = apis[0].get('data', {}) if api_data.get('errno') == 0: daily_list = api_data.get('data', {}).get('list', []) if daily_list: print(f" 找到 {len(daily_list)} 天的数据") # 按日期排序 daily_list_sorted = sorted(daily_list, key=lambda x: x.get('event_day', '')) # 计算累计发文量 cumulative_count = 0 for day_data in daily_list_sorted: daily_published = int(day_data.get('publish_count', 0) or 0) cumulative_count += daily_published # 只插入最新一天的数据 latest_day_data = daily_list_sorted[-1] event_day = latest_day_data.get('event_day', '') if event_day: # 格式化日期 date_str = str(event_day) formatted_date = f"{date_str[0:4]}-{date_str[4:6]}-{date_str[6:8]}" # 单日发文量 daily_published = int(latest_day_data.get('publish_count', 0) or 0) record = { 'author_id': author_id, 'author_name': account_id, 'channel': 1, 'stat_date': formatted_date, 'daily_published_count': daily_published, 'cumulative_published_count': cumulative_count, 'day_revenue': Decimal(str(income_metrics['day_revenue'])), # 新增:每日收益 'monthly_revenue': Decimal(str(income_metrics['monthly_revenue'])), 'weekly_revenue': Decimal(str(income_metrics['weekly_revenue'])), 'revenue_mom_growth_rate': Decimal(str(income_metrics['revenue_mom_growth_rate'])), 'revenue_wow_growth_rate': Decimal(str(income_metrics['revenue_wow_growth_rate'])), } # 插入或更新 try: self.db_manager.insert_or_update( table='ai_statistics_days', data=record, unique_keys=['author_name', 'channel'] ) success_count += 1 print(f" [OK] 插入最新一天: {formatted_date}, 累计发文: {cumulative_count}") except Exception as e: print(f" [X] 插入失败: {e}") else: print(f" 未找到每日数据") print(f"\n{'='*70}") print(f"[OK] ai_statistics_days 表数据插入完成") print(f" 成功 {success_count} 条记录") print(f"{'='*70}") return success_count > 0 except Exception as e: print(f"\n[X] 插入 ai_statistics_days 失败: {e}") import traceback traceback.print_exc() return False def main(): print("\n" + "="*70) print("百家号数据导出工具 - 从 bjh_integrated_data.json 导出") print("="*70) # 选择导出模式 print("\n请选择导出模式:") print(" 1. 导出CSV文件") print(" 2. 直接插入数据库") mode = input("\n输入选项 (1/2, 默认1): ").strip() or '1' if mode == '2': # 数据库模式 exporter = DataExporter(use_database=True) print("\n将从 bjh_integrated_data.json 导入数据到数据库的三个表:") print(" 1. ai_statistics - 账号汇总统计表") print(" 2. ai_statistics_day - 每日明细统计表") print(" 3. ai_statistics_days - 核心指标统计表(含发文量、收益、环比)") print("="*70) else: # CSV模式 exporter = DataExporter(use_database=False) print("\n将从 bjh_integrated_data.json 导出以下三个CSV文件:") print(" 1. ai_statistics.csv - 账号汇总统计表") print(" 2. ai_statistics_day.csv - 每日明细统计表") print(" 3. ai_statistics_days.csv - 核心指标统计表(含发文量、收益、环比)") print("="*70) confirm = input("\n是否继续? (y/n): ").strip().lower() if confirm == 'y': exporter.export_all_tables() else: print("\n已取消") return print("\n" + "="*70) print("完成") print("="*70 + "\n") if __name__ == '__main__': main()