# Source file: baijiahao_data_crawl/bjh_analytics_date.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
百家号指定日期数据抓取工具
根据指定日期范围抓取发文统计和收入数据
"""
import json
import sys
import os
import argparse
from datetime import datetime, timedelta
from typing import Dict, List, Optional
# 导入基础分析器
from bjh_analytics import BaijiahaoAnalytics
# Windows consoles default to a legacy code page; rewrap stdout/stderr as
# UTF-8 text streams so Chinese log output does not raise UnicodeEncodeError.
if sys.platform == 'win32':
    import io
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
class BaijiahaoDateAnalytics(BaijiahaoAnalytics):
    """Baijiahao crawler that fetches publish statistics and income data
    for an explicitly specified target date (instead of "today")."""

    def __init__(self, target_date: str, use_proxy: bool = False, load_from_db: bool = False, db_config: Optional[Dict] = None):
        """Initialize the date-scoped analytics crawler.

        Args:
            target_date: target date in YYYY-MM-DD format.
            use_proxy: whether to route requests through a proxy.
            load_from_db: whether to load account cookies from the database.
            db_config: optional database configuration dict.

        Raises:
            ValueError: if target_date is not a valid YYYY-MM-DD string.
        """
        super().__init__(use_proxy=use_proxy, load_from_db=load_from_db, db_config=db_config)
        # Parse and validate the target date up front so bad input fails fast.
        try:
            self.target_date = datetime.strptime(target_date, '%Y-%m-%d')
            self.target_date_str = target_date
        except ValueError:
            raise ValueError(f"日期格式错误: {target_date},正确格式: YYYY-MM-DD")
        # Fixed output filename (no date suffix); dated copies go to backup/.
        self.output_file = os.path.join(
            self.script_dir,
            "bjh_integrated_data.json"
        )
        # exist_ok avoids the check-then-create race of the previous
        # os.path.exists() + os.makedirs() pair.
        self.backup_dir = os.path.join(self.script_dir, "backup")
        os.makedirs(self.backup_dir, exist_ok=True)
        print(f"[配置] 目标日期: {target_date}")
        print(f"[配置] 输出文件: {self.output_file}")
        print(f"[配置] 备份目录: {self.backup_dir}")
def fetch_analytics_api_for_date(self, days: int = 7, max_retries: int = 3) -> Optional[Dict]:
    """Fetch publish-statistics data for the configured date range.

    Args:
        days: number of days to query, counting backwards from target_date
            (the range is inclusive of target_date).
        max_retries: maximum retry attempts for transient failures.

    Returns:
        Dict with the successful API payload(s) under 'apis', or None if
        nothing could be fetched.
    """
    import time
    # Date range: `days` days ending on (and including) target_date.
    end_date = self.target_date
    start_date = end_date - timedelta(days=days - 1)
    start_day = start_date.strftime('%Y%m%d')
    end_day = end_date.strftime('%Y%m%d')
    api_url = f"{self.base_url}/author/eco/statistics/appStatisticV3"
    # Explicit start/end days instead of the site's special_filter_days shortcut.
    params = {
        'type': 'event',
        'start_day': start_day,
        'end_day': end_day,
        'stat': '0'
    }
    # The API expects the session token echoed back in a `token` header.
    token_cookie = self.session.cookies.get('bjhStoken') or self.session.cookies.get('devStoken')
    headers = {
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Referer': f'{self.base_url}/builder/rc/analysiscontent',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
    }
    if token_cookie:
        headers['token'] = token_cookie
    # BUGFIX: the two range messages below concatenated both dates with no
    # separator ("2025-12-142025-12-20"); insert " ~ " between them.
    self.logger.info(f"获取发文统计: {start_date.strftime('%Y-%m-%d')} ~ {end_date.strftime('%Y-%m-%d')}")
    print(f"\n[请求] 获取发文统计数据")
    print(f" 日期范围: {start_date.strftime('%Y-%m-%d')} ~ {end_date.strftime('%Y-%m-%d')}")
    successful_data = []
    retry_count = 0
    proxy_change_count = 0  # how many times the proxy has been swapped
    max_proxy_changes = 3   # at most 3 swaps, i.e. up to 4 distinct proxies
    while retry_count <= max_retries:
        try:
            if retry_count > 0:
                # Linear backoff: 2s, 4s, 6s...
                wait_time = retry_count * 2
                print(f" [重试 {retry_count}/{max_retries}] 等待 {wait_time} 秒...")
                time.sleep(wait_time)
            proxies = self.fetch_proxy() if self.use_proxy else None
            # Debug output: show which proxy IP is actually in use.
            if self.use_proxy:
                if proxies:
                    proxy_url = proxies.get('http', '')
                    if '@' in proxy_url:
                        proxy_ip = proxy_url.split('@')[1]
                    else:
                        proxy_ip = proxy_url.replace('http://', '').replace('https://', '')
                    print(f" [代理] 使用IP: {proxy_ip}")
                else:
                    print(f" [!] 警告代理未生效use_proxy={self.use_proxy}")
            response = self.session.get(
                api_url,
                headers=headers,
                params=params,
                proxies=proxies,
                timeout=15,
                verify=False
            )
            print(f" 状态码: {response.status_code}")
            if response.status_code == 200:
                data = response.json()
                errno = data.get('errno', -1)
                if errno == 0:
                    print(f" [✓] API调用成功")
                    # Success: reset the proxy failure counter.
                    self.reset_proxy_fail_count()
                    # Defensive check: 'data' must be a dict, not a list.
                    data_field = data.get('data', {})
                    if isinstance(data_field, list):
                        print(f" [X] API返回数据格式异常: data字段为列表而非字典")
                        print(f" 原始响应前500字符: {str(data)[:500]}")
                        break
                    if not isinstance(data_field, dict):
                        print(f" [X] API返回数据格式异常: data字段类型为 {type(data_field).__name__}")
                        break
                    total_info = data_field.get('total_info', {})
                    print(f"\n 发文统计数据:")
                    print(f" 发文量: {total_info.get('publish_count', '0')}")
                    print(f" 曝光量: {total_info.get('disp_pv', '0')}")
                    print(f" 阅读量: {total_info.get('view_count', '0')}")
                    api_result = {
                        'endpoint': '/author/eco/statistics/appStatisticV3',
                        'name': '发文统计',
                        'date_range': f"{start_day} - {end_day}",
                        'data': data,
                        'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    }
                    successful_data.append(api_result)
                    break
                else:
                    errmsg = data.get('errmsg', '')
                    print(f" [X] API返回错误: errno={errno}, errmsg={errmsg}")
                    # errno=10000015 ("abnormal request") usually means the
                    # proxy was not applied; force a fresh proxy and retry.
                    if errno == 10000015 and self.use_proxy:
                        print(f" [!] 检测到代理未生效,立即更换新代理")
                        if proxy_change_count >= max_proxy_changes:
                            print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
                            break
                        self.current_proxy = None
                        self.proxy_fail_count = 0
                        new_proxy = self.fetch_proxy(force_new=True)
                        if new_proxy and retry_count < max_retries:
                            proxy_change_count += 1
                            print(f" [!] 已更换新代理({proxy_change_count}/{max_proxy_changes}),将重试...")
                            retry_count += 1
                            continue
                        else:
                            print(f" [X] 无法获取新代理或已达重试上限")
                            break
                    else:
                        # Other API errors are not retried.
                        break
            else:
                print(f" [X] HTTP错误: {response.status_code}")
                break
        except Exception as e:
            # Only connection-flavored errors are worth retrying.
            error_type = type(e).__name__
            is_retry_error = any([
                'Connection' in error_type,
                'Timeout' in error_type,
                'ProxyError' in error_type,
            ])
            if is_retry_error and retry_count < max_retries:
                print(f" [!] 连接错误: {error_type}")
                self.mark_proxy_failed()
                # After 3 consecutive proxy failures, force a fresh proxy
                # so the next retry uses a new one.
                if self.proxy_fail_count >= 3 and self.use_proxy:
                    if proxy_change_count >= max_proxy_changes:
                        print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
                        break
                    print(f" [!] 代理已失败{self.proxy_fail_count}次,强制更换新代理")
                    self.current_proxy = None
                    self.proxy_fail_count = 0
                    new_proxy = self.fetch_proxy(force_new=True)
                    if new_proxy:
                        proxy_change_count += 1
                        print(f" [✓] 已更换新代理({proxy_change_count}/{max_proxy_changes}),继续重试")
                    else:
                        print(f" [X] 无法获取新代理")
                        break
                retry_count += 1
                continue
            else:
                print(f" [X] 请求异常: {e}")
                break
    if successful_data:
        return {
            'apis': successful_data,
            'count': len(successful_data)
        }
    return None
def fetch_income_for_date(self, max_retries: int = 3) -> Optional[Dict]:
    """Fetch per-day income data around the target date.

    Uses the overviewhomelist API (daily income breakdown), then derives
    day / 7-day / 30-day / month-to-date totals for target_date.

    Args:
        max_retries: maximum retry attempts for transient failures.

    Returns:
        Income dict shaped like the legacy overview response (plus the raw
        daily list under 'raw_list'), or None on failure.
    """
    import time
    from datetime import timedelta
    # Query a 30-day window ending on target_date so there is enough data
    # for the rolling totals.
    end_date = self.target_date
    start_date = end_date - timedelta(days=29)
    # The API takes Unix timestamps.
    start_timestamp = int(start_date.timestamp())
    end_timestamp = int(end_date.timestamp())
    api_url = f"{self.base_url}/author/eco/income4/overviewhomelist"
    token_cookie = self.session.cookies.get('bjhStoken') or self.session.cookies.get('devStoken')
    headers = {
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Referer': f'{self.base_url}/builder/rc/incomecenter',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
    }
    if token_cookie:
        headers['token'] = token_cookie
    params = {
        'start_date': start_timestamp,
        'end_date': end_timestamp
    }
    print(f"\n[请求] 获取收入数据")
    # BUGFIX: the range message concatenated both dates with no separator.
    print(f" 日期范围: {start_date.strftime('%Y-%m-%d')} ~ {end_date.strftime('%Y-%m-%d')}")
    retry_count = 0
    proxy_change_count = 0  # how many times the proxy has been swapped
    max_proxy_changes = 3   # at most 3 swaps, i.e. up to 4 distinct proxies
    while retry_count <= max_retries:
        try:
            if retry_count > 0:
                # Linear backoff: 2s, 4s, 6s...
                wait_time = retry_count * 2
                print(f" [重试 {retry_count}/{max_retries}] 等待 {wait_time} 秒...")
                time.sleep(wait_time)
            proxies = self.fetch_proxy() if self.use_proxy else None
            # Debug output: show which proxy IP is actually in use.
            if self.use_proxy:
                if proxies:
                    proxy_url = proxies.get('http', '')
                    if '@' in proxy_url:
                        proxy_ip = proxy_url.split('@')[1]
                    else:
                        proxy_ip = proxy_url.replace('http://', '').replace('https://', '')
                    print(f" [代理] 使用IP: {proxy_ip}")
                else:
                    print(f" [!] 警告代理未生效use_proxy={self.use_proxy}")
            response = self.session.get(
                api_url,
                headers=headers,
                params=params,
                proxies=proxies,
                timeout=15,
                verify=False
            )
            print(f" 状态码: {response.status_code}")
            if response.status_code == 200:
                data = response.json()
                errno = data.get('errno', -1)
                if errno == 0:
                    print(f" [✓] API调用成功")
                    # Success: reset the proxy failure counter.
                    self.reset_proxy_fail_count()
                    income_list = data.get('data', {}).get('list', [])
                    if income_list:
                        # Locate the row for the target date by timestamp.
                        target_timestamp = int(self.target_date.timestamp())
                        target_income_data = None
                        for item in income_list:
                            if item.get('day_time') == target_timestamp:
                                target_income_data = item
                                break
                        if target_income_data:
                            day_revenue = target_income_data.get('total_income', 0)
                            print(f"\n 收入数据详情:")
                            print(f" {self.target_date_str} 当日收入: ¥{day_revenue:.2f}")
                            # Rolling 7-day total ending on target_date.
                            recent7_revenue = 0.0
                            recent7_start = self.target_date - timedelta(days=6)
                            recent7_start_ts = int(recent7_start.timestamp())
                            for item in income_list:
                                if recent7_start_ts <= item.get('day_time', 0) <= target_timestamp:
                                    recent7_revenue += item.get('total_income', 0)
                            print(f" 近7天: ¥{recent7_revenue:.2f}")
                            # 30-day total: the whole fetched window.
                            recent30_revenue = sum(item.get('total_income', 0) for item in income_list)
                            print(f" 近30天: ¥{recent30_revenue:.2f}")
                            # Month-to-date: first of the month through target_date.
                            month_start = self.target_date.replace(day=1)
                            month_start_ts = int(month_start.timestamp())
                            current_month_revenue = 0.0
                            for item in income_list:
                                if month_start_ts <= item.get('day_time', 0) <= target_timestamp:
                                    current_month_revenue += item.get('total_income', 0)
                            print(f" 当月收入: ¥{current_month_revenue:.2f}")
                            # Return a payload shaped like the legacy API.
                            return {
                                'errno': 0,
                                'errmsg': 'success',
                                'data': {
                                    'income': {
                                        'yesterday': {
                                            'income': day_revenue,
                                            'value': day_revenue
                                        },
                                        'recent7Days': {
                                            'income': recent7_revenue,
                                            'value': recent7_revenue
                                        },
                                        'recent30Days': {
                                            'income': recent30_revenue,
                                            'value': recent30_revenue
                                        },
                                        'currentMonth': {
                                            'income': current_month_revenue,
                                            'value': current_month_revenue
                                        }
                                    }
                                },
                                'raw_list': income_list  # keep the raw rows
                            }
                        else:
                            print(f" [警告] 未找到 {self.target_date_str} 的收入数据")
                            return None
                    else:
                        print(f" [警告] 收入数据列表为空")
                        return None
                else:
                    errmsg = data.get('errmsg', '')
                    print(f" [X] API返回错误: errno={errno}, errmsg={errmsg}")
                    # errno=10000015 ("abnormal request") usually means the
                    # proxy was not applied; force a fresh proxy and retry.
                    if errno == 10000015 and self.use_proxy:
                        print(f" [!] 检测到代理未生效,立即更换新代理")
                        if proxy_change_count >= max_proxy_changes:
                            print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
                            return None
                        self.current_proxy = None
                        self.proxy_fail_count = 0
                        new_proxy = self.fetch_proxy(force_new=True)
                        if new_proxy and retry_count < max_retries:
                            proxy_change_count += 1
                            print(f" [!] 已更换新代理({proxy_change_count}/{max_proxy_changes}),将重试...")
                            retry_count += 1
                            continue
                        else:
                            print(f" [X] 无法获取新代理或已达重试上限")
                            return None
                    else:
                        # Other API errors are not retried.
                        return None
            else:
                print(f" [X] HTTP错误: {response.status_code}")
                return None
        except Exception as e:
            # Only connection-flavored errors are worth retrying.
            error_type = type(e).__name__
            is_retry_error = any([
                'Connection' in error_type,
                'Timeout' in error_type,
                'ProxyError' in error_type,
            ])
            if is_retry_error and retry_count < max_retries:
                print(f" [!] 连接错误: {error_type}")
                self.mark_proxy_failed()
                # After 3 consecutive proxy failures, force a fresh proxy.
                if self.proxy_fail_count >= 3 and self.use_proxy:
                    if proxy_change_count >= max_proxy_changes:
                        print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
                        return None
                    print(f" [!] 代理已失败{self.proxy_fail_count}次,强制更换新代理")
                    self.current_proxy = None
                    self.proxy_fail_count = 0
                    new_proxy = self.fetch_proxy(force_new=True)
                    if new_proxy:
                        proxy_change_count += 1
                        print(f" [✓] 已更换新代理({proxy_change_count}/{max_proxy_changes}),继续重试")
                    else:
                        print(f" [X] 无法获取新代理")
                        return None
                retry_count += 1
                continue
            else:
                print(f" [X] 请求异常: {e}")
                return None
    return None
def extract_integrated_data_for_date(self, account_id: str, days: int = 7) -> Optional[Dict]:
    """Collect analytics and income data for one account on the target date.

    Args:
        account_id: account identifier.
        days: number of days queried, counting back from target_date.

    Returns:
        A per-account result dict, or None when the account's cookies
        could not be applied.
    """
    import time
    import random
    banner = '=' * 70
    print(f"\n{banner}")
    print(f"开始提取账号数据: {account_id}")
    print(f"目标日期: {self.target_date_str}")
    print(f"{banner}")
    if not self.set_account_cookies(account_id):
        return None
    result = {
        'account_id': account_id,
        'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'target_date': self.target_date_str,
        'status': 'unknown',
        'analytics': {},
        'income': {},
        'error_info': {}
    }
    # Step 1: publish statistics.
    print("\n[1/2] 获取发文统计数据...")
    api_data = self.fetch_analytics_api_for_date(days=days)
    if api_data:
        result['analytics'] = api_data
        print("[OK] 发文统计数据获取成功")
    else:
        print("[X] 发文统计数据获取失败")
        result['error_info']['analytics'] = 'API调用失败'
    # Randomized pause between the two API calls.
    pause = random.uniform(2, 4)
    print(f"\n[间隔] 等待 {pause:.1f} 秒...")
    time.sleep(pause)
    # Step 2: income data.
    print("\n[2/2] 获取收入数据...")
    income_data = self.fetch_income_for_date()
    if income_data:
        result['income'] = income_data
        print("[OK] 收入数据获取成功")
    else:
        print("[X] 收入数据获取失败")
        result['error_info']['income'] = 'API调用失败'
    # Overall status: all, partial, or nothing fetched.
    got_analytics = bool(result['analytics'])
    got_income = bool(result['income'])
    if got_analytics and got_income:
        result['status'] = 'success_all'
    elif got_analytics or got_income:
        result['status'] = 'success_partial'
    else:
        result['status'] = 'failed'
    return result
def extract_all_for_date(self, days: int = 7, delay_seconds: float = 3.0) -> List[Dict]:
    """Extract data for every loaded account on the target date.

    Args:
        days: number of days to query per account.
        delay_seconds: base delay between accounts (jittered by ±30%).

    Returns:
        List of per-account result dicts (accounts that returned None
        are skipped).
    """
    # Hoisted out of the loop body: the original re-imported time on
    # every delayed iteration.
    import time
    import random
    if not self.account_cookies:
        print("[X] 没有可用的账号Cookie")
        return []
    print("\n" + "="*70)
    print(f"开始提取 {len(self.account_cookies)} 个账号的数据")
    print(f"目标日期: {self.target_date_str}")
    print("="*70)
    results = []
    # Iterating the dict directly yields its keys; .keys() is redundant.
    for idx, account_id in enumerate(self.account_cookies, 1):
        print(f"\n[{idx}/{len(self.account_cookies)}] 处理账号: {account_id}")
        result = self.extract_integrated_data_for_date(account_id, days=days)
        if result:
            results.append(result)
        # Jittered delay between accounts (skipped after the last one).
        if idx < len(self.account_cookies):
            actual_delay = delay_seconds * random.uniform(0.7, 1.3)
            print(f"\n[延迟] 等待 {actual_delay:.1f} 秒后继续...")
            time.sleep(actual_delay)
    return results
def save_results(self, results: List[Dict]):
    """Persist results to the main JSON file and a date-stamped backup.

    Best-effort: any failure is reported to stdout rather than raised.

    Args:
        results: list of per-account result dicts.
    """
    # json is already imported at module level; only shutil is local.
    import shutil
    try:
        # 1. Main output file (fixed name, overwritten each run).
        with open(self.output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        print(f"\n{'='*70}")
        print(f"[OK] 数据已保存到: {self.output_file}")
        # 2. Backup copy named with today's date only (one per day).
        timestamp = datetime.now().strftime('%Y%m%d')
        backup_filename = f"bjh_integrated_data_{timestamp}.json"
        backup_file = os.path.join(self.backup_dir, backup_filename)
        shutil.copy2(self.output_file, backup_file)
        print(f"[OK] 备份已保存到: {backup_file}")
        print(f"{'='*70}")
        # Summary counts: both success_all and success_partial count as success.
        success_count = sum(1 for r in results if r.get('status', '').startswith('success'))
        print(f"\n统计信息:")
        print(f" - 总账号数: {len(results)}")
        print(f" - 成功获取: {success_count}")
        print(f" - 失败: {len(results) - success_count}")
    except Exception as e:
        print(f"[X] 保存文件失败: {e}")
def main():
    """CLI entry point: parse arguments, run the extraction, save results.

    Returns:
        Process exit code (0 on success/cancel, 1 on failure).
    """
    parser = argparse.ArgumentParser(
        description='百家号指定日期数据抓取工具',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例用法:
python bjh_analytics_date.py 2025-12-20
python bjh_analytics_date.py 2025-12-20 --days 7
python bjh_analytics_date.py 2025-12-20 --proxy
python bjh_analytics_date.py 2025-12-20 --database
python bjh_analytics_date.py 2025-12-20 --account "乳腺专家林华" # 仅测试单个账号
"""
    )
    parser.add_argument(
        'date',
        type=str,
        help='目标日期 (格式: YYYY-MM-DD)'
    )
    parser.add_argument(
        '--days',
        type=int,
        default=7,
        help='查询天数从目标日期往前推默认7天'
    )
    # --proxy / --no-proxy share the same dest; proxy defaults to ON.
    parser.add_argument(
        '--proxy',
        action='store_true',
        default=True,
        help='启用代理(默认启用)'
    )
    parser.add_argument(
        '--no-proxy',
        dest='proxy',
        action='store_false',
        help='禁用代理'
    )
    # --database / --local share the same dest; database is the default.
    parser.add_argument(
        '--database',
        action='store_true',
        default=True,
        help='从数据库加载Cookie默认启用'
    )
    parser.add_argument(
        '--local',
        dest='database',
        action='store_false',
        help='从本地JSON文件加载Cookie'
    )
    parser.add_argument(
        '--delay',
        type=float,
        default=3.0,
        help='账号间延迟时间默认3.0'
    )
    parser.add_argument(
        '--account',
        type=str,
        default=None,
        help='仅抓取指定账号(用于测试),格式:账号名称'
    )
    parser.add_argument(
        '--no-confirm',
        action='store_true',
        help='跳过确认提示,直接开始抓取(用于批量脚本)'
    )
    args = parser.parse_args()
    # Validate the date before doing any work.
    try:
        datetime.strptime(args.date, '%Y-%m-%d')
    except ValueError:
        print(f"[X] 日期格式错误: {args.date}")
        print(" 正确格式: YYYY-MM-DD (例如: 2025-12-20)")
        return 1
    print("\n" + "="*70)
    print("百家号指定日期数据抓取工具")
    print("="*70)
    print(f"目标日期: {args.date}")
    print(f"查询天数: {args.days}")
    # BUGFIX: both ternary arms were empty strings, so this line printed
    # nothing after the label; restore the yes/no markers.
    print(f"使用代理: {'是' if args.proxy else '否'}")
    print(f"数据源: {'数据库' if args.database else '本地文件'}")
    print("="*70)
    try:
        analytics = BaijiahaoDateAnalytics(
            target_date=args.date,
            use_proxy=args.proxy,
            load_from_db=args.database
        )
        if not analytics.account_cookies:
            print("\n[X] 未找到可用的账号Cookie")
            return 1
        # Single-account test mode: validate the name, then narrow the set.
        if args.account:
            if args.account not in analytics.account_cookies:
                print(f"\n[X] 未找到指定账号: {args.account}")
                print(f"\n可用账号列表:")
                for idx, account_name in enumerate(analytics.account_cookies.keys(), 1):
                    print(f" {idx}. {account_name}")
                return 1
            analytics.account_cookies = {args.account: analytics.account_cookies[args.account]}
            print(f"\n[测试模式] 仅抓取账号: {args.account}")
        print(f"\n找到 {len(analytics.account_cookies)} 个账号")
        # Interactive confirmation unless --no-confirm (for batch scripts).
        if not args.no_confirm:
            confirm = input("\n是否开始抓取? (y/n): ").strip().lower()
            if confirm != 'y':
                print("已取消")
                return 0
        results = analytics.extract_all_for_date(
            days=args.days,
            delay_seconds=args.delay
        )
        if results:
            analytics.save_results(results)
            success_all = sum(1 for r in results if r.get('status') == 'success_all')
            success_partial = sum(1 for r in results if r.get('status') == 'success_partial')
            failed = sum(1 for r in results if r.get('status') == 'failed')
            print(f"\n{'='*70}")
            print("数据提取统计")
            print(f"{'='*70}")
            print(f" 总账号数: {len(results)}")
            print(f" 全部成功: {success_all}")
            print(f" 部分成功: {success_partial}")
            print(f" 失败: {failed}")
            print(f"{'='*70}")
            return 0
        else:
            print("\n[X] 未获取到任何数据")
            return 1
    except Exception as e:
        print(f"\n[X] 程序执行出错: {e}")
        import traceback
        traceback.print_exc()
        return 1
# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == '__main__':
sys.exit(main())