Files
baijiahao_data_crawl/bjh_analytics.py

1593 lines
69 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
百家号数据整合抓取工具
同时获取发文统计和收入数据
"""
import json
import sys
import os
import requests
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional
# Force UTF-8 on Windows consoles, whose default code page (e.g. GBK/cp936)
# cannot encode the Chinese status messages this script prints.
if sys.platform == 'win32':
    import io
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
# Suppress InsecureRequestWarning: every HTTP call below uses verify=False.
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 导入日志配置
from log_config import setup_bjh_analytics_logger
from database_config import DatabaseManager, DB_CONFIG
# Proxy configuration - "Damai" rotating proxy service.
# NOTE(review): the API key/order id are embedded in the URL and the account
# credentials below are hardcoded in source - consider moving these to
# environment variables or a secrets store.
PROXY_API_URL = (
    'https://api2.damaiip.com/index.php?s=/front/user/getIPlist&xsn=2912cb2b22d3b7ae724f045012790479&osn=TC_NO176707424165606223&tiqu=1'
)
# Username/password authentication for Damai proxy endpoints.
PROXY_USERNAME = '69538fdef04e1'
PROXY_PASSWORD = '63v0kQBr2yJXnjf'
# Fallback pool of fixed proxies: {'ip': 'IP:port', 'user': ..., 'password': ...}
BACKUP_PROXY_POOL = [
    {'ip': '61.171.69.167:50000', 'user': '6jinnh', 'password': 'fi9k7q5d'},
    {'ip': '36.213.32.122:50001', 'user': '9w6xpg', 'password': 'tqswr1ee'},
]
class BaijiahaoAnalytics:
"""百家号数据整合抓取器(发文统计 + 收入数据)"""
    def __init__(self, use_proxy: bool = False, load_from_db: bool = False, db_config: Optional[Dict] = None):
        """Initialize the crawler.

        Args:
            use_proxy: route requests through a proxy (default False).
            load_from_db: load account cookies from MySQL instead of the local
                JSON file (default False).
            db_config: database configuration; when None, DatabaseManager
                falls back to database_config.DB_CONFIG.
        """
        self.base_url = "https://baijiahao.baidu.com"
        self.analytics_url = f"{self.base_url}/builder/rc/analysiscontent"
        self.session = requests.Session()
        self.session.verify = False  # all endpoints are hit with SSL verification off
        # Proxy state
        self.use_proxy = use_proxy
        self.current_proxy = None  # re-fetched only after the current IP is used up / fails
        self.proxy_fail_count = 0  # consecutive failures of the current proxy
        # Database state
        self.load_from_db = load_from_db
        self.db_manager = None
        if load_from_db:
            self.db_manager = DatabaseManager(db_config)
        # Logging
        self.logger = setup_bjh_analytics_logger()
        if self.use_proxy:
            self.logger.info("已启用代理模式")
            print("[配置] 已启用代理模式")
            # Warm up: grab the first proxy at construction time
            self.fetch_proxy(force_new=True)
        if self.load_from_db:
            self.logger.info("已启用数据库加载模式")
            print("[配置] 已启用数据库加载模式")
        # Directory containing this script; all data files live next to it
        self.script_dir = os.path.dirname(os.path.abspath(__file__))
        # Cookie store written by the separate one-click capture tool
        self.captured_cookies_file = os.path.join(self.script_dir, "captured_account_cookies.json")
        # Choose the cookie source based on configuration
        if self.load_from_db:
            self.account_cookies = self.load_cookies_from_database()
        else:
            self.account_cookies = self.load_captured_cookies()
        # Output file (integrated data)
        self.output_file = os.path.join(self.script_dir, "bjh_integrated_data.json")
        # Legacy output files kept for compatibility
        self.analytics_output = os.path.join(self.script_dir, "bjh_analytics_data.json")
        self.income_output = os.path.join(self.script_dir, "bjh_income_data_v2.json")
        # Backup folder for dated copies of the output
        self.backup_dir = os.path.join(self.script_dir, "backup")
        if not os.path.exists(self.backup_dir):
            os.makedirs(self.backup_dir)
            print(f"[OK] 创建备份文件夹: {self.backup_dir}")
def cookie_string_to_dict(self, cookie_string: str) -> Dict:
"""将Cookie字符串转换为字典格式
Args:
cookie_string: Cookie字符串格式: "key1=value1; key2=value2"
Returns:
Cookie字典
"""
cookie_dict = {}
if not cookie_string:
return cookie_dict
for item in cookie_string.split(';'):
item = item.strip()
if '=' in item:
key, value = item.split('=', 1)
cookie_dict[key.strip()] = value.strip()
return cookie_dict
def load_cookies_from_database(self) -> Dict:
"""从数据库加载Cookie数据
Returns:
账号Cookie字典格式与 load_captured_cookies() 相同
"""
try:
if not self.db_manager:
print("[X] 数据库管理器未初始化")
return {}
# 查询所有激活且有Cookie的账号
# channel=1 表示百度百家号
sql = """
SELECT id, author_name, app_id, toutiao_cookie, department_name
FROM ai_authors
WHERE channel = 1
AND status = 'active'
AND toutiao_cookie IS NOT NULL
AND toutiao_cookie != ''
ORDER BY id
"""
results = self.db_manager.execute_query(sql)
if not results:
print("[X] 数据库中未找到可用的账号Cookie")
return {}
# 转换为与 JSON 文件相同的格式
account_cookies = {}
for row in results:
author_id = row['id']
author_name = row['author_name']
app_id = row['app_id'] or ''
cookie_string = row['toutiao_cookie']
domain = row['department_name'] or '其它'
# 将Cookie字符串转换为字典
cookies = self.cookie_string_to_dict(cookie_string)
if not cookies:
continue
# 使用 author_name 作为 key与 JSON 文件保持一致
account_cookies[author_name] = {
'author_id': author_id,
'app_id': app_id,
'nick': author_name,
'domain': domain,
'cookies': cookies,
'first_captured': None, # 数据库中没有此字段
'last_updated': None,
'source': 'database' # 标记数据来源
}
self.logger.info(f"从数据库加载了 {len(account_cookies)} 个账号的Cookie")
print(f"[OK] 从数据库加载了 {len(account_cookies)} 个账号的Cookie")
return account_cookies
except Exception as e:
self.logger.error(f"从数据库加载Cookie失败: {e}", exc_info=True)
print(f"[X] 从数据库加载Cookie失败: {e}")
return {}
def load_captured_cookies(self) -> Dict:
"""从本地JSON文件加载已捕获的Cookie数据"""
try:
with open(self.captured_cookies_file, 'r', encoding='utf-8') as f:
data = json.load(f)
print(f"[OK] 从文件加载了 {len(data)} 个账号的Cookie")
return data
except FileNotFoundError:
print(f"[X] 未找到Cookie文件: {self.captured_cookies_file}")
print(" 请先运行一键捕获Cookie工具")
return {}
except Exception as e:
print(f"[X] 加载Cookie失败: {e}")
return {}
def set_account_cookies(self, account_id: str) -> bool:
"""设置指定账号的Cookie到会话
Args:
account_id: 账号ID
Returns:
bool: 是否成功设置
"""
if account_id not in self.account_cookies:
print(f"[X] 未找到账号 {account_id} 的Cookie")
return False
account_data = self.account_cookies[account_id]
cookies = account_data.get('cookies', {})
if not cookies:
print(f"[X] 账号 {account_id} 的Cookie为空")
return False
# 设置Cookie
self.session.cookies.clear()
for key, value in cookies.items():
self.session.cookies.set(key, value, domain='.baidu.com')
print(f"[OK] 已设置账号 {account_id} 的Cookie ({len(cookies)} 个字段)")
return True
def fetch_proxy(self, force_new: bool = False) -> Optional[Dict]:
"""从代理服务获取一个可用代理,失败时使用备用固定代理
Args:
force_new: 是否强制获取新代理默认False优先使用当前IP
Returns:
代理配置字典,格式: {'http': 'http://...', 'https': 'http://...'}
"""
if not self.use_proxy:
return None
# 如果已有可用代理且不强制获取新代理,直接返回
if self.current_proxy and not force_new:
return self.current_proxy
# 获取新代理
try:
# 使用大麦代理API获取IP
resp = requests.get(PROXY_API_URL, timeout=10)
resp.raise_for_status()
# 首先尝试解析为纯文本格式(最常见)
text = resp.text.strip()
# 检测是否返回错误信息
if text.upper().startswith('ERROR'):
raise Exception(f"代理API返回错误: {text}")
# 尝试直接解析为IP:PORT格式
lines = text.split('\n')
for line in lines:
line = line.strip()
if ':' in line and not line.startswith('{') and not line.startswith('['):
# 找到第一个IP:PORT格式
ip_port = line.split()[0] if ' ' in line else line
if ip_port.count(':') == 1: # 确保是IP:PORT格式
nowtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.logger.info(f'提取大麦代理IP(文本): {ip_port} at {nowtime}')
print(f'[代理] 提取大麦IP: {ip_port}')
# 大麦代理使用账号密码认证
host, port = ip_port.split(':', 1)
if PROXY_USERNAME and PROXY_PASSWORD:
proxy_url = f'http://{PROXY_USERNAME}:{PROXY_PASSWORD}@{host}:{port}'
else:
proxy_url = f'http://{host}:{port}'
self.current_proxy = {
'http': proxy_url,
'https': proxy_url,
}
return self.current_proxy
# 如果文本解析失败尝试JSON格式
try:
result = resp.json()
if result.get('code') == 0 and result.get('data'):
# 获取第一个IP
ip_list = result['data']
if ip_list and len(ip_list) > 0:
ip_info = ip_list[0]
ip_port = f"{ip_info['ip']}:{ip_info['port']}"
nowtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.logger.info(f'提取大麦代理IP(JSON): {ip_port} at {nowtime}')
print(f'[代理] 提取大麦IP: {ip_port}')
# 大麦代理使用账号密码认证
if PROXY_USERNAME and PROXY_PASSWORD:
proxy_url = f'http://{PROXY_USERNAME}:{PROXY_PASSWORD}@{ip_info["ip"]}:{ip_info["port"]}'
else:
proxy_url = f'http://{ip_info["ip"]}:{ip_info["port"]}'
self.current_proxy = {
'http': proxy_url,
'https': proxy_url,
}
return self.current_proxy
except:
pass # JSON解析失败继续到备用代理
raise Exception(f"无法解析代理API返回结果: {text[:100]}")
except Exception as exc:
self.logger.warning(f'大麦代理API获取失败: {exc},使用备用固定代理池')
print(f'[代理] 大麦API获取失败使用备用代理池')
# 从备用代理池随机选择一个(支持账密认证)
import random
backup_proxy = random.choice(BACKUP_PROXY_POOL)
ip_port = backup_proxy['ip']
username = backup_proxy['user']
password = backup_proxy['password']
# 构建带账密的代理URL: http://username:password@host:port
host, port = ip_port.split(':', 1)
proxy_url = f'http://{username}:{password}@{host}:{port}'
self.logger.info(f'使用备用代理: {username}@{ip_port}')
print(f'[代理] 使用备用: {username}@{ip_port}')
self.current_proxy = {
'http': proxy_url,
'https': proxy_url,
}
return self.current_proxy
def mark_proxy_failed(self):
"""标记当前代理失败失败超过3次后重新获取代理
Returns:
bool: 是否需要重新获取代理
"""
if not self.use_proxy or not self.current_proxy:
return False
self.proxy_fail_count += 1
self.logger.warning(f"当前代理失败次数: {self.proxy_fail_count}")
# 失败超过3次重新获取代理
if self.proxy_fail_count >= 3:
self.logger.info("当前代理失败次数过多,重新获取新代理")
print(f"[代理] 失败{self.proxy_fail_count}次,重新获取新代理")
self.current_proxy = None
self.proxy_fail_count = 0
# 强制获取新代理
self.fetch_proxy(force_new=True)
return True
return False
def reset_proxy_fail_count(self):
"""重置代理失败计数(请求成功后调用)"""
self.proxy_fail_count = 0
def get_common_headers(self) -> Dict:
"""获取通用请求头"""
return {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Cache-Control': 'max-age=0',
}
def fetch_analytics_page(self) -> Optional[str]:
"""获取数据分析页面HTML
Returns:
Optional[str]: 页面HTML内容失败返回None
"""
try:
headers = self.get_common_headers()
headers['Referer'] = f'{self.base_url}/builder/rc/home'
# 获取代理(如果启用)
proxies = self.fetch_proxy() if self.use_proxy else None
response = self.session.get(
self.analytics_url,
headers=headers,
proxies=proxies,
timeout=30,
verify=False
)
if response.status_code == 200:
print(f"[OK] 成功获取数据分析页面 (长度: {len(response.text)})")
return response.text
else:
print(f"[X] 获取页面失败,状态码: {response.status_code}")
return None
except Exception as e:
print(f"[X] 请求异常: {e}")
return None
    def fetch_analytics_api(self, days: int = 7, max_retries: int = 3) -> Optional[Dict]:
        """Fetch publication statistics via the real appStatisticV3 API.

        Proxy/connection failures are retried (rotating the proxy up to
        max_proxy_changes times); plain API/HTTP/JSON errors are not retried.

        Args:
            days: look-back window in days, default 7.
            max_retries: retry budget for transient errors.

        Returns:
            Optional[Dict]: {'apis': [per-endpoint results], 'count': N} on
            success, None when nothing could be fetched.
        """
        from datetime import datetime, timedelta
        # Date range runs from N days ago through YESTERDAY (today excluded).
        end_date = datetime.now() - timedelta(days=1)  # yesterday
        start_date = end_date - timedelta(days=days-1)  # N days ago
        start_day = start_date.strftime('%Y%m%d')
        end_day = end_date.strftime('%Y%m%d')
        # Endpoint actually used by the Baijiahao console (appStatisticV3).
        api_url = f"{self.base_url}/author/eco/statistics/appStatisticV3"
        # Request parameters.
        params = {
            'type': 'event',  # 'event' also yields the per-day slide-image ratio data
            'start_day': start_day,
            'end_day': end_day,
            'stat': '0',
            'special_filter_days': str(days)
        }
        # The JWT-style token is stored in the bjhStoken/devStoken cookie.
        token_cookie = self.session.cookies.get('bjhStoken') or self.session.cookies.get('devStoken')
        # Request headers.
        headers = {
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': f'{self.base_url}/builder/rc/analysiscontent',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
        }
        # The 'token' header is required - requests without it tend to fail.
        if token_cookie:
            headers['token'] = token_cookie
            self.logger.debug(f"使用token: {token_cookie[:50]}...")
            print(f"[调试] 使用token: {token_cookie[:50]}...")
        else:
            self.logger.warning("未找到token请求可能失败")
            print("[!] 警告: 未找到token请求可能失败")
        self.logger.info(f"获取发文统计数据: {start_date.strftime('%Y-%m-%d')}{end_date.strftime('%Y-%m-%d')} ({days}天)")
        print(f"\n[请求] 获取发文统计数据")
        print(f" 日期范围: {start_date.strftime('%Y-%m-%d')}{end_date.strftime('%Y-%m-%d')} ({days}天)")
        print(f" API: {api_url}")
        successful_data = []
        retry_count = 0
        proxy_change_count = 0  # how many times the proxy was swapped this call
        max_proxy_changes = 3  # at most 3 swaps, i.e. up to 4 distinct proxies
        while retry_count <= max_retries:
            try:
                # Back off before each retry: 2s, 4s, 6s...
                if retry_count > 0:
                    wait_time = retry_count * 2
                    self.logger.info(f"发文统计API 第{retry_count}次重试,等待{wait_time}秒")
                    print(f" [重试 {retry_count}/{max_retries}] 等待{wait_time}秒...")
                    time.sleep(wait_time)
                # Acquire a proxy when proxying is enabled.
                proxies = self.fetch_proxy() if self.use_proxy else None
                # Debug output: show which proxy is actually in use.
                if self.use_proxy:
                    if proxies:
                        proxy_url = proxies.get('http', '')
                        if '@' in proxy_url:
                            # Show only host:port, hiding the credentials.
                            proxy_ip = proxy_url.split('@')[1]
                        else:
                            proxy_ip = proxy_url.replace('http://', '').replace('https://', '')
                        self.logger.info(f"发文统计API 使用代理: {proxy_ip}")
                        print(f" [代理] 使用IP: {proxy_ip}")
                    else:
                        self.logger.warning(f"发文统计API 代理未生效use_proxy={self.use_proxy}")
                        print(f" [!] 警告代理未生效use_proxy={self.use_proxy}")
                response = self.session.get(
                    api_url,
                    headers=headers,
                    params=params,
                    proxies=proxies,
                    timeout=15,
                    verify=False
                )
                self.logger.info(f"API响应状态码: {response.status_code}")
                print(f" 状态码: {response.status_code}")
                if response.status_code == 200:
                    try:
                        data = response.json()
                        # Application-level status of the API response.
                        errno = data.get('errno', -1)
                        errmsg = data.get('errmsg', '')
                        if errno == 0:
                            self.logger.info("发文统计API调用成功")
                            print(f" [✓] API调用成功")
                            # Success: clear the proxy failure streak.
                            self.reset_proxy_fail_count()
                            # Aggregate stats block of the response.
                            total_info = data.get('data', {}).get('total_info', {})
                            # Log the key metrics.
                            self.logger.info(f"发文量: {total_info.get('publish_count', '0')}")
                            self.logger.info(f"曝光量: {total_info.get('disp_pv', '0')}")
                            self.logger.info(f"阅读量: {total_info.get('view_count', '0')}")
                            self.logger.info(f"点击率: {total_info.get('click_rate', '0')}%")
                            # Echo a short summary to stdout.
                            print(f"\n 发文统计数据:")
                            print(f" 发文量: {total_info.get('publish_count', '0')}")
                            print(f" 曝光量: {total_info.get('disp_pv', '0')}")
                            print(f" 阅读量: {total_info.get('view_count', '0')}")
                            print(f" 点击率: {total_info.get('click_rate', '0')}%")
                            api_result = {
                                'endpoint': '/author/eco/statistics/appStatisticV3',
                                'name': '发文统计',
                                'date_range': f"{start_day} - {end_day}",
                                'data': data,
                                'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                            }
                            successful_data.append(api_result)
                            break  # success - leave the retry loop
                        else:
                            self.logger.error(f"API返回错误: errno={errno}, errmsg={errmsg}")
                            print(f" [X] API返回错误: errno={errno}, errmsg={errmsg}")
                            # errno=10000015 ("abnormal request") usually means
                            # the proxy did not take effect - swap it at once.
                            if errno == 10000015 and self.use_proxy:
                                self.logger.warning("检测到 errno=10000015异常请求代理未生效立即强制更换新代理")
                                print(f" [!] 检测到代理未生效,立即更换新代理")
                                # Respect the proxy-swap cap.
                                if proxy_change_count >= max_proxy_changes:
                                    print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
                                    break
                                # Force a fresh proxy immediately (no 3-strike wait).
                                self.current_proxy = None
                                self.proxy_fail_count = 0
                                new_proxy = self.fetch_proxy(force_new=True)
                                if new_proxy:
                                    # Retry only if attempts remain.
                                    if retry_count < max_retries:
                                        proxy_change_count += 1
                                        self.logger.info(f"已更换新代理({proxy_change_count}/{max_proxy_changes}),将重试,当前第{retry_count+1}次")
                                        print(f" [!] 已更换新代理({proxy_change_count}/{max_proxy_changes}),将重试...")
                                        retry_count += 1
                                        continue
                                else:
                                    self.logger.error("无法获取新代理,放弃重试")
                                    print(f" [X] 无法获取新代理")
                            break  # plain API errors are not retried
                    except json.JSONDecodeError as e:
                        self.logger.error(f"JSON解析失败: {e}")
                        print(f" [X] JSON解析失败: {e}")
                        print(f" 响应内容: {response.text[:500]}")
                        break  # malformed body - do not retry
                else:
                    self.logger.error(f"HTTP错误: {response.status_code}")
                    print(f" [X] HTTP错误: {response.status_code}")
                    break  # HTTP-level errors are not retried
            except Exception as e:
                error_type = type(e).__name__
                # Heuristic: classify proxy/connection problems by exception
                # class name and message; only these are retried.
                is_proxy_error = any([
                    'Connection' in error_type,
                    'RemoteDisconnected' in error_type,
                    'ProxyError' in error_type,
                    'Timeout' in error_type,
                    'ConnectionError' in str(e),
                    'Connection aborted' in str(e),
                    'Remote end closed' in str(e),
                    'Tunnel connection failed' in str(e),
                ])
                if is_proxy_error:
                    if retry_count < max_retries:
                        self.logger.warning(f"发文统计API代理连接错误: {error_type},将重试")
                        print(f" [!] 代理连接错误: {error_type}")
                        # Record the failure (rotates the proxy after 3 strikes).
                        self.mark_proxy_failed()
                        # Timeouts/connection failures: swap the proxy at once
                        # instead of waiting for 3 recorded failures.
                        if self.use_proxy and ('Timeout' in error_type or 'Connection' in error_type or 'ProxyError' in error_type):
                            # Respect the proxy-swap cap.
                            if proxy_change_count >= max_proxy_changes:
                                self.logger.error(f"已达代理更换上限({max_proxy_changes}次),放弃重试")
                                print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
                                break
                            self.logger.warning(f"检测到{error_type}错误,立即更换新代理")
                            print(f" [!] 检测到{error_type},立即更换新代理")
                            self.current_proxy = None
                            self.proxy_fail_count = 0
                            new_proxy = self.fetch_proxy(force_new=True)
                            if new_proxy:
                                proxy_change_count += 1
                                self.logger.info(f"已更换新代理({proxy_change_count}/{max_proxy_changes}),继续重试")
                                print(f" [✓] 已更换新代理({proxy_change_count}/{max_proxy_changes}),继续重试")
                                # A proxy swap does not consume a retry attempt.
                                continue
                            else:
                                self.logger.error("无法获取新代理,放弃重试")
                                print(f" [X] 无法获取新代理")
                                break
                        # Other proxy errors: swap after 3 recorded failures.
                        # NOTE(review): mark_proxy_failed() above resets the
                        # counter to 0 once it reaches 3, so this branch looks
                        # unreachable - confirm intent.
                        elif self.proxy_fail_count >= 3 and self.use_proxy:
                            # Respect the proxy-swap cap.
                            if proxy_change_count >= max_proxy_changes:
                                self.logger.error(f"已达代理更换上限({max_proxy_changes}次),放弃重试")
                                print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
                                break
                            print(f" [!] 代理已失败{self.proxy_fail_count}次,强制更换新代理")
                            self.current_proxy = None
                            self.proxy_fail_count = 0
                            new_proxy = self.fetch_proxy(force_new=True)
                            if new_proxy:
                                proxy_change_count += 1
                                self.logger.info(f"已更换新代理({proxy_change_count}/{max_proxy_changes}),继续重试")
                                print(f" [✓] 已更换新代理({proxy_change_count}/{max_proxy_changes}),继续重试")
                                # A proxy swap does not consume a retry attempt.
                                continue
                            else:
                                self.logger.error("无法获取新代理")
                                print(f" [X] 无法获取新代理")
                                break
                        # Only the remaining cases consume a retry attempt.
                        retry_count += 1
                        continue
                    else:
                        self.logger.error(f"发文统计API请求失败已达最大重试次数: {error_type}")
                        print(f" [X] 请求失败,已达最大重试次数")
                        break
                else:
                    self.logger.error(f"请求异常: {e}", exc_info=True)
                    print(f" [X] 请求异常: {e}")
                    break
        # Package whatever succeeded.
        if successful_data:
            return {
                'apis': successful_data,
                'count': len(successful_data)
            }
        return None
def parse_analytics_data(self, html: str) -> Dict:
"""解析页面中的数据分析指标
Args:
html: 页面HTML内容
Returns:
Dict: 提取的数据指标
"""
import re
analytics_data = {
'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'metrics': {}
}
# 尝试提取页面中的数据
# 百家号通常会在页面中嵌入JSON数据或通过异步加载
# 方法1: 查找JSON数据块
json_pattern = r'<script[^>]*>\s*(?:var|window\.)?\s*(\w+)\s*=\s*(\{[^<]+\})\s*;?\s*</script>'
matches = re.finditer(json_pattern, html, re.DOTALL)
for match in matches:
var_name = match.group(1)
json_str = match.group(2)
try:
data = json.loads(json_str)
if isinstance(data, dict) and len(data) > 0:
analytics_data['metrics'][var_name] = data
print(f"[发现] 数据块: {var_name}")
except:
pass
# 方法2: 查找数值模式
patterns = {
'单日发文量': [
r'(?:单日发文|今日发文|日发文).*?(\d+)',
r'todayPublish.*?(\d+)',
],
'累计发文量': [
r'(?:累计发文|总发文|发文总数).*?(\d+)',
r'totalPublish.*?(\d+)',
],
'当月收益': [
r'(?:当月收益|本月收益|月收益).*?([\d.]+)',
r'monthIncome.*?([\d.]+)',
],
'当周收益': [
r'(?:当周收益|本周收益|周收益).*?([\d.]+)',
r'weekIncome.*?([\d.]+)',
],
'收益月环比': [
r'(?:月环比|月同比).*?([\d.]+)%',
r'monthRate.*?([\d.]+)',
],
'周环比': [
r'(?:周环比|周同比).*?([\d.]+)%',
r'weekRate.*?([\d.]+)',
],
}
for metric_name, pattern_list in patterns.items():
for pattern in pattern_list:
match = re.search(pattern, html, re.IGNORECASE)
if match:
value = match.group(1)
analytics_data['metrics'][metric_name] = value
print(f"[提取] {metric_name}: {value}")
break
return analytics_data
def extract_account_analytics(self, account_id: str, days: int = 7) -> Optional[Dict]:
"""提取指定账号的数据分析指标
Args:
account_id: 账号ID
days: 查询天数默认7天
Returns:
Optional[Dict]: 数据分析结果
"""
print(f"\n{'='*70}")
print(f"开始提取账号数据: {account_id}")
print(f"{'='*70}")
# 设置Cookie
if not self.set_account_cookies(account_id):
return None
result = {
'account_id': account_id,
'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'status': 'unknown',
'data': {}
}
# 尝试方法1: 通过API获取
print("\n[方法1] 尝试通过API接口获取数据...")
api_data = self.fetch_analytics_api(days=days)
if api_data:
result['data']['api_data'] = api_data
result['status'] = 'success_api'
print("[OK] 通过API成功获取数据")
# 尝试方法2: 解析页面
print("\n[方法2] 尝试解析页面HTML获取数据...")
html = self.fetch_analytics_page()
if html:
parsed_data = self.parse_analytics_data(html)
result['data']['parsed_data'] = parsed_data
if result['status'] == 'unknown':
result['status'] = 'success_html'
print("[OK] 页面数据解析完成")
if result['status'] == 'unknown':
result['status'] = 'failed'
print("[X] 所有方法均未获取到数据")
return result
def fetch_income_data_v2(self, max_retries: int = 3) -> Optional[Dict]:
"""获取收入数据使用v2 API一次返回多个时间段
Returns:
收入数据字典包含昨日、近7天、近30天、本月等多个时间段
失败返回None
"""
# API端点使用v2版本
api_url = f"{self.base_url}/author/eco/income4/overviewhometabv2"
# 从Cookie中提取tokenJWT
token_cookie = self.session.cookies.get('bjhStoken') or self.session.cookies.get('devStoken')
# 简洁的请求头
headers = {
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.9',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Referer': f'{self.base_url}/builder/rc/incomecenter',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
}
# 添加token头
if token_cookie:
headers['token'] = token_cookie
self.logger.debug(f"使用token: {token_cookie[:50]}...")
else:
self.logger.warning("未找到token收入数据请求可能失败")
self.logger.info("获取收入数据v2多时段API")
print(f"\n[请求] 获取收入数据v2多时段API")
print(f" API: {api_url}")
retry_count = 0
proxy_change_count = 0 # 代理更换次数计数器
max_proxy_changes = 3 # 最多更换3次代理即最多使用4个不同代理
while retry_count <= max_retries:
try:
# 如果是重试,先等待一段时间
if retry_count > 0:
wait_time = retry_count * 2 # 递增等待时间2秒、4秒、6秒
self.logger.info(f"收入数据API 第{retry_count}次重试,等待{wait_time}")
print(f" [重试 {retry_count}/{max_retries}] 等待{wait_time}秒...")
time.sleep(wait_time)
# 获取代理(如果启用)
proxies = self.fetch_proxy() if self.use_proxy else None
# 调试信息:显示代理使用情况
if self.use_proxy:
if proxies:
proxy_url = proxies.get('http', '')
if '@' in proxy_url:
# 提取IP部分隐藏账号密码
proxy_ip = proxy_url.split('@')[1]
else:
proxy_ip = proxy_url.replace('http://', '').replace('https://', '')
self.logger.info(f"收入API 使用代理: {proxy_ip}")
print(f" [代理] 使用IP: {proxy_ip}")
else:
self.logger.warning(f"收入API 代理未生效use_proxy={self.use_proxy}")
print(f" [!] 警告代理未生效use_proxy={self.use_proxy}")
response = self.session.get(
api_url,
headers=headers,
proxies=proxies,
timeout=15,
verify=False
)
self.logger.info(f"收入API响应状态码: {response.status_code}")
print(f" 状态码: {response.status_code}")
if response.status_code == 200:
try:
data = response.json()
errno = data.get('errno', -1)
errmsg = data.get('errmsg', '')
if errno == 0:
self.logger.info("收入数据API调用成功")
print(f" [✓] API调用成功")
# 请求成功,重置代理失败计数
self.reset_proxy_fail_count()
# 显示收入数据摘要
income_data = data.get('data', {}).get('income', {})
if 'recent7Days' in income_data:
recent7 = income_data['recent7Days']
value7 = recent7.get('value', '0.00')
self.logger.info(f"近7天收入: ¥{value7}")
print(f" 近7天: ¥{value7}")
if 'recent30Days' in income_data:
recent30 = income_data['recent30Days']
value30 = recent30.get('value', '0.00')
self.logger.info(f"近30天收入: ¥{value30}")
print(f" 近30天: ¥{value30}")
return data
else:
self.logger.error(f"收入API返回错误: errno={errno}, errmsg={errmsg}")
print(f" [X] API返回错误: errno={errno}, errmsg={errmsg}")
# 特别处理 errno=10000015 (异常请求),这通常是代理未生效
if errno == 10000015 and self.use_proxy:
self.logger.warning("检测到收入API errno=10000015异常请求代理未生效立即强制更换新代理")
print(f" [!] 检测到代理未生效,立即更换新代理")
# 检查是否超过代理更换上限
if proxy_change_count >= max_proxy_changes:
print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
return None
# 立即强制获取新代理不等待3次
self.current_proxy = None
self.proxy_fail_count = 0
new_proxy = self.fetch_proxy(force_new=True)
if new_proxy:
# 如果还没达到重试上限,尝试重试
if retry_count < max_retries:
proxy_change_count += 1
self.logger.info(f"已更换新代理({proxy_change_count}/{max_proxy_changes})将重试收入API当前第{retry_count+1}")
print(f" [!] 已更换新代理({proxy_change_count}/{max_proxy_changes}),将重试...")
retry_count += 1
continue
else:
self.logger.error("无法获取新代理,放弃重试")
print(f" [X] 无法获取新代理")
return None
except json.JSONDecodeError as e:
self.logger.error(f"收入数据JSON解析失败: {e}")
print(f" [X] JSON解析失败: {e}")
return None
else:
self.logger.error(f"收入API HTTP错误: {response.status_code}")
print(f" [X] HTTP错误: {response.status_code}")
return None
except Exception as e:
error_type = type(e).__name__
# 判断是否是代理相关错误
is_proxy_error = any([
'Connection' in error_type,
'RemoteDisconnected' in error_type,
'ProxyError' in error_type,
'Timeout' in error_type,
'ConnectionError' in str(e),
'Connection aborted' in str(e),
'Remote end closed' in str(e),
'Tunnel connection failed' in str(e),
])
if is_proxy_error:
if retry_count < max_retries:
self.logger.warning(f"收入数据API代理连接错误: {error_type},将重试")
print(f" [!] 代理连接错误: {error_type}")
# 标记代理失败
self.mark_proxy_failed()
# 超时或连接错误立即更换代理不等待3次失败
if self.use_proxy and ('Timeout' in error_type or 'Connection' in error_type or 'ProxyError' in error_type):
# 检查是否超过代理更换上限
if proxy_change_count >= max_proxy_changes:
self.logger.error(f"已达代理更换上限({max_proxy_changes}次),放弃重试")
print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
return None
self.logger.warning(f"检测到{error_type}错误,立即更换新代理")
print(f" [!] 检测到{error_type},立即更换新代理")
self.current_proxy = None
self.proxy_fail_count = 0
new_proxy = self.fetch_proxy(force_new=True)
if new_proxy:
proxy_change_count += 1
self.logger.info(f"已更换新代理({proxy_change_count}/{max_proxy_changes}),继续重试")
print(f" [✓] 已更换新代理({proxy_change_count}/{max_proxy_changes}),继续重试")
# 更换代理后不增加retry_count直接continue重试
continue
else:
self.logger.error("无法获取新代理,放弃重试")
print(f" [X] 无法获取新代理")
return None
# 其他代理错误等待3次失败后更换
elif self.proxy_fail_count >= 3 and self.use_proxy:
# 检查是否超过代理更换上限
if proxy_change_count >= max_proxy_changes:
self.logger.error(f"已达代理更换上限({max_proxy_changes}次),放弃重试")
print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
return None
print(f" [!] 代理已失败{self.proxy_fail_count}次,强制更换新代理")
self.current_proxy = None
self.proxy_fail_count = 0
new_proxy = self.fetch_proxy(force_new=True)
if new_proxy:
proxy_change_count += 1
self.logger.info(f"已更换新代理({proxy_change_count}/{max_proxy_changes}),继续重试")
print(f" [✓] 已更换新代理({proxy_change_count}/{max_proxy_changes}),继续重试")
# 更换代理后不增加retry_count直接continue重试
continue
else:
self.logger.error("无法获取新代理")
print(f" [X] 无法获取新代理")
return None
# 其他情况才增加retry_count
retry_count += 1
continue
else:
self.logger.error(f"收入数据API请求失败已达最大重试次数: {error_type}")
print(f" [X] 请求失败,已达最大重试次数")
return None
else:
self.logger.error(f"收入数据请求异常: {e}", exc_info=True)
print(f" [X] 请求异常: {e}")
return None
return None
    def fetch_daily_income(self, target_date: datetime, max_retries: int = 3) -> Optional[Dict]:
        """Fetch income for a single calendar day (overviewhometab API).

        Args:
            target_date: the day to query (time-of-day is ignored).
            max_retries: retry budget for transient proxy/connection errors.

        Returns:
            Parsed JSON response dict, or None on any failure.
        """
        # Single-day income query endpoint.
        api_url = f"{self.base_url}/author/eco/income4/overviewhometab"
        # Unix timestamp of the day's midnight, used for both range ends.
        date_timestamp = int(target_date.replace(hour=0, minute=0, second=0, microsecond=0).timestamp())
        # Request parameters.
        params = {
            'start_date': date_timestamp,
            'end_date': date_timestamp
        }
        # The JWT-style token is stored in the bjhStoken/devStoken cookie.
        token_cookie = self.session.cookies.get('bjhStoken') or self.session.cookies.get('devStoken')
        # Request headers.
        headers = {
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': f'{self.base_url}/builder/rc/incomecenter',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
        }
        # The 'token' header is required for authenticated endpoints.
        if token_cookie:
            headers['token'] = token_cookie
        else:
            self.logger.warning("未找到token单日收入请求可能失败")
        # Retry loop: only proxy/connection errors are retried.
        retry_count = 0
        last_error = None
        while retry_count <= max_retries:
            try:
                # Back off before each retry: 2s, 4s, 6s...
                if retry_count > 0:
                    wait_time = retry_count * 2
                    self.logger.info(f"单日收入 {target_date.strftime('%Y-%m-%d')}{retry_count}次重试,等待{wait_time}秒")
                    time.sleep(wait_time)
                # Acquire a proxy when proxying is enabled.
                proxies = self.fetch_proxy() if self.use_proxy else None
                response = self.session.get(
                    api_url,
                    headers=headers,
                    params=params,
                    proxies=proxies,
                    timeout=15,
                    verify=False
                )
                if response.status_code == 200:
                    try:
                        data = response.json()
                        errno = data.get('errno', -1)
                        if errno == 0:
                            # Success: clear the proxy failure streak.
                            self.reset_proxy_fail_count()
                            return data
                        else:
                            # Application-level error: not retried.
                            self.logger.error(f"单日收入API返回错误: errno={errno}")
                            return None
                    except json.JSONDecodeError as e:
                        self.logger.error(f"单日收入JSON解析失败: {e}")
                        return None
                else:
                    self.logger.error(f"单日收入HTTP错误: {response.status_code}")
                    return None
            except Exception as e:
                last_error = str(e)
                error_type = type(e).__name__
                # Heuristic: classify proxy/connection problems by exception
                # class name and message; only these are retried.
                is_proxy_error = any([
                    'Connection' in error_type,
                    'RemoteDisconnected' in error_type,
                    'ProxyError' in error_type,
                    'Timeout' in error_type,
                    'ConnectionError' in str(e),
                    'Connection aborted' in str(e),
                    'Remote end closed' in str(e),
                ])
                if is_proxy_error:
                    if retry_count < max_retries:
                        self.logger.warning(f"单日收入代理连接错误 ({target_date.strftime('%Y-%m-%d')}): {error_type},将重试")
                        # Record the failure (rotates the proxy after 3 strikes).
                        self.mark_proxy_failed()
                        retry_count += 1
                        continue
                    else:
                        self.logger.error(f"单日收入请求失败 ({target_date.strftime('%Y-%m-%d')}),已达最大重试次数: {error_type} - {last_error}")
                        return None
                else:
                    self.logger.error(f"单日收入请求异常 ({target_date.strftime('%Y-%m-%d')}): {e}", exc_info=True)
                    return None
        # Defensive fallthrough; the loop normally exits via return/continue.
        return None
def extract_integrated_data(self, account_id: str, days: int = 7) -> Optional[Dict]:
"""提取指定账号的整合数据(发文统计 + 收入数据)
Args:
account_id: 账号ID
days: 查询天数默认7天
Returns:
Optional[Dict]: 整合数据结果
"""
print(f"\n{'='*70}")
print(f"开始提取账号数据: {account_id}")
print(f"{'='*70}")
# 设置Cookie
if not self.set_account_cookies(account_id):
return None
result = {
'account_id': account_id,
'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'status': 'unknown',
'analytics': {},
'income': {},
'error_info': {}
}
# 1. 获取发文统计数据
print("\n[1/2] 获取发文统计数据...")
api_data = self.fetch_analytics_api(days=days)
if api_data:
result['analytics'] = api_data
print("[OK] 发文统计数据获取成功")
else:
print("[X] 发文统计数据获取失败")
result['error_info']['analytics'] = 'API调用失败或被限流'
# API调用之间添加随机延迟2-4秒模拟人工操作
import random
api_delay = random.uniform(2, 4)
print(f"\n[间隔] 等待 {api_delay:.1f} 秒...")
time.sleep(api_delay)
# 2. 获取收入数据当日收益存入day_revenue当周收益从数据库汇总计算
print("\n[2/2] 获取收入数据...")
income_data = self.fetch_income_data_v2()
if income_data:
result['income'] = income_data
print("[OK] 收入数据获取成功")
else:
print("[X] 收入数据获取失败")
result['error_info']['income'] = 'API调用失败或被限流'
# 设置状态
if result['analytics'] and result['income']:
result['status'] = 'success_all'
elif result['analytics'] or result['income']:
result['status'] = 'success_partial'
elif result['error_info']:
# 判断是否为限流
result['status'] = 'rate_limited'
else:
result['status'] = 'failed'
return result
def extract_all_integrated_data(self, days: int = 7, delay_seconds: float = 3.0, stop_on_rate_limit: bool = False) -> List[Dict]:
"""提取所有账号的整合数据
Args:
days: 查询天数默认7天
delay_seconds: 每个账号之间的延迟时间(秒)
stop_on_rate_limit: 遇到连续限流时是否停止默认False
Returns:
List[Dict]: 所有账号的整合数据结果
"""
if not self.account_cookies:
print("[X] 没有可用的账号Cookie")
return []
print("\n" + "="*70)
print(f"开始提取 {len(self.account_cookies)} 个账号的整合数据(发文统计 + 收入)")
print("="*70)
import random
results = []
rate_limited_count = 0
for idx, account_id in enumerate(self.account_cookies.keys(), 1):
print(f"\n[{idx}/{len(self.account_cookies)}] 处理账号: {account_id}")
result = self.extract_integrated_data(account_id, days=days)
if result:
results.append(result)
# 检测限流情况
if result.get('status') == 'rate_limited':
rate_limited_count += 1
self.logger.warning(f"账号 {account_id} 被限流,连续限流次数: {rate_limited_count}")
# 限流后增加额外等待时间,避免连续触发
extra_delay = random.uniform(15, 25) # 额外等待15-25秒
print(f"\n[!] 检测到限流,额外等待 {extra_delay:.1f}")
time.sleep(extra_delay)
# 如果启用了限流停止且连续3个账号被限流
if stop_on_rate_limit and rate_limited_count >= 3:
print("\n" + "="*70)
print("[!] 检测到连续限流,停止本次更新")
print("[!] 建议稍后再试")
print("="*70)
break
else:
# 重置限流计数
rate_limited_count = 0
# 添加延迟
if idx < len(self.account_cookies):
actual_delay = delay_seconds * random.uniform(0.7, 1.3)
print(f"\n[延迟] 等待 {actual_delay:.1f} 秒后继续...")
time.sleep(actual_delay)
return results
def extract_all_accounts(self, days: int = 7) -> List[Dict]:
    """Fetch the publishing-analytics metrics for every loaded account.

    Args:
        days: Number of days to query (default 7).

    Returns:
        List[Dict]: One analytics result per account that produced a response.
    """
    if not self.account_cookies:
        print("[X] 没有可用的账号Cookie")
        return []
    print("\n" + "="*70)
    print(f"开始提取 {len(self.account_cookies)} 个账号的数据分析")
    print("="*70)
    # Fix: the import is loop-invariant — hoisted out of the per-account loop
    # (it previously re-executed on every iteration).
    import random
    results = []
    for idx, account_id in enumerate(self.account_cookies.keys(), 1):
        print(f"\n[{idx}/{len(self.account_cookies)}] 处理账号: {account_id}")
        result = self.extract_account_analytics(account_id, days=days)
        if result:
            results.append(result)
        # Random 2-5s pause between accounts to avoid hammering the API.
        if idx < len(self.account_cookies):
            delay = random.uniform(2, 5)
            print(f"\n[延迟] 等待 {delay:.1f} 秒后继续...")
            time.sleep(delay)
    return results
def save_results(self, results: List[Dict]):
    """Persist results to the main JSON file and a dated backup copy.

    Args:
        results: Per-account analytics result dicts to serialize.
    """
    import shutil
    try:
        # 1. Primary output file (stable name, overwritten each run).
        with open(self.output_file, 'w', encoding='utf-8') as fh:
            json.dump(results, fh, ensure_ascii=False, indent=2)
        print(f"\n{'='*70}")
        print(f"[OK] 数据已保存到: {self.output_file}")

        # 2. Dated backup copy (one file per calendar day).
        day_tag = datetime.now().strftime('%Y%m%d')
        backup_path = os.path.join(self.backup_dir, f"bjh_integrated_data_{day_tag}.json")
        shutil.copy2(self.output_file, backup_path)
        print(f"[OK] 备份已保存到: {backup_path}")
        print(f"{'='*70}")

        # Summary: any status beginning with 'success' counts as a success.
        ok = sum(r.get('status', '').startswith('success') for r in results)
        print(f"\n统计信息:")
        print(f" - 总账号数: {len(results)}")
        print(f" - 成功获取: {ok}")
        print(f" - 失败: {len(results) - ok}")
    except Exception as e:
        print(f"[X] 保存文件失败: {e}")
def display_results_summary(self, results: List[Dict]):
    """Print a human-readable summary for each account result.

    Args:
        results: Per-account analytics result dicts.
    """
    print(f"\n{'='*70}")
    print("数据提取摘要")
    print(f"{'='*70}")
    for entry in results:
        print(f"\n账号: {entry.get('account_id', 'unknown')}")
        print(f"状态: {entry.get('status', 'unknown')}")
        payload = entry.get('data', {})
        # API-level data (two formats exist in the wild).
        if 'api_data' in payload:
            api_data = payload['api_data']
            if 'apis' in api_data:
                # New format: a list of per-endpoint results.
                endpoints = api_data['apis']
                print(f"\n成功获取 {len(endpoints)} 个API端点数据:")
                for item in endpoints:
                    print(f"\n API: {item.get('endpoint', 'unknown')}")
                    # Dump any recognizable metrics from this endpoint's payload.
                    self._print_api_metrics(item.get('data', {}))
            else:
                # Legacy format: a single endpoint record.
                print(f"API端点: {api_data.get('endpoint', 'unknown')}")
                print(f"获取时间: {api_data.get('fetch_time', 'unknown')}")
        # Metrics scraped from the page itself, if any were parsed.
        if 'parsed_data' in payload:
            page_metrics = payload['parsed_data'].get('metrics', {})
            if page_metrics:
                print("\n从页面提取的指标:")
                for name, val in page_metrics.items():
                    print(f" - {name}: {val}")
        print("-" * 70)
def _print_api_metrics(self, api_data: Dict):
"""打印API返回的指标数据
Args:
api_data: API响应数据
"""
# 查找常见指标
metric_keys = [
'publish_count', # 发文量
'today_publish', # 今日发文
'total_publish', # 累计发文
'month_income', # 月收益
'week_income', # 周收益
'month_rate', # 月环比
'week_rate', # 周环比
'income', # 收益
'count', # 计数
]
# 检查data字段
if 'data' in api_data:
inner_data = api_data['data']
if isinstance(inner_data, dict):
print(f" 数据字段: {list(inner_data.keys())}")
for key in metric_keys:
if key in inner_data:
print(f" - {key}: {inner_data[key]}")
# 检查result字段
if 'result' in api_data:
result_data = api_data['result']
if isinstance(result_data, dict):
print(f" 结果字段: {list(result_data.keys())}")
for key in metric_keys:
if key in result_data:
print(f" - {key}: {result_data[key]}")
def main():
    """Interactive entry point.

    Flow: pick the cookie source (local JSON file or MySQL), optionally enable
    a proxy, then run one of three extraction modes (integrated data for all
    accounts, publishing stats only, or a single account) and save the results.
    """
    print("\n" + "="*70)
    print("百家号数据整合抓取工具(发文统计 + 收入数据)")
    print("="*70)
    # Choose where the account cookies are loaded from.
    print("\n请选择Cookie数据源:")
    print(" 1. 本地JSON文件 (captured_account_cookies.json)")
    print(" 2. MySQL数据库 (ai_authors表)")
    source_input = input("\n请选择 (1/2, 默认1): ").strip() or '1'
    load_from_db = source_input == '2'
    # Whether to route requests through the proxy pool.
    proxy_input = input("\n是否启用代理?(y/n默认n): ").strip().lower()
    use_proxy = proxy_input == 'y'
    # Build the analyzer with the chosen cookie source.
    if load_from_db:
        print("\n[配置] 使用数据库加载模式")
        analytics = BaijiahaoAnalytics(use_proxy=use_proxy, load_from_db=True)
    else:
        print("\n[配置] 使用本地文件加载模式")
        analytics = BaijiahaoAnalytics(use_proxy=use_proxy)
    if not analytics.account_cookies:
        print("\n[!] 未找到可用的账号Cookie")
        if not load_from_db:
            print(" 请先运行一键捕获Cookie工具")
        return
    # Show the accounts that were loaded.
    print(f"\n找到 {len(analytics.account_cookies)} 个账号:")
    for idx, (account_id, data) in enumerate(analytics.account_cookies.items(), 1):
        domain = data.get('domain', 'unknown')
        capture_time = data.get('first_captured', 'unknown')
        print(f" {idx}. {account_id} (域名: {domain}, 捕获: {capture_time})")
    # Main menu.
    print("\n" + "="*70)
    print("请选择操作:")
    print(" 1. 提取所有账号的整合数据(发文统计 + 收入)")
    print(" 2. 仅提取发文统计数据")
    print(" 3. 提取指定账号数据")
    print(" 0. 退出")
    print("="*70)
    choice = input("\n请选择 (0-3): ").strip()
    if choice == '0':
        print("\n退出程序")
        return
    elif choice == '1':
        # Mode 1: integrated data (publishing stats + income) for all accounts.
        days_input = input("\n请输入查询天数 (默认7天): ").strip()
        days = int(days_input) if days_input.isdigit() else 7
        print(f"\n开始获取所有账号的整合数据 (最近{days}天)...\n")
        results = analytics.extract_all_integrated_data(days=days)
        if results:
            analytics.save_results(results)
            # Simple per-status breakdown.
            success_all = sum(1 for r in results if r.get('status') == 'success_all')
            success_partial = sum(1 for r in results if r.get('status') == 'success_partial')
            failed = sum(1 for r in results if r.get('status') == 'failed')
            print(f"\n{'='*70}")
            print("数据提取统计")
            print(f"{'='*70}")
            print(f" 总账号数: {len(results)}")
            print(f" 全部成功: {success_all} (发文+收入)")
            print(f" 部分成功: {success_partial}")
            print(f" 失败: {failed}")
            print(f"{'='*70}")
    elif choice == '2':
        # Mode 2: publishing statistics only, for all accounts.
        days_input = input("\n请输入查询天数 (默认7天): ").strip()
        days = int(days_input) if days_input.isdigit() else 7
        print(f"\n开始获取所有账号的发文统计数据 (最近{days}天)...\n")
        results = analytics.extract_all_accounts(days=days)
        if results:
            # Persist to the legacy analytics output file.
            with open(analytics.analytics_output, 'w', encoding='utf-8') as f:
                json.dump(results, f, ensure_ascii=False, indent=2)
            print(f"\n[OK] 数据已保存到: {analytics.analytics_output}")
            analytics.display_results_summary(results)
    elif choice == '3':
        # Mode 3: a single, user-selected account.
        account_list = list(analytics.account_cookies.keys())
        print("\n可用账号:")
        for idx, account_id in enumerate(account_list, 1):
            print(f" {idx}. {account_id}")
        try:
            idx = int(input("\n请输入账号序号: ").strip())
            if 1 <= idx <= len(account_list):
                account_id = account_list[idx - 1]
                days_input = input("\n请输入查询天数 (默认7天): ").strip()
                days = int(days_input) if days_input.isdigit() else 7
                # Choose between integrated data and publishing stats only.
                data_type = input("\n1-整合数据 2-仅发文统计 (默认1): ").strip()
                if data_type == '2':
                    result = analytics.extract_account_analytics(account_id, days=days)
                else:
                    result = analytics.extract_integrated_data(account_id, days=days)
                if result:
                    results = [result]
                    analytics.save_results(results)
            else:
                print("[X] 无效的序号")
        except ValueError:
            print("[X] 请输入有效的数字")
    else:
        print("[X] 无效的选择")
    print("\n" + "="*70)
    print("程序执行完成")
    print("="*70 + "\n")
# Run the interactive CLI only when executed as a script, not on import.
if __name__ == '__main__':
    main()