#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Baijiahao (百家号) integrated data scraper.

Fetches publishing statistics and income data for every captured account
in one run.
"""
import json
import os
import random
import re
import sys
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import requests

# Force UTF-8 on Windows consoles, whose default code page may not render CJK output.
if sys.platform == 'win32':
    import io
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')

# All HTTP requests are made with verify=False; silence the resulting warnings.
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Project-local logging and database helpers.
from log_config import setup_bjh_analytics_logger
from database_config import DatabaseManager, DB_CONFIG

# Proxy configuration - Damai rotating-proxy API.
# SECURITY NOTE(review): credentials are hard-coded in source; consider moving
# them to environment variables or a secrets store.
PROXY_API_URL = (
    'https://api2.damaiip.com/index.php?s=/front/user/getIPlist&xsn=e054861d08471263d970bde4f4905181&osn=TC_NO176655872088456223&tiqu=1'
)
# Damai proxy username/password authentication.
PROXY_USERNAME = '694b8c3172af7'
PROXY_PASSWORD = 'q8yA8x1dwCpdyIK'

# Fallback static proxy pool (fields: 'ip' = "host:port", 'user', 'password').
BACKUP_PROXY_POOL = [
    {'ip': '61.171.69.167:50000', 'user': '6jinnh', 'password': 'fi9k7q5d'},
    {'ip': '36.213.32.122:50001', 'user': '9w6xpg', 'password': 'tqswr1ee'},
]


class BaijiahaoAnalytics:
    """Integrated Baijiahao scraper (publishing statistics + income data)."""

    def __init__(self, use_proxy: bool = False, load_from_db: bool = False,
                 db_config: Optional[Dict] = None):
        """Initialize the scraper.

        Args:
            use_proxy: enable outbound proxying (default False).
            load_from_db: load account cookies from the MySQL database instead
                of the local JSON file (default False).
            db_config: database configuration; DatabaseManager falls back to
                database_config.DB_CONFIG when None.
        """
        self.base_url = "https://baijiahao.baidu.com"
        self.analytics_url = f"{self.base_url}/builder/rc/analysiscontent"
        self.session = requests.Session()
        self.session.verify = False

        # Proxy state.
        self.use_proxy = use_proxy
        self.current_proxy = None

        # Database state.
        self.load_from_db = load_from_db
        self.db_manager = None
        if load_from_db:
            self.db_manager = DatabaseManager(db_config)

        # Logging.
        self.logger = setup_bjh_analytics_logger()

        if self.use_proxy:
            self.logger.info("已启用代理模式")
            print("[配置] 已启用代理模式")
        if self.load_from_db:
            self.logger.info("已启用数据库加载模式")
            print("[配置] 已启用数据库加载模式")

        # Resolve paths relative to this script's directory.
        self.script_dir = os.path.dirname(os.path.abspath(__file__))
        self.captured_cookies_file = os.path.join(self.script_dir, "captured_account_cookies.json")

        # Choose the cookie source according to configuration.
        if self.load_from_db:
            self.account_cookies = self.load_cookies_from_database()
        else:
            self.account_cookies = self.load_captured_cookies()

        # Output file for the integrated data.
        self.output_file = os.path.join(self.script_dir, "bjh_integrated_data.json")
        # Legacy output files, kept for backward compatibility.
        self.analytics_output = os.path.join(self.script_dir, "bjh_analytics_data.json")
        self.income_output = os.path.join(self.script_dir, "bjh_income_data_v2.json")

    def cookie_string_to_dict(self, cookie_string: str) -> Dict:
        """Convert a raw Cookie header string to a dict.

        Args:
            cookie_string: string of the form "key1=value1; key2=value2".

        Returns:
            Mapping of cookie names to values; empty dict for empty input.
        """
        cookie_dict = {}
        if not cookie_string:
            return cookie_dict
        for item in cookie_string.split(';'):
            item = item.strip()
            if '=' in item:
                # Split on the first '=' only: cookie values may contain '='.
                key, value = item.split('=', 1)
                cookie_dict[key.strip()] = value.strip()
        return cookie_dict

    def load_cookies_from_database(self) -> Dict:
        """Load account cookies from the database.

        Returns:
            Account-cookie dict in the same shape as load_captured_cookies().
        """
        try:
            if not self.db_manager:
                print("[X] 数据库管理器未初始化")
                return {}

            # Select every active account that has a cookie.
            # channel=1 means Baidu Baijiahao.
            sql = """
                SELECT id, author_name, app_id, toutiao_cookie, department_name
                FROM ai_authors
                WHERE channel = 1
                  AND status = 'active'
                  AND toutiao_cookie IS NOT NULL
                  AND toutiao_cookie != ''
                ORDER BY id
            """
            results = self.db_manager.execute_query(sql)

            if not results:
                print("[X] 数据库中未找到可用的账号Cookie")
                return {}

            # Convert rows into the same structure the JSON file uses.
            account_cookies = {}
            for row in results:
                author_id = row['id']
                author_name = row['author_name']
                app_id = row['app_id'] or ''
                cookie_string = row['toutiao_cookie']
                domain = row['department_name'] or '其它'

                cookies = self.cookie_string_to_dict(cookie_string)
                if not cookies:
                    continue

                # Keyed by author_name, consistent with the JSON file.
                account_cookies[author_name] = {
                    'author_id': author_id,
                    'app_id': app_id,
                    'nick': author_name,
                    'domain': domain,
                    'cookies': cookies,
                    'first_captured': None,  # not stored in the database
                    'last_updated': None,
                    'source': 'database'  # marks where the data came from
                }

            self.logger.info(f"从数据库加载了 {len(account_cookies)} 个账号的Cookie")
            print(f"[OK] 从数据库加载了 {len(account_cookies)} 个账号的Cookie")
            return account_cookies

        except Exception as e:
            self.logger.error(f"从数据库加载Cookie失败: {e}", exc_info=True)
            print(f"[X] 从数据库加载Cookie失败: {e}")
            return {}

    def load_captured_cookies(self) -> Dict:
        """Load previously captured cookies from the local JSON file."""
        try:
            with open(self.captured_cookies_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            print(f"[OK] 从文件加载了 {len(data)} 个账号的Cookie")
            return data
        except FileNotFoundError:
            print(f"[X] 未找到Cookie文件: {self.captured_cookies_file}")
            print("   请先运行一键捕获Cookie工具")
            return {}
        except Exception as e:
            print(f"[X] 加载Cookie失败: {e}")
            return {}

    def set_account_cookies(self, account_id: str) -> bool:
        """Install the cookies of the given account into the session.

        Args:
            account_id: account identifier (key of self.account_cookies).

        Returns:
            bool: True when the cookies were set.
        """
        if account_id not in self.account_cookies:
            print(f"[X] 未找到账号 {account_id} 的Cookie")
            return False

        account_data = self.account_cookies[account_id]
        cookies = account_data.get('cookies', {})
        if not cookies:
            print(f"[X] 账号 {account_id} 的Cookie为空")
            return False

        # Replace any cookies left over from the previous account.
        self.session.cookies.clear()
        for key, value in cookies.items():
            self.session.cookies.set(key, value, domain='.baidu.com')

        print(f"[OK] 已设置账号 {account_id} 的Cookie ({len(cookies)} 个字段)")
        return True

    def fetch_proxy(self) -> Optional[Dict]:
        """Obtain a usable proxy, falling back to the static pool on failure.

        Returns:
            Proxy mapping {'http': 'http://...', 'https': 'http://...'},
            or None when proxying is disabled.
        """
        if not self.use_proxy:
            return None

        try:
            # Ask the Damai API for a fresh IP.
            resp = requests.get(PROXY_API_URL, timeout=10)
            resp.raise_for_status()

            # The API usually answers with plain text, so try that first.
            text = resp.text.strip()

            # Look for an IP:PORT token on any line.
            lines = text.split('\n')
            for line in lines:
                line = line.strip()
                if ':' in line and not line.startswith('{') and not line.startswith('['):
                    # Take the first token in case the line carries extra info.
                    ip_port = line.split()[0] if ' ' in line else line
                    if ip_port.count(':') == 1:  # must be exactly IP:PORT
                        nowtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                        self.logger.info(f'提取大麦代理IP(文本): {ip_port} at {nowtime}')
                        print(f'[代理] 提取大麦IP: {ip_port}')
                        host, port = ip_port.split(':', 1)
                        proxy_url = f'http://{PROXY_USERNAME}:{PROXY_PASSWORD}@{host}:{port}'
                        self.current_proxy = {
                            'http': proxy_url,
                            'https': proxy_url,
                        }
                        return self.current_proxy

            # Plain-text parsing failed; try the JSON response shape.
            try:
                result = resp.json()
                if result.get('code') == 0 and result.get('data'):
                    ip_list = result['data']
                    if ip_list and len(ip_list) > 0:
                        ip_info = ip_list[0]
                        ip_port = f"{ip_info['ip']}:{ip_info['port']}"
                        nowtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                        self.logger.info(f'提取大麦代理IP(JSON): {ip_port} at {nowtime}')
                        print(f'[代理] 提取大麦IP: {ip_port}')
                        # Credentialed proxy URL: http://username:password@host:port
                        proxy_url = f'http://{PROXY_USERNAME}:{PROXY_PASSWORD}@{ip_info["ip"]}:{ip_info["port"]}'
                        self.current_proxy = {
                            'http': proxy_url,
                            'https': proxy_url,
                        }
                        return self.current_proxy
            except (ValueError, KeyError, TypeError):
                pass  # unparsable JSON: fall through to the backup pool

            raise Exception(f"无法解析代理API返回结果: {text[:100]}")

        except Exception as exc:
            self.logger.warning(f'大麦代理API获取失败: {exc},使用备用固定代理池')
            print(f'[代理] 大麦API获取失败,使用备用代理池')

            # Pick a random entry from the backup pool (credentialed).
            backup_proxy = random.choice(BACKUP_PROXY_POOL)
            ip_port = backup_proxy['ip']
            username = backup_proxy['user']
            password = backup_proxy['password']

            # Credentialed proxy URL: http://username:password@host:port
            host, port = ip_port.split(':', 1)
            proxy_url = f'http://{username}:{password}@{host}:{port}'

            self.logger.info(f'使用备用代理: {username}@{ip_port}')
            print(f'[代理] 使用备用: {username}@{ip_port}')
            self.current_proxy = {
                'http': proxy_url,
                'https': proxy_url,
            }
            return self.current_proxy

    def get_common_headers(self) -> Dict:
        """Return browser-like headers shared by page requests."""
        return {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0',
        }

    def fetch_analytics_page(self) -> Optional[str]:
        """Download the analytics page HTML.

        Returns:
            Optional[str]: page HTML, or None on failure.
        """
        try:
            headers = self.get_common_headers()
            headers['Referer'] = f'{self.base_url}/builder/rc/home'

            proxies = self.fetch_proxy() if self.use_proxy else None

            response = self.session.get(
                self.analytics_url,
                headers=headers,
                proxies=proxies,
                timeout=30,
                verify=False
            )

            if response.status_code == 200:
                print(f"[OK] 成功获取数据分析页面 (长度: {len(response.text)})")
                return response.text
            else:
                print(f"[X] 获取页面失败,状态码: {response.status_code}")
                return None

        except Exception as e:
            print(f"[X] 请求异常: {e}")
            return None

    def fetch_analytics_api(self, days: int = 7, max_retries: int = 3) -> Optional[Dict]:
        """Fetch publishing statistics via the real API (appStatisticV3).

        Args:
            days: number of days to query (default 7).
            max_retries: retry budget for proxy-related failures (default 3).

        Returns:
            Optional[Dict]: {'apis': [...], 'count': N} on success, else None.
        """
        # Date window: from N days ago up to YESTERDAY (today is excluded).
        end_date = datetime.now() - timedelta(days=1)
        start_date = end_date - timedelta(days=days - 1)

        start_day = start_date.strftime('%Y%m%d')
        end_day = end_date.strftime('%Y%m%d')

        # Real endpoint (appStatisticV3).
        api_url = f"{self.base_url}/author/eco/statistics/appStatisticV3"

        params = {
            'type': 'event',  # 'event' also yields the per-day swipe-ratio data
            'start_day': start_day,
            'end_day': end_day,
            'stat': '0',
            'special_filter_days': str(days)
        }

        # The JWT lives in one of these cookies.
        token_cookie = self.session.cookies.get('bjhStoken') or self.session.cookies.get('devStoken')

        headers = {
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': f'{self.base_url}/builder/rc/analysiscontent',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
        }

        # The token header is required; requests without it tend to fail.
        if token_cookie:
            headers['token'] = token_cookie
            self.logger.debug(f"使用token: {token_cookie[:50]}...")
            print(f"[调试] 使用token: {token_cookie[:50]}...")
        else:
            self.logger.warning("未找到token,请求可能失败")
            print("[!] 警告: 未找到token,请求可能失败")

        self.logger.info(f"获取发文统计数据: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')} ({days}天)")
        print(f"\n[请求] 获取发文统计数据")
        print(f"   日期范围: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')} ({days}天)")
        print(f"   API: {api_url}")

        successful_data = []
        retry_count = 0

        while retry_count <= max_retries:
            try:
                # Back off before each retry: 2s, 4s, 6s.
                if retry_count > 0:
                    wait_time = retry_count * 2
                    self.logger.info(f"发文统计API 第{retry_count}次重试,等待{wait_time}秒")
                    print(f"   [重试 {retry_count}/{max_retries}] 等待{wait_time}秒...")
                    time.sleep(wait_time)

                proxies = self.fetch_proxy() if self.use_proxy else None

                response = self.session.get(
                    api_url,
                    headers=headers,
                    params=params,
                    proxies=proxies,
                    timeout=15,
                    verify=False
                )

                self.logger.info(f"API响应状态码: {response.status_code}")
                print(f"   状态码: {response.status_code}")

                if response.status_code == 200:
                    try:
                        data = response.json()

                        errno = data.get('errno', -1)
                        errmsg = data.get('errmsg', '')

                        if errno == 0:
                            self.logger.info("发文统计API调用成功")
                            print(f"   [✓] API调用成功")

                            total_info = data.get('data', {}).get('total_info', {})

                            self.logger.info(f"发文量: {total_info.get('publish_count', '0')}")
                            self.logger.info(f"曝光量: {total_info.get('disp_pv', '0')}")
                            self.logger.info(f"阅读量: {total_info.get('view_count', '0')}")
                            self.logger.info(f"点击率: {total_info.get('click_rate', '0')}%")

                            print(f"\n   发文统计数据:")
                            print(f"   发文量: {total_info.get('publish_count', '0')}")
                            print(f"   曝光量: {total_info.get('disp_pv', '0')}")
                            print(f"   阅读量: {total_info.get('view_count', '0')}")
                            print(f"   点击率: {total_info.get('click_rate', '0')}%")

                            api_result = {
                                'endpoint': '/author/eco/statistics/appStatisticV3',
                                'name': '发文统计',
                                'date_range': f"{start_day} - {end_day}",
                                'data': data,
                                'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                            }
                            successful_data.append(api_result)
                            break  # success: leave the retry loop
                        else:
                            self.logger.error(f"API返回错误: errno={errno}, errmsg={errmsg}")
                            print(f"   [X] API返回错误: errno={errno}, errmsg={errmsg}")
                            break  # API-level error: do not retry

                    except json.JSONDecodeError as e:
                        self.logger.error(f"JSON解析失败: {e}")
                        print(f"   [X] JSON解析失败: {e}")
                        print(f"   响应内容: {response.text[:500]}")
                        break  # malformed body: do not retry
                else:
                    self.logger.error(f"HTTP错误: {response.status_code}")
                    print(f"   [X] HTTP错误: {response.status_code}")
                    break  # HTTP error: do not retry

            except Exception as e:
                error_type = type(e).__name__
                # Only proxy/connection-level failures are worth retrying.
                is_proxy_error = any([
                    'Connection' in error_type,
                    'RemoteDisconnected' in error_type,
                    'ProxyError' in error_type,
                    'Timeout' in error_type,
                    'ConnectionError' in str(e),
                    'Connection aborted' in str(e),
                    'Remote end closed' in str(e),
                    'Tunnel connection failed' in str(e),
                ])

                if is_proxy_error:
                    if retry_count < max_retries:
                        self.logger.warning(f"发文统计API代理连接错误: {error_type},将重试")
                        print(f"   [!] 代理连接错误: {error_type}")
                        retry_count += 1
                        continue
                    else:
                        self.logger.error(f"发文统计API请求失败,已达最大重试次数: {error_type}")
                        print(f"   [X] 请求失败,已达最大重试次数")
                        break
                else:
                    self.logger.error(f"请求异常: {e}", exc_info=True)
                    print(f"   [X] 请求异常: {e}")
                    break

        if successful_data:
            return {
                'apis': successful_data,
                'count': len(successful_data)
            }
        return None

    def parse_analytics_data(self, html: str) -> Dict:
        """Extract analytics metrics from the page HTML.

        Args:
            html: page HTML.

        Returns:
            Dict: extracted metrics.
        """
        analytics_data = {
            'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'metrics': {}
        }

        # Baijiahao typically embeds JSON data in the page or loads it async.

        # Strategy 1: inline <script> blocks assigning a JSON object.
        # NOTE(review): original pattern was truncated in the source; rebuilt
        # as a <script>...</script> matcher — confirm against a live page.
        json_pattern = r'<script[^>]*>\s*(?:var|window\.)?\s*(\w+)\s*=\s*(\{[^<]+\})\s*;?\s*</script>'
        matches = re.finditer(json_pattern, html, re.DOTALL)
        for match in matches:
            var_name = match.group(1)
            json_str = match.group(2)
            try:
                data = json.loads(json_str)
                if isinstance(data, dict) and len(data) > 0:
                    analytics_data['metrics'][var_name] = data
                    print(f"[发现] 数据块: {var_name}")
            except json.JSONDecodeError:
                pass  # not valid JSON: skip this script block

        # Strategy 2: numeric patterns keyed by known metric labels.
        patterns = {
            '单日发文量': [
                r'(?:单日发文|今日发文|日发文).*?(\d+)',
                r'todayPublish.*?(\d+)',
            ],
            '累计发文量': [
                r'(?:累计发文|总发文|发文总数).*?(\d+)',
                r'totalPublish.*?(\d+)',
            ],
            '当月收益': [
                r'(?:当月收益|本月收益|月收益).*?([\d.]+)',
                r'monthIncome.*?([\d.]+)',
            ],
            '当周收益': [
                r'(?:当周收益|本周收益|周收益).*?([\d.]+)',
                r'weekIncome.*?([\d.]+)',
            ],
            '收益月环比': [
                r'(?:月环比|月同比).*?([\d.]+)%',
                r'monthRate.*?([\d.]+)',
            ],
            '周环比': [
                r'(?:周环比|周同比).*?([\d.]+)%',
                r'weekRate.*?([\d.]+)',
            ],
        }

        for metric_name, pattern_list in patterns.items():
            for pattern in pattern_list:
                match = re.search(pattern, html, re.IGNORECASE)
                if match:
                    value = match.group(1)
                    analytics_data['metrics'][metric_name] = value
                    print(f"[提取] {metric_name}: {value}")
                    break

        return analytics_data

    def extract_account_analytics(self, account_id: str, days: int = 7) -> Optional[Dict]:
        """Extract analytics metrics for a single account.

        Args:
            account_id: account identifier.
            days: number of days to query (default 7).

        Returns:
            Optional[Dict]: analytics result, or None when cookies are missing.
        """
        print(f"\n{'='*70}")
        print(f"开始提取账号数据: {account_id}")
        print(f"{'='*70}")

        if not self.set_account_cookies(account_id):
            return None

        result = {
            'account_id': account_id,
            'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'status': 'unknown',
            'data': {}
        }

        # Method 1: the JSON API.
        print("\n[方法1] 尝试通过API接口获取数据...")
        api_data = self.fetch_analytics_api(days=days)
        if api_data:
            result['data']['api_data'] = api_data
            result['status'] = 'success_api'
            print("[OK] 通过API成功获取数据")

        # Method 2: scrape the page HTML.
        print("\n[方法2] 尝试解析页面HTML获取数据...")
        html = self.fetch_analytics_page()
        if html:
            parsed_data = self.parse_analytics_data(html)
            result['data']['parsed_data'] = parsed_data
            if result['status'] == 'unknown':
                result['status'] = 'success_html'
            print("[OK] 页面数据解析完成")

        if result['status'] == 'unknown':
            result['status'] = 'failed'
            print("[X] 所有方法均未获取到数据")

        return result

    def fetch_income_data_v2(self, max_retries: int = 3) -> Optional[Dict]:
        """Fetch income data via the v2 API (multiple periods per call).

        Args:
            max_retries: retry budget for proxy-related failures (default 3).

        Returns:
            Income dict covering yesterday / last 7 days / last 30 days /
            this month, or None on failure.
        """
        api_url = f"{self.base_url}/author/eco/income4/overviewhometabv2"

        # The JWT lives in one of these cookies.
        token_cookie = self.session.cookies.get('bjhStoken') or self.session.cookies.get('devStoken')

        headers = {
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': f'{self.base_url}/builder/rc/incomecenter',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
        }

        if token_cookie:
            headers['token'] = token_cookie
            self.logger.debug(f"使用token: {token_cookie[:50]}...")
        else:
            self.logger.warning("未找到token,收入数据请求可能失败")

        self.logger.info("获取收入数据(v2多时段API)")
        print(f"\n[请求] 获取收入数据(v2多时段API)")
        print(f"   API: {api_url}")

        retry_count = 0
        while retry_count <= max_retries:
            try:
                # Back off before each retry: 2s, 4s, 6s.
                if retry_count > 0:
                    wait_time = retry_count * 2
                    self.logger.info(f"收入数据API 第{retry_count}次重试,等待{wait_time}秒")
                    print(f"   [重试 {retry_count}/{max_retries}] 等待{wait_time}秒...")
                    time.sleep(wait_time)

                proxies = self.fetch_proxy() if self.use_proxy else None

                response = self.session.get(
                    api_url,
                    headers=headers,
                    proxies=proxies,
                    timeout=15,
                    verify=False
                )

                self.logger.info(f"收入API响应状态码: {response.status_code}")
                print(f"   状态码: {response.status_code}")

                if response.status_code == 200:
                    try:
                        data = response.json()
                        errno = data.get('errno', -1)
                        errmsg = data.get('errmsg', '')

                        if errno == 0:
                            self.logger.info("收入数据API调用成功")
                            print(f"   [✓] API调用成功")

                            # Show a short income summary.
                            income_data = data.get('data', {}).get('income', {})
                            if 'recent7Days' in income_data:
                                recent7 = income_data['recent7Days']
                                value7 = recent7.get('value', '0.00')
                                self.logger.info(f"近7天收入: ¥{value7}")
                                print(f"   近7天: ¥{value7}")
                            if 'recent30Days' in income_data:
                                recent30 = income_data['recent30Days']
                                value30 = recent30.get('value', '0.00')
                                self.logger.info(f"近30天收入: ¥{value30}")
                                print(f"   近30天: ¥{value30}")

                            return data
                        else:
                            self.logger.error(f"收入API返回错误: errno={errno}, errmsg={errmsg}")
                            print(f"   [X] API返回错误: errno={errno}, errmsg={errmsg}")
                            return None

                    except json.JSONDecodeError as e:
                        self.logger.error(f"收入数据JSON解析失败: {e}")
                        print(f"   [X] JSON解析失败: {e}")
                        return None
                else:
                    self.logger.error(f"收入API HTTP错误: {response.status_code}")
                    print(f"   [X] HTTP错误: {response.status_code}")
                    return None

            except Exception as e:
                error_type = type(e).__name__
                # Only proxy/connection-level failures are worth retrying.
                is_proxy_error = any([
                    'Connection' in error_type,
                    'RemoteDisconnected' in error_type,
                    'ProxyError' in error_type,
                    'Timeout' in error_type,
                    'ConnectionError' in str(e),
                    'Connection aborted' in str(e),
                    'Remote end closed' in str(e),
                    'Tunnel connection failed' in str(e),
                ])

                if is_proxy_error:
                    if retry_count < max_retries:
                        self.logger.warning(f"收入数据API代理连接错误: {error_type},将重试")
                        print(f"   [!] 代理连接错误: {error_type}")
                        retry_count += 1
                        continue
                    else:
                        self.logger.error(f"收入数据API请求失败,已达最大重试次数: {error_type}")
                        print(f"   [X] 请求失败,已达最大重试次数")
                        return None
                else:
                    self.logger.error(f"收入数据请求异常: {e}", exc_info=True)
                    print(f"   [X] 请求异常: {e}")
                    return None

        return None

    def fetch_daily_income(self, target_date: datetime, max_retries: int = 3) -> Optional[Dict]:
        """Fetch the income for a single day (single-day income API).

        Args:
            target_date: the day to query.
            max_retries: retry budget for proxy-related failures.

        Returns:
            Single-day income dict, or None on failure.
        """
        api_url = f"{self.base_url}/author/eco/income4/overviewhometab"

        # Unix timestamp of the day at local midnight.
        date_timestamp = int(target_date.replace(hour=0, minute=0, second=0, microsecond=0).timestamp())

        params = {
            'start_date': date_timestamp,
            'end_date': date_timestamp
        }

        # The JWT lives in one of these cookies.
        token_cookie = self.session.cookies.get('bjhStoken') or self.session.cookies.get('devStoken')

        headers = {
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': f'{self.base_url}/builder/rc/incomecenter',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
        }

        if token_cookie:
            headers['token'] = token_cookie
        else:
            self.logger.warning("未找到token,单日收入请求可能失败")

        retry_count = 0
        last_error = None

        while retry_count <= max_retries:
            try:
                # Back off before each retry: 2s, 4s, 6s.
                if retry_count > 0:
                    wait_time = retry_count * 2
                    self.logger.info(f"单日收入 {target_date.strftime('%Y-%m-%d')} 第{retry_count}次重试,等待{wait_time}秒")
                    time.sleep(wait_time)

                proxies = self.fetch_proxy() if self.use_proxy else None

                response = self.session.get(
                    api_url,
                    headers=headers,
                    params=params,
                    proxies=proxies,
                    timeout=15,
                    verify=False
                )

                if response.status_code == 200:
                    try:
                        data = response.json()
                        errno = data.get('errno', -1)
                        if errno == 0:
                            return data
                        else:
                            self.logger.error(f"单日收入API返回错误: errno={errno}")
                            return None
                    except json.JSONDecodeError as e:
                        self.logger.error(f"单日收入JSON解析失败: {e}")
                        return None
                else:
                    self.logger.error(f"单日收入HTTP错误: {response.status_code}")
                    return None

            except Exception as e:
                last_error = str(e)
                error_type = type(e).__name__
                # Only proxy/connection-level failures are worth retrying.
                is_proxy_error = any([
                    'Connection' in error_type,
                    'RemoteDisconnected' in error_type,
                    'ProxyError' in error_type,
                    'Timeout' in error_type,
                    'ConnectionError' in str(e),
                    'Connection aborted' in str(e),
                    'Remote end closed' in str(e),
                ])

                if is_proxy_error:
                    if retry_count < max_retries:
                        self.logger.warning(f"单日收入代理连接错误 ({target_date.strftime('%Y-%m-%d')}): {error_type},将重试")
                        retry_count += 1
                        continue
                    else:
                        self.logger.error(f"单日收入请求失败 ({target_date.strftime('%Y-%m-%d')}),已达最大重试次数: {error_type} - {last_error}")
                        return None
                else:
                    self.logger.error(f"单日收入请求异常 ({target_date.strftime('%Y-%m-%d')}): {e}", exc_info=True)
                    return None

        return None

    def extract_integrated_data(self, account_id: str, days: int = 7) -> Optional[Dict]:
        """Extract integrated data (publishing statistics + income) for one account.

        Args:
            account_id: account identifier.
            days: number of days to query (default 7).

        Returns:
            Optional[Dict]: integrated result, or None when cookies are missing.
        """
        print(f"\n{'='*70}")
        print(f"开始提取账号数据: {account_id}")
        print(f"{'='*70}")

        if not self.set_account_cookies(account_id):
            return None

        result = {
            'account_id': account_id,
            'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'status': 'unknown',
            'analytics': {},
            'income': {},
            'error_info': {}
        }

        # 1. Publishing statistics.
        print("\n[1/2] 获取发文统计数据...")
        api_data = self.fetch_analytics_api(days=days)
        if api_data:
            result['analytics'] = api_data
            print("[OK] 发文统计数据获取成功")
        else:
            print("[X] 发文统计数据获取失败")
            result['error_info']['analytics'] = 'API调用失败或被限流'

        # Random 2-4s pause between the two API calls to mimic a human.
        api_delay = random.uniform(2, 4)
        print(f"\n[间隔] 等待 {api_delay:.1f} 秒...")
        time.sleep(api_delay)

        # 2. Income data (daily revenue goes to day_revenue; weekly revenue is
        #    aggregated later from the database).
        print("\n[2/2] 获取收入数据...")
        income_data = self.fetch_income_data_v2()
        if income_data:
            result['income'] = income_data
            print("[OK] 收入数据获取成功")
        else:
            print("[X] 收入数据获取失败")
            result['error_info']['income'] = 'API调用失败或被限流'

        # Derive the overall status.
        if result['analytics'] and result['income']:
            result['status'] = 'success_all'
        elif result['analytics'] or result['income']:
            result['status'] = 'success_partial'
        elif result['error_info']:
            # Both calls failed with error info recorded: treat as rate-limited.
            result['status'] = 'rate_limited'
        else:
            result['status'] = 'failed'

        return result

    def extract_all_integrated_data(self, days: int = 7, delay_seconds: float = 3.0,
                                    stop_on_rate_limit: bool = False) -> List[Dict]:
        """Extract integrated data for all accounts.

        Args:
            days: number of days to query (default 7).
            delay_seconds: base delay between accounts, in seconds.
            stop_on_rate_limit: stop after repeated rate limiting (default False).

        Returns:
            List[Dict]: integrated results for every processed account.
        """
        if not self.account_cookies:
            print("[X] 没有可用的账号Cookie")
            return []

        print("\n" + "="*70)
        print(f"开始提取 {len(self.account_cookies)} 个账号的整合数据(发文统计 + 收入)")
        print("="*70)

        results = []
        rate_limited_count = 0

        for idx, account_id in enumerate(self.account_cookies.keys(), 1):
            print(f"\n[{idx}/{len(self.account_cookies)}] 处理账号: {account_id}")
            result = self.extract_integrated_data(account_id, days=days)
            if result:
                results.append(result)

                # Track consecutive rate-limited accounts.
                if result.get('status') == 'rate_limited':
                    rate_limited_count += 1
                    self.logger.warning(f"账号 {account_id} 被限流,连续限流次数: {rate_limited_count}")

                    # Extra 15-25s wait after a rate limit to avoid re-triggering it.
                    extra_delay = random.uniform(15, 25)
                    print(f"\n[!] 检测到限流,额外等待 {extra_delay:.1f} 秒")
                    time.sleep(extra_delay)

                    # Optionally stop after 3 consecutive rate-limited accounts.
                    if stop_on_rate_limit and rate_limited_count >= 3:
                        print("\n" + "="*70)
                        print("[!] 检测到连续限流,停止本次更新")
                        print("[!] 建议稍后再试")
                        print("="*70)
                        break
                else:
                    # Reset the consecutive-rate-limit counter.
                    rate_limited_count = 0

            # Jittered delay before the next account.
            if idx < len(self.account_cookies):
                actual_delay = delay_seconds * random.uniform(0.7, 1.3)
                print(f"\n[延迟] 等待 {actual_delay:.1f} 秒后继续...")
                time.sleep(actual_delay)

        return results

    def extract_all_accounts(self, days: int = 7) -> List[Dict]:
        """Extract analytics metrics for all accounts.

        Args:
            days: number of days to query (default 7).

        Returns:
            List[Dict]: analytics results for every processed account.
        """
        if not self.account_cookies:
            print("[X] 没有可用的账号Cookie")
            return []

        print("\n" + "="*70)
        print(f"开始提取 {len(self.account_cookies)} 个账号的数据分析")
        print("="*70)

        results = []
        for idx, account_id in enumerate(self.account_cookies.keys(), 1):
            print(f"\n[{idx}/{len(self.account_cookies)}] 处理账号: {account_id}")
            result = self.extract_account_analytics(account_id, days=days)
            if result:
                results.append(result)

            # Random 2-5s delay so requests are not fired too quickly.
            if idx < len(self.account_cookies):
                delay = random.uniform(2, 5)
                print(f"\n[延迟] 等待 {delay:.1f} 秒后继续...")
                time.sleep(delay)

        return results

    def save_results(self, results: List[Dict]):
        """Persist results to the integrated-output JSON file.

        Args:
            results: list of per-account result dicts.
        """
        try:
            with open(self.output_file, 'w', encoding='utf-8') as f:
                json.dump(results, f, ensure_ascii=False, indent=2)

            print(f"\n{'='*70}")
            print(f"[OK] 数据已保存到: {self.output_file}")
            print(f"{'='*70}")

            # Summary counts.
            success_count = sum(1 for r in results if r.get('status', '').startswith('success'))
            print(f"\n统计信息:")
            print(f"  - 总账号数: {len(results)}")
            print(f"  - 成功获取: {success_count}")
            print(f"  - 失败: {len(results) - success_count}")

        except Exception as e:
            print(f"[X] 保存文件失败: {e}")

    def display_results_summary(self, results: List[Dict]):
        """Print a human-readable summary of the results.

        Args:
            results: list of per-account result dicts.
        """
        print(f"\n{'='*70}")
        print("数据提取摘要")
        print(f"{'='*70}")

        for result in results:
            account_id = result.get('account_id', 'unknown')
            status = result.get('status', 'unknown')
            print(f"\n账号: {account_id}")
            print(f"状态: {status}")

            data = result.get('data', {})

            # API data, in either the new multi-endpoint or the legacy shape.
            if 'api_data' in data:
                api_data = data['api_data']
                if 'apis' in api_data:
                    apis_list = api_data['apis']
                    print(f"\n成功获取 {len(apis_list)} 个API端点数据:")
                    for api_item in apis_list:
                        endpoint = api_item.get('endpoint', 'unknown')
                        api_resp_data = api_item.get('data', {})
                        print(f"\n  API: {endpoint}")
                        self._print_api_metrics(api_resp_data)
                else:
                    # Legacy single-endpoint format.
                    endpoint = api_data.get('endpoint', 'unknown')
                    print(f"API端点: {endpoint}")
                    print(f"获取时间: {api_data.get('fetch_time', 'unknown')}")

            # Metrics scraped from the page HTML.
            if 'parsed_data' in data:
                parsed = data['parsed_data']
                metrics = parsed.get('metrics', {})
                if metrics:
                    print("\n从页面提取的指标:")
                    for key, value in metrics.items():
                        print(f"  - {key}: {value}")

            print("-" * 70)

    def _print_api_metrics(self, api_data: Dict):
        """Print known metric fields found in an API response.

        Args:
            api_data: raw API response dict.
        """
        # Metric keys we know how to label.
        metric_keys = [
            'publish_count',   # published articles
            'today_publish',   # published today
            'total_publish',   # total published
            'month_income',    # monthly income
            'week_income',     # weekly income
            'month_rate',      # month-over-month rate
            'week_rate',       # week-over-week rate
            'income',          # income
            'count',           # generic count
        ]

        # Look inside the 'data' envelope.
        if 'data' in api_data:
            inner_data = api_data['data']
            if isinstance(inner_data, dict):
                print(f"  数据字段: {list(inner_data.keys())}")
                for key in metric_keys:
                    if key in inner_data:
                        print(f"    - {key}: {inner_data[key]}")

        # Look inside the 'result' envelope.
        if 'result' in api_data:
            result_data = api_data['result']
            if isinstance(result_data, dict):
                print(f"  结果字段: {list(result_data.keys())}")
                for key in metric_keys:
                    if key in result_data:
                        print(f"    - {key}: {result_data[key]}")


def main():
    """Interactive entry point."""
    print("\n" + "="*70)
    print("百家号数据整合抓取工具(发文统计 + 收入数据)")
    print("="*70)

    # Choose the cookie source.
    print("\n请选择Cookie数据源:")
    print("  1. 本地JSON文件 (captured_account_cookies.json)")
    print("  2. MySQL数据库 (ai_authors表)")
    source_input = input("\n请选择 (1/2, 默认1): ").strip() or '1'
    load_from_db = source_input == '2'

    # Enable proxying?
    proxy_input = input("\n是否启用代理?(y/n,默认n): ").strip().lower()
    use_proxy = proxy_input == 'y'

    # Build the scraper.
    if load_from_db:
        print("\n[配置] 使用数据库加载模式")
        analytics = BaijiahaoAnalytics(use_proxy=use_proxy, load_from_db=True)
    else:
        print("\n[配置] 使用本地文件加载模式")
        analytics = BaijiahaoAnalytics(use_proxy=use_proxy)

    if not analytics.account_cookies:
        print("\n[!] 未找到可用的账号Cookie")
        if not load_from_db:
            print("   请先运行一键捕获Cookie工具")
        return

    # List the available accounts.
    print(f"\n找到 {len(analytics.account_cookies)} 个账号:")
    for idx, (account_id, data) in enumerate(analytics.account_cookies.items(), 1):
        domain = data.get('domain', 'unknown')
        capture_time = data.get('first_captured', 'unknown')
        print(f"  {idx}. {account_id} (域名: {domain}, 捕获: {capture_time})")

    # Menu.
    print("\n" + "="*70)
    print("请选择操作:")
    print("  1. 提取所有账号的整合数据(发文统计 + 收入)")
    print("  2. 仅提取发文统计数据")
    print("  3. 提取指定账号数据")
    print("  0. 退出")
    print("="*70)

    choice = input("\n请选择 (0-3): ").strip()

    if choice == '0':
        print("\n退出程序")
        return

    elif choice == '1':
        # Integrated data for every account.
        days_input = input("\n请输入查询天数 (默认7天): ").strip()
        days = int(days_input) if days_input.isdigit() else 7

        print(f"\n开始获取所有账号的整合数据 (最近{days}天)...\n")
        results = analytics.extract_all_integrated_data(days=days)

        if results:
            analytics.save_results(results)

            # Quick status breakdown.
            success_all = sum(1 for r in results if r.get('status') == 'success_all')
            success_partial = sum(1 for r in results if r.get('status') == 'success_partial')
            failed = sum(1 for r in results if r.get('status') == 'failed')

            print(f"\n{'='*70}")
            print("数据提取统计")
            print(f"{'='*70}")
            print(f"  总账号数: {len(results)}")
            print(f"  全部成功: {success_all} (发文+收入)")
            print(f"  部分成功: {success_partial}")
            print(f"  失败: {failed}")
            print(f"{'='*70}")

    elif choice == '2':
        # Publishing statistics only.
        days_input = input("\n请输入查询天数 (默认7天): ").strip()
        days = int(days_input) if days_input.isdigit() else 7

        print(f"\n开始获取所有账号的发文统计数据 (最近{days}天)...\n")
        results = analytics.extract_all_accounts(days=days)

        if results:
            # Save to the legacy analytics file.
            with open(analytics.analytics_output, 'w', encoding='utf-8') as f:
                json.dump(results, f, ensure_ascii=False, indent=2)
            print(f"\n[OK] 数据已保存到: {analytics.analytics_output}")
            analytics.display_results_summary(results)

    elif choice == '3':
        # Single account.
        account_list = list(analytics.account_cookies.keys())
        print("\n可用账号:")
        for idx, account_id in enumerate(account_list, 1):
            print(f"  {idx}. {account_id}")

        try:
            idx = int(input("\n请输入账号序号: ").strip())
            if 1 <= idx <= len(account_list):
                account_id = account_list[idx - 1]
                days_input = input("\n请输入查询天数 (默认7天): ").strip()
                days = int(days_input) if days_input.isdigit() else 7

                # Integrated data or publishing statistics only?
                data_type = input("\n1-整合数据 2-仅发文统计 (默认1): ").strip()
                if data_type == '2':
                    result = analytics.extract_account_analytics(account_id, days=days)
                else:
                    result = analytics.extract_integrated_data(account_id, days=days)

                if result:
                    results = [result]
                    analytics.save_results(results)
            else:
                print("[X] 无效的序号")
        except ValueError:
            print("[X] 请输入有效的数字")

    else:
        print("[X] 无效的选择")

    print("\n" + "="*70)
    print("程序执行完成")
    print("="*70 + "\n")


if __name__ == '__main__':
    main()