#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Baijiahao date-targeted data scraper.

Fetches publication statistics and income data for a caller-specified
date range, building on the base ``BaijiahaoAnalytics`` scraper.
"""
import argparse
import json
import os
import random
import shutil
import sys
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional

# Base analyzer (project-local module).
from bjh_analytics import BaijiahaoAnalytics

# Force UTF-8 on stdout/stderr so CJK text prints correctly on Windows consoles.
if sys.platform == 'win32':
    import io
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')


class BaijiahaoDateAnalytics(BaijiahaoAnalytics):
    """Scrapes Baijiahao analytics and income data for one target date."""

    def __init__(self, target_date: str, use_proxy: bool = False,
                 load_from_db: bool = False, db_config: Optional[Dict] = None):
        """Initialize the scraper.

        Args:
            target_date: Target date in ``YYYY-MM-DD`` format.
            use_proxy: Whether to route requests through a proxy.
            load_from_db: Whether to load cookies from the database.
            db_config: Database configuration.

        Raises:
            ValueError: If ``target_date`` is not a valid ``YYYY-MM-DD`` string.
        """
        super().__init__(use_proxy=use_proxy, load_from_db=load_from_db,
                         db_config=db_config)

        # Parse the target date up front so bad input fails fast.
        try:
            self.target_date = datetime.strptime(target_date, '%Y-%m-%d')
            self.target_date_str = target_date
        except ValueError:
            raise ValueError(f"日期格式错误: {target_date},正确格式: YYYY-MM-DD")

        # Fixed output filename (no date suffix); dated copies go to backup/.
        self.output_file = os.path.join(
            self.script_dir, "bjh_integrated_data.json"
        )

        # Backup folder for dated snapshot copies.
        self.backup_dir = os.path.join(self.script_dir, "backup")
        os.makedirs(self.backup_dir, exist_ok=True)

        print(f"[配置] 目标日期: {target_date}")
        print(f"[配置] 输出文件: {self.output_file}")
        print(f"[配置] 备份目录: {self.backup_dir}")

    def _api_headers(self, referer_path: str) -> Dict[str, str]:
        """Build the common request headers for Baijiahao API calls.

        Args:
            referer_path: Path appended to ``self.base_url`` for the Referer.

        Returns:
            Header dict; includes a ``token`` header when the session holds
            a ``bjhStoken``/``devStoken`` cookie.
        """
        headers = {
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': f'{self.base_url}{referer_path}',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
        }
        # The auth token travels both as a cookie and as a "token" header.
        token_cookie = (self.session.cookies.get('bjhStoken')
                        or self.session.cookies.get('devStoken'))
        if token_cookie:
            headers['token'] = token_cookie
        return headers

    def _print_proxy_debug(self, proxies: Optional[Dict]) -> None:
        """Print which proxy IP is in use, or warn when proxying silently failed."""
        if not self.use_proxy:
            return
        if proxies:
            proxy_url = proxies.get('http', '')
            if '@' in proxy_url:
                proxy_ip = proxy_url.split('@')[1]
            else:
                proxy_ip = proxy_url.replace('http://', '').replace('https://', '')
            print(f" [代理] 使用IP: {proxy_ip}")
        else:
            print(f" [!] 警告:代理未生效!use_proxy={self.use_proxy}")

    def fetch_analytics_api_for_date(self, days: int = 7,
                                     max_retries: int = 3) -> Optional[Dict]:
        """Fetch publication statistics for the configured date range.

        Args:
            days: Number of days to query, counting back from ``target_date``.
            max_retries: Maximum retry attempts on transient failures.

        Returns:
            Dict with the collected API payloads, or None on failure.
        """
        # Date window: target_date back `days` days (inclusive).
        end_date = self.target_date
        start_date = end_date - timedelta(days=days - 1)
        start_day = start_date.strftime('%Y%m%d')
        end_day = end_date.strftime('%Y%m%d')

        api_url = f"{self.base_url}/author/eco/statistics/appStatisticV3"
        # Explicit date range instead of special_filter_days.
        params = {
            'type': 'event',
            'start_day': start_day,
            'end_day': end_day,
            'stat': '0',
        }
        headers = self._api_headers('/builder/rc/analysiscontent')

        self.logger.info(f"获取发文统计: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}")
        print(f"\n[请求] 获取发文统计数据")
        print(f" 日期范围: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}")

        successful_data = []
        retry_count = 0
        proxy_change_count = 0  # how many times we swapped proxies
        max_proxy_changes = 3   # at most 3 swaps (i.e. up to 4 distinct proxies)

        while retry_count <= max_retries:
            try:
                if retry_count > 0:
                    wait_time = retry_count * 2
                    print(f" [重试 {retry_count}/{max_retries}] 等待 {wait_time} 秒...")
                    time.sleep(wait_time)

                proxies = self.fetch_proxy() if self.use_proxy else None
                self._print_proxy_debug(proxies)

                # NOTE(review): verify=False disables TLS verification — presumably
                # intentional for this target; confirm before hardening.
                response = self.session.get(
                    api_url, headers=headers, params=params,
                    proxies=proxies, timeout=15, verify=False,
                )
                print(f" 状态码: {response.status_code}")

                if response.status_code != 200:
                    print(f" [X] HTTP错误: {response.status_code}")
                    break

                data = response.json()
                errno = data.get('errno', -1)
                if errno == 0:
                    print(f" [✓] API调用成功")
                    # Successful request: reset the proxy failure counter.
                    self.reset_proxy_fail_count()

                    # Guard against unexpected payload shapes.
                    data_field = data.get('data', {})
                    if isinstance(data_field, list):
                        print(f" [X] API返回数据格式异常: data字段为列表而非字典")
                        print(f" 原始响应前500字符: {str(data)[:500]}")
                        break
                    if not isinstance(data_field, dict):
                        print(f" [X] API返回数据格式异常: data字段类型为 {type(data_field).__name__}")
                        break

                    total_info = data_field.get('total_info', {})
                    print(f"\n 发文统计数据:")
                    print(f" 发文量: {total_info.get('publish_count', '0')}")
                    print(f" 曝光量: {total_info.get('disp_pv', '0')}")
                    print(f" 阅读量: {total_info.get('view_count', '0')}")

                    successful_data.append({
                        'endpoint': '/author/eco/statistics/appStatisticV3',
                        'name': '发文统计',
                        'date_range': f"{start_day} - {end_day}",
                        'data': data,
                        'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    })
                    break

                errmsg = data.get('errmsg', '')
                print(f" [X] API返回错误: errno={errno}, errmsg={errmsg}")
                # errno=10000015 ("abnormal request") usually means the proxy
                # did not take effect — force a fresh proxy and retry.
                if errno == 10000015 and self.use_proxy:
                    print(f" [!] 检测到代理未生效,立即更换新代理")
                    if proxy_change_count >= max_proxy_changes:
                        print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
                        break
                    self.current_proxy = None
                    self.proxy_fail_count = 0
                    new_proxy = self.fetch_proxy(force_new=True)
                    if new_proxy and retry_count < max_retries:
                        proxy_change_count += 1
                        print(f" [!] 已更换新代理({proxy_change_count}/{max_proxy_changes}),将重试...")
                        retry_count += 1
                        continue
                    print(f" [X] 无法获取新代理或已达重试上限")
                    break
                # Any other API error: do not retry.
                break
            except Exception as e:
                error_type = type(e).__name__
                # Only network-ish errors are worth retrying.
                is_retry_error = any([
                    'Connection' in error_type,
                    'Timeout' in error_type,
                    'ProxyError' in error_type,
                ])
                if is_retry_error and retry_count < max_retries:
                    print(f" [!] 连接错误: {error_type}")
                    self.mark_proxy_failed()
                    # After 3 failures on one proxy, force a new one for the
                    # next retry (subject to the swap cap).
                    if self.proxy_fail_count >= 3 and self.use_proxy:
                        if proxy_change_count >= max_proxy_changes:
                            print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
                            break
                        print(f" [!] 代理已失败{self.proxy_fail_count}次,强制更换新代理")
                        self.current_proxy = None
                        self.proxy_fail_count = 0
                        new_proxy = self.fetch_proxy(force_new=True)
                        if new_proxy:
                            proxy_change_count += 1
                            print(f" [✓] 已更换新代理({proxy_change_count}/{max_proxy_changes}),继续重试")
                        else:
                            print(f" [X] 无法获取新代理")
                            break
                    retry_count += 1
                    continue
                print(f" [X] 请求异常: {e}")
                break

        if successful_data:
            return {
                'apis': successful_data,
                'count': len(successful_data),
            }
        return None

    def fetch_income_for_date(self, max_retries: int = 3) -> Optional[Dict]:
        """Fetch income data for the target date.

        Uses the overviewhomelist API, which returns per-day income rows.

        Args:
            max_retries: Maximum retry attempts on transient failures.

        Returns:
            Income summary dict (legacy-API-compatible shape), or None.
        """
        # Query a 30-day window ending at target_date so the rolling 7-day
        # and 30-day sums have enough rows.
        end_date = self.target_date
        start_date = end_date - timedelta(days=29)
        # Unix timestamps in seconds.
        start_timestamp = int(start_date.timestamp())
        end_timestamp = int(end_date.timestamp())

        api_url = f"{self.base_url}/author/eco/income4/overviewhomelist"
        headers = self._api_headers('/builder/rc/incomecenter')
        params = {
            'start_date': start_timestamp,
            'end_date': end_timestamp,
        }

        print(f"\n[请求] 获取收入数据")
        print(f" 日期范围: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}")

        retry_count = 0
        proxy_change_count = 0  # how many times we swapped proxies
        max_proxy_changes = 3   # at most 3 swaps (i.e. up to 4 distinct proxies)

        while retry_count <= max_retries:
            try:
                if retry_count > 0:
                    wait_time = retry_count * 2
                    print(f" [重试 {retry_count}/{max_retries}] 等待 {wait_time} 秒...")
                    time.sleep(wait_time)

                proxies = self.fetch_proxy() if self.use_proxy else None
                self._print_proxy_debug(proxies)

                response = self.session.get(
                    api_url, headers=headers, params=params,
                    proxies=proxies, timeout=15, verify=False,
                )
                print(f" 状态码: {response.status_code}")

                if response.status_code != 200:
                    print(f" [X] HTTP错误: {response.status_code}")
                    return None

                data = response.json()
                errno = data.get('errno', -1)
                if errno == 0:
                    print(f" [✓] API调用成功")
                    # Successful request: reset the proxy failure counter.
                    self.reset_proxy_fail_count()

                    income_list = data.get('data', {}).get('list', [])
                    if not income_list:
                        print(f" [警告] 收入数据列表为空")
                        return None

                    # Locate the row whose day_time matches the target date.
                    target_timestamp = int(self.target_date.timestamp())
                    target_income_data = None
                    for item in income_list:
                        if item.get('day_time') == target_timestamp:
                            target_income_data = item
                            break
                    if target_income_data is None:
                        print(f" [警告] 未找到 {self.target_date_str} 的收入数据")
                        return None

                    day_revenue = target_income_data.get('total_income', 0)
                    print(f"\n 收入数据详情:")
                    print(f" {self.target_date_str} 当日收入: ¥{day_revenue:.2f}")

                    # Rolling 7-day total ending at target_date (float start
                    # keeps the result a float, as before).
                    recent7_start_ts = int(
                        (self.target_date - timedelta(days=6)).timestamp()
                    )
                    recent7_revenue = sum(
                        (item.get('total_income', 0) for item in income_list
                         if recent7_start_ts <= item.get('day_time', 0) <= target_timestamp),
                        0.0,
                    )
                    print(f" 近7天: ¥{recent7_revenue:.2f}")

                    # 30-day total: the whole fetched window.
                    recent30_revenue = sum(
                        item.get('total_income', 0) for item in income_list
                    )
                    print(f" 近30天: ¥{recent30_revenue:.2f}")

                    # Month-to-date: from the 1st of target_date's month.
                    month_start_ts = int(
                        self.target_date.replace(day=1).timestamp()
                    )
                    current_month_revenue = sum(
                        (item.get('total_income', 0) for item in income_list
                         if month_start_ts <= item.get('day_time', 0) <= target_timestamp),
                        0.0,
                    )
                    print(f" 当月收入: ¥{current_month_revenue:.2f}")

                    # Same shape as the legacy income API so downstream
                    # consumers keep working.
                    return {
                        'errno': 0,
                        'errmsg': 'success',
                        'data': {
                            'income': {
                                'yesterday': {
                                    'income': day_revenue,
                                    'value': day_revenue,
                                },
                                'recent7Days': {
                                    'income': recent7_revenue,
                                    'value': recent7_revenue,
                                },
                                'recent30Days': {
                                    'income': recent30_revenue,
                                    'value': recent30_revenue,
                                },
                                'currentMonth': {
                                    'income': current_month_revenue,
                                    'value': current_month_revenue,
                                },
                            }
                        },
                        'raw_list': income_list,  # keep the raw rows too
                    }

                errmsg = data.get('errmsg', '')
                print(f" [X] API返回错误: errno={errno}, errmsg={errmsg}")
                # errno=10000015 ("abnormal request") usually means the proxy
                # did not take effect — force a fresh proxy and retry.
                if errno == 10000015 and self.use_proxy:
                    print(f" [!] 检测到代理未生效,立即更换新代理")
                    if proxy_change_count >= max_proxy_changes:
                        print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
                        return None
                    self.current_proxy = None
                    self.proxy_fail_count = 0
                    new_proxy = self.fetch_proxy(force_new=True)
                    if new_proxy and retry_count < max_retries:
                        proxy_change_count += 1
                        print(f" [!] 已更换新代理({proxy_change_count}/{max_proxy_changes}),将重试...")
                        retry_count += 1
                        continue
                    print(f" [X] 无法获取新代理或已达重试上限")
                    return None
                # Any other API error: do not retry.
                return None
            except Exception as e:
                error_type = type(e).__name__
                # Only network-ish errors are worth retrying.
                is_retry_error = any([
                    'Connection' in error_type,
                    'Timeout' in error_type,
                    'ProxyError' in error_type,
                ])
                if is_retry_error and retry_count < max_retries:
                    print(f" [!] 连接错误: {error_type}")
                    self.mark_proxy_failed()
                    # After 3 failures on one proxy, force a new one for the
                    # next retry (subject to the swap cap).
                    if self.proxy_fail_count >= 3 and self.use_proxy:
                        if proxy_change_count >= max_proxy_changes:
                            print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
                            return None
                        print(f" [!] 代理已失败{self.proxy_fail_count}次,强制更换新代理")
                        self.current_proxy = None
                        self.proxy_fail_count = 0
                        new_proxy = self.fetch_proxy(force_new=True)
                        if new_proxy:
                            proxy_change_count += 1
                            print(f" [✓] 已更换新代理({proxy_change_count}/{max_proxy_changes}),继续重试")
                        else:
                            print(f" [X] 无法获取新代理")
                            return None
                    retry_count += 1
                    continue
                print(f" [X] 请求异常: {e}")
                return None

        return None

    def extract_integrated_data_for_date(self, account_id: str,
                                         days: int = 7) -> Optional[Dict]:
        """Extract the integrated dataset for one account on the target date.

        Args:
            account_id: Account identifier.
            days: Number of days to query, counting back from ``target_date``.

        Returns:
            Integrated result dict, or None if cookies could not be set.
        """
        print(f"\n{'='*70}")
        print(f"开始提取账号数据: {account_id}")
        print(f"目标日期: {self.target_date_str}")
        print(f"{'='*70}")

        if not self.set_account_cookies(account_id):
            return None

        result = {
            'account_id': account_id,
            'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'target_date': self.target_date_str,
            'status': 'unknown',
            'analytics': {},
            'income': {},
            'error_info': {},
        }

        # Step 1: publication statistics.
        print("\n[1/2] 获取发文统计数据...")
        api_data = self.fetch_analytics_api_for_date(days=days)
        if api_data:
            result['analytics'] = api_data
            print("[OK] 发文统计数据获取成功")
        else:
            print("[X] 发文统计数据获取失败")
            result['error_info']['analytics'] = 'API调用失败'

        # Randomized pause between the two API calls.
        api_delay = random.uniform(2, 4)
        print(f"\n[间隔] 等待 {api_delay:.1f} 秒...")
        time.sleep(api_delay)

        # Step 2: income data.
        print("\n[2/2] 获取收入数据...")
        income_data = self.fetch_income_for_date()
        if income_data:
            result['income'] = income_data
            print("[OK] 收入数据获取成功")
        else:
            print("[X] 收入数据获取失败")
            result['error_info']['income'] = 'API调用失败'

        # Overall status depends on which halves succeeded.
        if result['analytics'] and result['income']:
            result['status'] = 'success_all'
        elif result['analytics'] or result['income']:
            result['status'] = 'success_partial'
        else:
            result['status'] = 'failed'

        return result

    def extract_all_for_date(self, days: int = 7,
                             delay_seconds: float = 3.0) -> List[Dict]:
        """Extract data for every account whose cookies are loaded.

        Args:
            days: Number of days to query per account.
            delay_seconds: Base delay between accounts (jittered ±30%).

        Returns:
            List of per-account result dicts.
        """
        if not self.account_cookies:
            print("[X] 没有可用的账号Cookie")
            return []

        print("\n" + "="*70)
        print(f"开始提取 {len(self.account_cookies)} 个账号的数据")
        print(f"目标日期: {self.target_date_str}")
        print("="*70)

        results = []
        total = len(self.account_cookies)
        for idx, account_id in enumerate(self.account_cookies.keys(), 1):
            print(f"\n[{idx}/{total}] 处理账号: {account_id}")
            result = self.extract_integrated_data_for_date(account_id, days=days)
            if result:
                results.append(result)
            # Jittered delay between accounts (skipped after the last one).
            if idx < total:
                actual_delay = delay_seconds * random.uniform(0.7, 1.3)
                print(f"\n[延迟] 等待 {actual_delay:.1f} 秒后继续...")
                time.sleep(actual_delay)

        return results

    def save_results(self, results: List[Dict]):
        """Save results to the main output file plus a dated backup copy.

        Args:
            results: List of per-account result dicts.
        """
        try:
            # 1. Main output file (fixed name, no timestamp).
            with open(self.output_file, 'w', encoding='utf-8') as f:
                json.dump(results, f, ensure_ascii=False, indent=2)
            print(f"\n{'='*70}")
            print(f"[OK] 数据已保存到: {self.output_file}")

            # 2. Dated backup copy (date only, no time-of-day).
            timestamp = datetime.now().strftime('%Y%m%d')
            backup_filename = f"bjh_integrated_data_{timestamp}.json"
            backup_file = os.path.join(self.backup_dir, backup_filename)
            shutil.copy2(self.output_file, backup_file)
            print(f"[OK] 备份已保存到: {backup_file}")
            print(f"{'='*70}")

            # Summary stats.
            success_count = sum(
                1 for r in results
                if r.get('status', '').startswith('success')
            )
            print(f"\n统计信息:")
            print(f" - 总账号数: {len(results)}")
            print(f" - 成功获取: {success_count}")
            print(f" - 失败: {len(results) - success_count}")
        except Exception as e:
            print(f"[X] 保存文件失败: {e}")


def main():
    """CLI entry point: parse arguments, run the extraction, report stats."""
    parser = argparse.ArgumentParser(
        description='百家号指定日期数据抓取工具',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例用法:
  python bjh_analytics_date.py 2025-12-20
  python bjh_analytics_date.py 2025-12-20 --days 7
  python bjh_analytics_date.py 2025-12-20 --proxy
  python bjh_analytics_date.py 2025-12-20 --database
  python bjh_analytics_date.py 2025-12-20 --account "乳腺专家林华"  # 仅测试单个账号
"""
    )
    parser.add_argument(
        'date', type=str,
        help='目标日期 (格式: YYYY-MM-DD)'
    )
    parser.add_argument(
        '--days', type=int, default=7,
        help='查询天数(从目标日期往前推,默认7天)'
    )
    # Proxy is ON by default; --proxy is kept as an explicit no-op flag and
    # --no-proxy turns it off.
    parser.add_argument(
        '--proxy', action='store_true', default=True,
        help='启用代理(默认启用)'
    )
    parser.add_argument(
        '--no-proxy', dest='proxy', action='store_false',
        help='禁用代理'
    )
    # Cookies come from the database by default; --local switches to JSON.
    parser.add_argument(
        '--database', action='store_true', default=True,
        help='从数据库加载Cookie(默认启用)'
    )
    parser.add_argument(
        '--local', dest='database', action='store_false',
        help='从本地JSON文件加载Cookie'
    )
    parser.add_argument(
        '--delay', type=float, default=3.0,
        help='账号间延迟时间(秒,默认3.0)'
    )
    parser.add_argument(
        '--account', type=str, default=None,
        help='仅抓取指定账号(用于测试),格式:账号名称'
    )
    parser.add_argument(
        '--no-confirm', action='store_true',
        help='跳过确认提示,直接开始抓取(用于批量脚本)'
    )
    args = parser.parse_args()

    # Validate the date before doing any work.
    try:
        datetime.strptime(args.date, '%Y-%m-%d')
    except ValueError:
        print(f"[X] 日期格式错误: {args.date}")
        print(" 正确格式: YYYY-MM-DD (例如: 2025-12-20)")
        return 1

    print("\n" + "="*70)
    print("百家号指定日期数据抓取工具")
    print("="*70)
    print(f"目标日期: {args.date}")
    print(f"查询天数: {args.days}")
    print(f"使用代理: {'是' if args.proxy else '否'}")
    print(f"数据源: {'数据库' if args.database else '本地文件'}")
    print("="*70)

    try:
        analytics = BaijiahaoDateAnalytics(
            target_date=args.date,
            use_proxy=args.proxy,
            load_from_db=args.database,
        )

        if not analytics.account_cookies:
            print("\n[X] 未找到可用的账号Cookie")
            return 1

        # Single-account test mode: validate the name, then keep only it.
        if args.account:
            if args.account not in analytics.account_cookies:
                print(f"\n[X] 未找到指定账号: {args.account}")
                print(f"\n可用账号列表:")
                for idx, account_name in enumerate(analytics.account_cookies.keys(), 1):
                    print(f" {idx}. {account_name}")
                return 1
            analytics.account_cookies = {
                args.account: analytics.account_cookies[args.account]
            }
            print(f"\n[测试模式] 仅抓取账号: {args.account}")

        print(f"\n找到 {len(analytics.account_cookies)} 个账号")

        # Interactive confirmation unless --no-confirm was given.
        if not args.no_confirm:
            confirm = input("\n是否开始抓取? (y/n): ").strip().lower()
            if confirm != 'y':
                print("已取消")
                return 0

        results = analytics.extract_all_for_date(
            days=args.days,
            delay_seconds=args.delay,
        )

        if not results:
            print("\n[X] 未获取到任何数据")
            return 1

        analytics.save_results(results)

        # Final per-status breakdown.
        success_all = sum(1 for r in results if r.get('status') == 'success_all')
        success_partial = sum(1 for r in results if r.get('status') == 'success_partial')
        failed = sum(1 for r in results if r.get('status') == 'failed')
        print(f"\n{'='*70}")
        print("数据提取统计")
        print(f"{'='*70}")
        print(f" 总账号数: {len(results)}")
        print(f" 全部成功: {success_all}")
        print(f" 部分成功: {success_partial}")
        print(f" 失败: {failed}")
        print(f"{'='*70}")
        return 0
    except Exception as e:
        print(f"\n[X] 程序执行出错: {e}")
        import traceback
        traceback.print_exc()
        return 1


if __name__ == '__main__':
    sys.exit(main())