#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Fetch Baijiahao statistics for a specific date.

Retrieves per-account publishing counts, content analytics and income data
for a given date (YYYY-MM-DD) and feeds them into the three statistics
tables via ``DataExporter``.
"""
import os
import sys
import json
import argparse
import random
import time
import traceback
from datetime import datetime, timedelta
from decimal import Decimal
from typing import List, Dict, Optional

import requests

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from database_config import DatabaseManager
from export_to_csv import DataExporter

# Tianqi proxy provisioning endpoint.
# SECURITY NOTE(review): this URL embeds an API secret in source control —
# consider moving it to an environment variable or secrets store.
PROXY_API_URL = 'http://api.tianqiip.com/getip?secret=tmcrmh3q&num=1&type=txt&port=1&mr=1&sign=5451e454a54b9f1f06222606c418e12f'


class DateStatisticsFetcher:
    """Fetches and integrates statistics for a single target date."""

    def __init__(self, target_date: str, use_proxy: bool = True):
        """Initialize the fetcher.

        Args:
            target_date: Target date in ``YYYY-MM-DD`` format.
            use_proxy: Whether to route API calls through the Tianqi proxy
                (defaults to ``True``).

        Raises:
            ValueError: If ``target_date`` is not a valid ``YYYY-MM-DD`` date.
        """
        self.target_date = datetime.strptime(target_date, '%Y-%m-%d')
        self.target_date_str = target_date
        self.db_manager = DatabaseManager()
        self.script_dir = os.path.dirname(os.path.abspath(__file__))
        self.use_proxy = use_proxy
        self.current_proxy = None  # last proxy dict handed out by fetch_proxy()

        # Scratch directory for the integrated JSON handed to DataExporter.
        self.temp_dir = os.path.join(self.script_dir, 'temp_data')
        os.makedirs(self.temp_dir, exist_ok=True)

        # Shared HTTP session; cookies are (re)loaded per account.
        self.session = requests.Session()
        # SECURITY NOTE(review): TLS verification is disabled for every
        # request this session makes — presumably to tolerate the proxy's
        # certificates. Confirm this is intentional.
        self.session.verify = False

        # Silence the InsecureRequestWarning triggered by verify=False.
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        print(f"[初始化] 目标日期: {target_date}")
        print(f"[初始化] 代理模式: {'启用' if use_proxy else '禁用'}")
        print(f"[初始化] 临时数据目录: {self.temp_dir}")

    def _set_cookies(self, cookie_string: str, domain: Optional[str] = None) -> Optional[str]:
        """Load a raw cookie string into the session and return the token.

        Args:
            cookie_string: ``key=value; key=value`` cookie header string.
            domain: Optional cookie domain (e.g. ``'.baidu.com'``); when
                ``None`` the cookies are set without an explicit domain.

        Returns:
            The ``bjhStoken`` (or fallback ``devStoken``) cookie value used as
            the API auth token, or ``None`` if neither is present.
        """
        self.session.cookies.clear()
        for item in cookie_string.split(';'):
            item = item.strip()
            if '=' in item:
                key, value = item.split('=', 1)
                if domain is None:
                    self.session.cookies.set(key.strip(), value.strip())
                else:
                    self.session.cookies.set(key.strip(), value.strip(), domain=domain)
        return self.session.cookies.get('bjhStoken') or self.session.cookies.get('devStoken')

    def get_all_authors(self) -> List[Dict]:
        """Fetch all active accounts that have a usable cookie.

        Returns:
            A list of dicts with ``author_id``, ``author_name`` and
            ``toutiao_cookie``; empty list on failure or no matches.
        """
        try:
            sql = """
                SELECT id as author_id, author_name, toutiao_cookie
                FROM ai_authors
                WHERE channel = 1
                  AND status = 'active'
                  AND toutiao_cookie IS NOT NULL
                  AND toutiao_cookie != ''
                ORDER BY id
            """
            accounts = self.db_manager.execute_query(sql, fetch_one=False, dict_cursor=True)
            if accounts:
                print(f"[数据库] 找到 {len(accounts)} 个活跃账号")
                return accounts
            else:
                print("[!] 未找到任何活跃账号")
                return []
        except Exception as e:
            print(f"[X] 查询账号失败: {e}")
            return []

    def get_daily_article_count(self, author_id: int, date_str: str) -> int:
        """Count published articles for one author on one date.

        Args:
            author_id: Author ID.
            date_str: Date string (``YYYY-MM-DD``).

        Returns:
            Number of published articles; 0 on failure.
        """
        try:
            sql = """
                SELECT COUNT(*) as count
                FROM ai_articles
                WHERE author_id = %s
                  AND DATE(publish_time) = %s
                  AND status = 'published'
                  AND channel = 1
            """
            result = self.db_manager.execute_query(
                sql, (author_id, date_str), fetch_one=True, dict_cursor=True
            )
            return result['count'] if result else 0
        except Exception as e:
            print(f" [!] 查询发文量失败: {e}")
            return 0

    def fetch_daily_income(self, cookie_string: str, date_timestamp: int,
                           max_retries: int = 3) -> Optional[Dict]:
        """Fetch income data for a single day (with retries).

        Args:
            cookie_string: Raw cookie header string for the account.
            date_timestamp: Unix timestamp (seconds) of the target day.
            max_retries: Maximum number of retries on transient failures.

        Returns:
            Parsed JSON payload on success, ``None`` on failure.
        """
        api_url = "https://baijiahao.baidu.com/author/eco/income4/overviewhomelist"

        # Load cookies (no explicit domain here) and extract the auth token.
        token_cookie = self._set_cookies(cookie_string)

        # The income API takes the same timestamp for start and end to
        # select a single day.
        params = {
            'start_date': date_timestamp,
            'end_date': date_timestamp
        }

        headers = {
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': 'https://baijiahao.baidu.com/builder/rc/incomecenter',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
        }
        if token_cookie:
            headers['token'] = token_cookie

        retry_count = 0
        while retry_count <= max_retries:
            try:
                # Back off before each retry: 3s, 6s, 9s.
                if retry_count > 0:
                    wait_time = retry_count * 3
                    print(f" [重试 {retry_count}/{max_retries}] 等待 {wait_time} 秒...")
                    time.sleep(wait_time)

                # A fresh proxy is fetched for every attempt.
                proxies = self.fetch_proxy() if self.use_proxy else None

                response = self.session.get(
                    api_url,
                    headers=headers,
                    params=params,
                    proxies=proxies,
                    timeout=15
                )

                if response.status_code == 200:
                    data = response.json()
                    if data.get('errno') == 0:
                        return data
                    else:
                        error_msg = data.get('errmsg', '')
                        errno = data.get('errno')
                        print(f" [!] API返回错误: errno={errno}, errmsg={error_msg}")
                        # errno 10000015 ("abnormal request") is transient —
                        # worth a retry.
                        if errno == 10000015 and retry_count < max_retries:
                            retry_count += 1
                            continue
                        return None
                else:
                    print(f" [!] HTTP错误: {response.status_code}")
                    return None
            except Exception as e:
                error_type = type(e).__name__
                print(f" [!] 请求异常: {error_type} - {e}")
                # Only network-ish failures are retried.
                is_retry_error = any([
                    'Connection' in error_type,
                    'Timeout' in error_type,
                    'ProxyError' in error_type,
                ])
                if is_retry_error and retry_count < max_retries:
                    retry_count += 1
                    continue
                return None
        return None

    def fetch_analytics_api(self, cookie_string: str, target_date: str,
                            max_retries: int = 3) -> Optional[Dict]:
        """Call the Baijiahao publishing-statistics API (views, comments, ...).

        Args:
            cookie_string: Raw cookie header string for the account.
            target_date: Target date (``YYYY-MM-DD``).
            max_retries: Maximum number of retries on transient failures.

        Returns:
            Parsed JSON payload on success, ``None`` on failure.
        """
        # Load cookies pinned to .baidu.com and extract the auth token.
        token_cookie = self._set_cookies(cookie_string, domain='.baidu.com')

        # Query a single-day window (start == end, format YYYYMMDD).
        date_obj = datetime.strptime(target_date, '%Y-%m-%d')
        start_day = date_obj.strftime('%Y%m%d')
        end_day = start_day

        # API endpoint (appStatisticV3 variant).
        api_url = "https://baijiahao.baidu.com/author/eco/statistics/appStatisticV3"

        params = {
            'type': 'event',
            'start_day': start_day,
            'end_day': end_day,
            'stat': '0',
            'special_filter_days': '1'
        }

        headers = {
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': 'https://baijiahao.baidu.com/builder/rc/analysiscontent',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
        }
        if token_cookie:
            headers['token'] = token_cookie

        retry_count = 0
        while retry_count <= max_retries:
            try:
                # Back off before each retry: 3s, 6s, 9s.
                if retry_count > 0:
                    wait_time = retry_count * 3
                    print(f" [重试 {retry_count}/{max_retries}] 等待 {wait_time} 秒...")
                    time.sleep(wait_time)

                proxies = self.fetch_proxy() if self.use_proxy else None

                response = self.session.get(
                    api_url,
                    headers=headers,
                    params=params,
                    proxies=proxies,
                    timeout=15
                )

                if response.status_code == 200:
                    data = response.json()
                    errno = data.get('errno', -1)
                    if errno == 0:
                        # Log the headline totals before handing back the
                        # raw payload.
                        data_content = data.get('data', {})
                        total_info = data_content.get('total_info', {})
                        print(f" [发文统计] 阅读量: {total_info.get('view_count', 0)}")
                        print(f" [发文统计] 评论量: {total_info.get('comment_count', 0)}")
                        return data
                    else:
                        error_msg = data.get('errmsg', '')
                        print(f" [!] 发文统计API错误: errno={errno}, errmsg={error_msg}")
                        # errno 10000015 ("abnormal request") is transient —
                        # worth a retry.
                        if errno == 10000015 and retry_count < max_retries:
                            retry_count += 1
                            continue
                        return None
                else:
                    print(f" [!] HTTP错误: {response.status_code}")
                    return None
            except Exception as e:
                error_type = type(e).__name__
                print(f" [!] 请求异常: {error_type} - {e}")
                # Only network-ish failures are retried.
                is_retry_error = any([
                    'Connection' in error_type,
                    'Timeout' in error_type,
                    'ProxyError' in error_type,
                ])
                if is_retry_error and retry_count < max_retries:
                    retry_count += 1
                    continue
                return None
        return None

    def get_cumulative_article_count(self, author_id: int, start_date: str,
                                     end_date: str) -> int:
        """Count published articles for one author over a date range.

        Args:
            author_id: Author ID.
            start_date: Range start (``YYYY-MM-DD``), inclusive.
            end_date: Range end (``YYYY-MM-DD``), inclusive.

        Returns:
            Number of published articles in the range; 0 on failure.
        """
        try:
            sql = """
                SELECT COUNT(*) as count
                FROM ai_articles
                WHERE author_id = %s
                  AND DATE(publish_time) >= %s
                  AND DATE(publish_time) <= %s
                  AND status = 'published'
                  AND channel = 1
            """
            result = self.db_manager.execute_query(
                sql, (author_id, start_date, end_date), fetch_one=True, dict_cursor=True
            )
            return result['count'] if result else 0
        except Exception as e:
            print(f" [!] 查询累计发文量失败: {e}")
            return 0

    def fetch_proxy(self) -> Optional[Dict]:
        """Obtain a fresh Tianqi proxy IP.

        Returns:
            A requests-style proxies dict (``{'http': ..., 'https': ...}``),
            or ``None`` when proxying is disabled or provisioning failed.
        """
        if not self.use_proxy:
            return None
        try:
            resp = requests.get(PROXY_API_URL, timeout=10)
            resp.raise_for_status()
            text = resp.text.strip()

            # The API may return an error string instead of IP:PORT lines.
            if text.upper().startswith('ERROR'):
                print(f" [!] 代理API返回错误: {text}")
                return None

            # Parse the first line that looks like a single IP:PORT pair.
            for line in text.split('\n'):
                line = line.strip()
                if ':' in line and line.count(':') == 1:
                    # Some responses append extra fields after a space.
                    ip_port = line.split()[0] if ' ' in line else line
                    host, port = ip_port.split(':', 1)
                    proxy_url = f'http://{host}:{port}'
                    self.current_proxy = {
                        'http': proxy_url,
                        'https': proxy_url,
                    }
                    print(f" [代理] 使用天启IP: {ip_port}")
                    return self.current_proxy

            print(f" [!] 无法解析代理API返回: {text[:100]}")
            return None
        except Exception as e:
            print(f" [!] 获取代理失败: {e}")
            return None

    def build_integrated_data(self, author_id: int, author_name: str,
                              cookie_string: str) -> Dict:
        """Build the integrated data record for one account on the target date.

        Combines DB-derived publish counts with API-derived analytics and
        income, shaped to mimic the BaijiahaoAnalytics structure that
        ``DataExporter`` consumes.

        Args:
            author_id: Author ID.
            author_name: Author display name.
            cookie_string: Raw cookie header string for the account.

        Returns:
            The integrated data dict.
        """
        print(f"\n [构建] 账号 {author_name} 的整合数据...")

        # Cumulative counts run from the first day of the target month.
        month_first = self.target_date.replace(day=1).strftime('%Y-%m-%d')

        # Publish counts come from our own DB, not the remote API.
        daily_count = self.get_daily_article_count(author_id, self.target_date_str)
        cumulative_count = self.get_cumulative_article_count(
            author_id, month_first, self.target_date_str
        )
        print(f" 单日发文量: {daily_count}")
        print(f" 累计发文量: {cumulative_count} (从{month_first}至{self.target_date_str})")

        # Analytics: views, comments, etc. for the day.
        print(f" [API] 获取发文统计数据...")
        analytics_data = self.fetch_analytics_api(cookie_string, self.target_date_str)

        total_info = {}
        daily_list = []
        if analytics_data:
            data_content = analytics_data.get('data', {})
            total_info = data_content.get('total_info', {})
            daily_list = data_content.get('list', [])

        # Income for the day. NOTE(review): .timestamp() is interpreted in
        # the process's local timezone — confirm the API expects that.
        day_revenue = 0.0
        date_timestamp = int(
            self.target_date.replace(hour=0, minute=0, second=0, microsecond=0).timestamp()
        )
        print(f" [API] 获取收入数据...")
        income_data = self.fetch_daily_income(cookie_string, date_timestamp)
        if income_data and income_data.get('data', {}).get('list'):
            income_list = income_data['data']['list']
            if income_list and len(income_list) > 0:
                total_income = income_list[0].get('total_income', 0)
                day_revenue = float(total_income)
                print(f" 当日收益: ¥{day_revenue:.2f}")
            else:
                print(f" 当日收益: ¥0.00 (无收入数据)")
        else:
            print(f" 当日收益: ¥0.00 (API调用失败)")

        # Shape the record to mimic the BaijiahaoAnalytics data structure.
        integrated_data = {
            'account_id': author_name,
            'author_id': author_id,
            'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'target_date': self.target_date_str,
            'status': 'success',
            'analytics': {
                # DataExporter expects the payload wrapped in an "apis" array.
                'apis': [
                    {
                        'data': {
                            'errno': 0,
                            'data': {
                                # Fall back to a synthetic single-day entry
                                # when the analytics API returned nothing.
                                'list': daily_list if daily_list else [
                                    {
                                        'event_day': self.target_date_str.replace('-', ''),  # e.g. 20251225
                                        'date': self.target_date_str,
                                        'publish_count': daily_count,
                                        'daily_published_count': daily_count,
                                        'cumulative_published_count': cumulative_count,
                                    }
                                ],
                                'latest_event_day': self.target_date_str.replace('-', ''),  # e.g. 20251225
                                'total_info': total_info if total_info else {
                                    'publish_count': daily_count,
                                    'view_count': 0,
                                    'comment_count': 0,
                                    'comment_rate': 0,
                                    'likes_count': 0,
                                    'likes_rate': 0,
                                    'collect_count': 0,
                                    'collect_rate': 0,
                                    'share_count': 0,
                                    'share_rate': 0,
                                    'pic_slide_rate': 0,
                                    'disp_pv': 0,
                                }
                            }
                        }
                    }
                ]
            },
            'income': {
                'errno': 0,  # mark the income API call as successful
                'data': {
                    'income': {
                        'yesterday': {
                            # DataExporter reads the 'income' key (not 'value').
                            'income': day_revenue
                        },
                        'currentMonth': {
                            # Month-to-date income is unavailable for
                            # historical dates; always 0 here.
                            'income': 0
                        }
                    }
                }
            }
        }
        return integrated_data

    def process_single_date(self) -> bool:
        """Process every account for the target date and export to the DB.

        Returns:
            ``True`` when data was built and exported successfully.
        """
        print(f"\n{'='*70}")
        print(f"开始处理 {self.target_date_str} 的数据")
        print(f"{'='*70}")

        accounts = self.get_all_authors()
        if not accounts:
            print("[X] 没有可用的账号,退出")
            return False

        # Build the integrated record for each account.
        integrated_data_list = []
        for idx, account in enumerate(accounts, 1):
            author_id = account.get('author_id')
            author_name = account.get('author_name', '')
            cookie_string = account.get('toutiao_cookie', '')

            if not author_id:
                print(f"\n[{idx}/{len(accounts)}] 跳过: {author_name} (缺少author_id)")
                continue
            if not cookie_string:
                print(f"\n[{idx}/{len(accounts)}] 跳过: {author_name} (缺少Cookie)")
                continue

            print(f"\n[{idx}/{len(accounts)}] 处理账号: {author_name} (ID: {author_id})")
            try:
                integrated_data = self.build_integrated_data(
                    author_id, author_name, cookie_string
                )
                integrated_data_list.append(integrated_data)
                print(f" [OK] 数据构建成功")

                # Randomized 3-5s pause between accounts to avoid rate limits.
                if idx < len(accounts):
                    delay = random.uniform(3, 5)
                    print(f" [延迟] 等待 {delay:.1f} 秒...")
                    time.sleep(delay)
            except Exception as e:
                print(f" [X] 数据构建失败: {e}")
                traceback.print_exc()
                continue

        if not integrated_data_list:
            print("[!] 没有成功构建任何数据")
            return False

        # Persist the integrated data to a temp file for DataExporter.
        integrated_file = os.path.join(
            self.temp_dir, f'integrated_{self.target_date_str}.json'
        )
        try:
            with open(integrated_file, 'w', encoding='utf-8') as f:
                json.dump(integrated_data_list, f, ensure_ascii=False, indent=2)
            print(f"\n[保存] 整合数据: {integrated_file}")
        except Exception as e:
            print(f"[X] 保存整合数据失败: {e}")
            return False

        # Export to the three statistics tables via DataExporter.
        print(f"\n[导出] 开始导出到数据库...")
        try:
            exporter = DataExporter(use_database=False)

            # Temporarily point the exporter at our integrated file; restore
            # the original path even if export_all_tables() raises.
            original_file = exporter.integrated_file
            exporter.integrated_file = integrated_file
            try:
                result = exporter.export_all_tables()
            finally:
                exporter.integrated_file = original_file

            if result:
                print(f"\n[OK] {self.target_date_str} 数据处理完成")
                return True
            else:
                print(f"\n[!] {self.target_date_str} 数据导出失败")
                return False
        except Exception as e:
            print(f"[X] 导出数据失败: {e}")
            traceback.print_exc()
            return False


def main():
    """CLI entry point: parse arguments, validate the date, run the fetcher."""
    parser = argparse.ArgumentParser(
        description='获取指定日期的百家号统计数据',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例用法:
  python fetch_date_statistics.py 2025-12-01
  python fetch_date_statistics.py 2025-12-15

注意事项:
  1. 由于百家号API限制,无法获取历史日期的收入数据
  2. 脚本会从ai_articles表统计发文量数据
  3. 收入字段将被设置为0(需要在数据产生当天运行才能获取真实收入)
"""
    )
    parser.add_argument(
        'date',
        type=str,
        help='目标日期 (格式: YYYY-MM-DD)'
    )
    parser.add_argument(
        '--no-proxy',
        action='store_true',
        help='禁用代理(默认启用天启代理)'
    )
    args = parser.parse_args()

    # Validate the date format up front for a friendly error message.
    try:
        datetime.strptime(args.date, '%Y-%m-%d')
    except ValueError:
        print(f"[X] 日期格式错误: {args.date}")
        print(" 正确格式: YYYY-MM-DD (例如: 2025-12-01)")
        return 1

    print("\n" + "="*70)
    print("百家号指定日期统计数据获取工具")
    print("="*70)
    print(f"目标日期: {args.date}")
    print("="*70)

    try:
        fetcher = DateStatisticsFetcher(args.date, use_proxy=not args.no_proxy)
        success = fetcher.process_single_date()
        return 0 if success else 1
    except Exception as e:
        print(f"\n[X] 程序执行出错: {e}")
        traceback.print_exc()
        return 1


if __name__ == '__main__':
    sys.exit(main())