Files
baijiahao_data_crawl/bjh_analytics_date.py

810 lines
33 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
百家号指定日期数据抓取工具
根据指定日期范围抓取发文统计和收入数据
"""
import json
import sys
import os
import argparse
from datetime import datetime, timedelta
from typing import Dict, List, Optional
# 导入基础分析器
from bjh_analytics import BaijiahaoAnalytics
# Force UTF-8 on stdout/stderr so Chinese log output does not crash on
# Windows consoles that default to a legacy codepage (e.g. GBK).
if sys.platform == 'win32':
    # reconfigure() (Python 3.7+) changes the encoding in place without
    # re-wrapping the stream, so it stays safe if this module is imported
    # more than once; fall back to TextIOWrapper for exotic streams that
    # do not support reconfigure().
    try:
        sys.stdout.reconfigure(encoding='utf-8')
        sys.stderr.reconfigure(encoding='utf-8')
    except AttributeError:
        import io
        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
        sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
class BaijiahaoDateAnalytics(BaijiahaoAnalytics):
    """Baijiahao analytics crawler pinned to a specific target date."""

    def __init__(self, target_date: str, use_proxy: bool = False, load_from_db: bool = False, db_config: Optional[Dict] = None):
        """Initialize the date-bound crawler.

        Args:
            target_date: Target date in 'YYYY-MM-DD' format.
            use_proxy: Whether to route requests through a proxy.
            load_from_db: Whether to load cookies from the database.
            db_config: Optional database configuration dict.

        Raises:
            ValueError: If target_date is not in YYYY-MM-DD format.
        """
        # Validate the date FIRST so a malformed argument fails fast,
        # before the (potentially expensive) base-class setup runs.
        try:
            self.target_date = datetime.strptime(target_date, '%Y-%m-%d')
            self.target_date_str = target_date
        except ValueError:
            raise ValueError(f"日期格式错误: {target_date},正确格式: YYYY-MM-DD")
        super().__init__(use_proxy=use_proxy, load_from_db=load_from_db, db_config=db_config)
        # Fixed output filename (no date suffix); dated copies go to backup/.
        self.output_file = os.path.join(
            self.script_dir,
            "bjh_integrated_data.json"
        )
        # exist_ok=True avoids the check-then-create race of the original
        # os.path.exists() + os.makedirs() pair.
        self.backup_dir = os.path.join(self.script_dir, "backup")
        os.makedirs(self.backup_dir, exist_ok=True)
        print(f"[配置] 目标日期: {target_date}")
        print(f"[配置] 输出文件: {self.output_file}")
        print(f"[配置] 备份目录: {self.backup_dir}")
    def fetch_analytics_api_for_date(self, days: int = 7, max_retries: int = 3) -> Optional[Dict]:
        """Fetch publish-statistics data for the range ending at target_date.

        Args:
            days: Number of days to query, counting back from target_date
                (the range includes target_date itself).
            max_retries: Maximum number of retry attempts on transient errors.

        Returns:
            A dict {'apis': [...], 'count': N} of successful API payloads,
            or None if the request ultimately failed.
        """
        import time
        # Build the inclusive date range: target_date and the (days-1) days before it.
        end_date = self.target_date
        start_date = end_date - timedelta(days=days-1)
        start_day = start_date.strftime('%Y%m%d')
        end_day = end_date.strftime('%Y%m%d')
        # Statistics API endpoint.
        api_url = f"{self.base_url}/author/eco/statistics/appStatisticV3"
        # Pass an explicit start/end day rather than the site's
        # special_filter_days preset so arbitrary ranges work.
        params = {
            'type': 'event',
            'start_day': start_day,
            'end_day': end_day,
            'stat': '0'
        }
        # The backend expects the session token echoed in a 'token' header;
        # it lives in either the bjhStoken or devStoken cookie.
        token_cookie = self.session.cookies.get('bjhStoken') or self.session.cookies.get('devStoken')
        headers = {
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': f'{self.base_url}/builder/rc/analysiscontent',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
        }
        if token_cookie:
            headers['token'] = token_cookie
        self.logger.info(f"获取发文统计: {start_date.strftime('%Y-%m-%d')}{end_date.strftime('%Y-%m-%d')}")
        print(f"\n[请求] 获取发文统计数据")
        print(f" 日期范围: {start_date.strftime('%Y-%m-%d')}{end_date.strftime('%Y-%m-%d')}")
        successful_data = []
        retry_count = 0
        proxy_change_count = 0  # how many times we have swapped to a fresh proxy
        max_proxy_changes = 3  # at most 3 swaps, i.e. up to 4 distinct proxies
        while retry_count <= max_retries:
            try:
                if retry_count > 0:
                    # Linear backoff: 2s, 4s, 6s...
                    wait_time = retry_count * 2
                    print(f" [重试 {retry_count}/{max_retries}] 等待 {wait_time} 秒...")
                    time.sleep(wait_time)
                proxies = self.fetch_proxy() if self.use_proxy else None
                # Debug output: show which proxy IP is in use (credentials stripped).
                if self.use_proxy:
                    if proxies:
                        proxy_url = proxies.get('http', '')
                        if '@' in proxy_url:
                            proxy_ip = proxy_url.split('@')[1]
                        else:
                            proxy_ip = proxy_url.replace('http://', '').replace('https://', '')
                        print(f" [代理] 使用IP: {proxy_ip}")
                    else:
                        print(f" [!] 警告代理未生效use_proxy={self.use_proxy}")
                response = self.session.get(
                    api_url,
                    headers=headers,
                    params=params,
                    proxies=proxies,
                    timeout=15,
                    verify=False
                )
                print(f" 状态码: {response.status_code}")
                if response.status_code == 200:
                    data = response.json()
                    errno = data.get('errno', -1)
                    if errno == 0:
                        print(f" [✓] API调用成功")
                        # Success: reset the proxy-failure counter.
                        self.reset_proxy_fail_count()
                        # Sanity-check the payload shape: 'data' must be a dict.
                        data_field = data.get('data', {})
                        if isinstance(data_field, list):
                            print(f" [X] API返回数据格式异常: data字段为列表而非字典")
                            print(f" 原始响应前500字符: {str(data)[:500]}")
                            break
                        if not isinstance(data_field, dict):
                            print(f" [X] API返回数据格式异常: data字段类型为 {type(data_field).__name__}")
                            break
                        total_info = data_field.get('total_info', {})
                        print(f"\n 发文统计数据:")
                        print(f" 发文量: {total_info.get('publish_count', '0')}")
                        print(f" 曝光量: {total_info.get('disp_pv', '0')}")
                        print(f" 阅读量: {total_info.get('view_count', '0')}")
                        api_result = {
                            'endpoint': '/author/eco/statistics/appStatisticV3',
                            'name': '发文统计',
                            'date_range': f"{start_day} - {end_day}",
                            'data': data,
                            'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                        }
                        successful_data.append(api_result)
                        break
                    else:
                        errmsg = data.get('errmsg', '')
                        print(f" [X] API返回错误: errno={errno}, errmsg={errmsg}")
                        # errno=10000015 ("abnormal request") usually means the
                        # proxy did not take effect -- swap to a fresh proxy.
                        if errno == 10000015 and self.use_proxy:
                            print(f" [!] 检测到代理未生效,立即更换新代理")
                            # Respect the proxy-swap cap.
                            if proxy_change_count >= max_proxy_changes:
                                print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
                                break
                            # Force-fetch a brand-new proxy immediately.
                            self.current_proxy = None
                            self.proxy_fail_count = 0
                            new_proxy = self.fetch_proxy(force_new=True)
                            if new_proxy and retry_count < max_retries:
                                proxy_change_count += 1
                                print(f" [!] 已更换新代理({proxy_change_count}/{max_proxy_changes}),将重试...")
                                retry_count += 1
                                continue
                            else:
                                print(f" [X] 无法获取新代理或已达重试上限")
                                break
                        else:
                            # Any other API error is treated as permanent: no retry.
                            break
                else:
                    print(f" [X] HTTP错误: {response.status_code}")
                    break
            except Exception as e:
                # Classify by exception class name: only connection-style
                # errors are considered transient and worth retrying.
                error_type = type(e).__name__
                is_retry_error = any([
                    'Connection' in error_type,
                    'Timeout' in error_type,
                    'ProxyError' in error_type,
                ])
                if is_retry_error and retry_count < max_retries:
                    print(f" [!] 连接错误: {error_type}")
                    # Record the failure against the current proxy.
                    self.mark_proxy_failed()
                    # After 3 consecutive failures, force a fresh proxy so the
                    # next retry does not reuse a dead one.
                    if self.proxy_fail_count >= 3 and self.use_proxy:
                        # Respect the proxy-swap cap.
                        if proxy_change_count >= max_proxy_changes:
                            print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
                            break
                        print(f" [!] 代理已失败{self.proxy_fail_count}次,强制更换新代理")
                        self.current_proxy = None
                        self.proxy_fail_count = 0
                        new_proxy = self.fetch_proxy(force_new=True)
                        if new_proxy:
                            proxy_change_count += 1
                            print(f" [✓] 已更换新代理({proxy_change_count}/{max_proxy_changes}),继续重试")
                        else:
                            print(f" [X] 无法获取新代理")
                            break
                    retry_count += 1
                    continue
                else:
                    print(f" [X] 请求异常: {e}")
                    break
        if successful_data:
            return {
                'apis': successful_data,
                'count': len(successful_data)
            }
        return None
    def fetch_income_for_date(self, max_retries: int = 3) -> Optional[Dict]:
        """Fetch income figures for the target date.

        Uses the overviewhomelist API, which returns per-day income rows,
        then derives day / 7-day / 30-day / month-to-date totals from them.

        Args:
            max_retries: Maximum number of retry attempts on transient errors.

        Returns:
            Dict shaped like the legacy income-overview response (plus a
            'raw_list' key with the original per-day rows), or None on failure.
        """
        import time
        from datetime import timedelta
        # 30-day window ending at target_date, so all the aggregates can be
        # computed from a single response.
        end_date = self.target_date
        start_date = end_date - timedelta(days=29)  # 30-day span
        # The API takes Unix timestamps.
        # NOTE(review): .timestamp() uses local-time midnight; this assumes
        # the API's day_time values use the same convention -- confirm
        # against an actual response.
        start_timestamp = int(start_date.timestamp())
        end_timestamp = int(end_date.timestamp())
        # Per-day income detail endpoint.
        api_url = f"{self.base_url}/author/eco/income4/overviewhomelist"
        token_cookie = self.session.cookies.get('bjhStoken') or self.session.cookies.get('devStoken')
        headers = {
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': f'{self.base_url}/builder/rc/incomecenter',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
        }
        if token_cookie:
            headers['token'] = token_cookie
        # Query parameters: window bounds as Unix timestamps.
        params = {
            'start_date': start_timestamp,
            'end_date': end_timestamp
        }
        print(f"\n[请求] 获取收入数据")
        print(f" 日期范围: {start_date.strftime('%Y-%m-%d')}{end_date.strftime('%Y-%m-%d')}")
        retry_count = 0
        proxy_change_count = 0  # how many times we have swapped to a fresh proxy
        max_proxy_changes = 3  # at most 3 swaps, i.e. up to 4 distinct proxies
        while retry_count <= max_retries:
            try:
                if retry_count > 0:
                    # Linear backoff: 2s, 4s, 6s...
                    wait_time = retry_count * 2
                    print(f" [重试 {retry_count}/{max_retries}] 等待 {wait_time} 秒...")
                    time.sleep(wait_time)
                proxies = self.fetch_proxy() if self.use_proxy else None
                # Debug output: show which proxy IP is in use (credentials stripped).
                if self.use_proxy:
                    if proxies:
                        proxy_url = proxies.get('http', '')
                        if '@' in proxy_url:
                            proxy_ip = proxy_url.split('@')[1]
                        else:
                            proxy_ip = proxy_url.replace('http://', '').replace('https://', '')
                        print(f" [代理] 使用IP: {proxy_ip}")
                    else:
                        print(f" [!] 警告代理未生效use_proxy={self.use_proxy}")
                response = self.session.get(
                    api_url,
                    headers=headers,
                    params=params,
                    proxies=proxies,
                    timeout=15,
                    verify=False
                )
                print(f" 状态码: {response.status_code}")
                if response.status_code == 200:
                    data = response.json()
                    errno = data.get('errno', -1)
                    if errno == 0:
                        print(f" [✓] API调用成功")
                        # Success: reset the proxy-failure counter.
                        self.reset_proxy_fail_count()
                        # Per-day income rows.
                        income_list = data.get('data', {}).get('list', [])
                        if income_list:
                            # Locate the row whose day_time matches target_date.
                            target_timestamp = int(self.target_date.timestamp())
                            target_income_data = None
                            for item in income_list:
                                if item.get('day_time') == target_timestamp:
                                    target_income_data = item
                                    break
                            if target_income_data:
                                day_revenue = target_income_data.get('total_income', 0)
                                print(f"\n 收入数据详情:")
                                print(f" {self.target_date_str} 当日收入: ¥{day_revenue:.2f}")
                                # Trailing 7-day total (target_date inclusive).
                                recent7_revenue = 0.0
                                recent7_start = self.target_date - timedelta(days=6)
                                recent7_start_ts = int(recent7_start.timestamp())
                                for item in income_list:
                                    if recent7_start_ts <= item.get('day_time', 0) <= target_timestamp:
                                        recent7_revenue += item.get('total_income', 0)
                                print(f" 近7天: ¥{recent7_revenue:.2f}")
                                # 30-day total: sum of the whole queried window.
                                recent30_revenue = sum(item.get('total_income', 0) for item in income_list)
                                print(f" 近30天: ¥{recent30_revenue:.2f}")
                                # Month-to-date total (1st of month through target_date).
                                month_start = self.target_date.replace(day=1)
                                month_start_ts = int(month_start.timestamp())
                                current_month_revenue = 0.0
                                for item in income_list:
                                    if month_start_ts <= item.get('day_time', 0) <= target_timestamp:
                                        current_month_revenue += item.get('total_income', 0)
                                print(f" 当月收入: ¥{current_month_revenue:.2f}")
                                # Shape the response like the legacy overview API
                                # so downstream consumers keep working.
                                return {
                                    'errno': 0,
                                    'errmsg': 'success',
                                    'data': {
                                        'income': {
                                            'yesterday': {
                                                'income': day_revenue,
                                                'value': day_revenue
                                            },
                                            'recent7Days': {
                                                'income': recent7_revenue,
                                                'value': recent7_revenue
                                            },
                                            'recent30Days': {
                                                'income': recent30_revenue,
                                                'value': recent30_revenue
                                            },
                                            'currentMonth': {
                                                'income': current_month_revenue,
                                                'value': current_month_revenue
                                            }
                                        }
                                    },
                                    'raw_list': income_list  # keep the raw per-day rows
                                }
                            else:
                                print(f" [警告] 未找到 {self.target_date_str} 的收入数据")
                                return None
                        else:
                            print(f" [警告] 收入数据列表为空")
                            return None
                    else:
                        errmsg = data.get('errmsg', '')
                        print(f" [X] API返回错误: errno={errno}, errmsg={errmsg}")
                        # errno=10000015 ("abnormal request") usually means the
                        # proxy did not take effect -- swap to a fresh proxy.
                        if errno == 10000015 and self.use_proxy:
                            print(f" [!] 检测到代理未生效,立即更换新代理")
                            # Respect the proxy-swap cap.
                            if proxy_change_count >= max_proxy_changes:
                                print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
                                return None
                            # Force-fetch a brand-new proxy immediately.
                            self.current_proxy = None
                            self.proxy_fail_count = 0
                            new_proxy = self.fetch_proxy(force_new=True)
                            if new_proxy and retry_count < max_retries:
                                proxy_change_count += 1
                                print(f" [!] 已更换新代理({proxy_change_count}/{max_proxy_changes}),将重试...")
                                retry_count += 1
                                continue
                            else:
                                print(f" [X] 无法获取新代理或已达重试上限")
                                return None
                        else:
                            # Any other API error is treated as permanent: no retry.
                            return None
                else:
                    print(f" [X] HTTP错误: {response.status_code}")
                    return None
            except Exception as e:
                # Classify by exception class name: only connection-style
                # errors are considered transient and worth retrying.
                error_type = type(e).__name__
                is_retry_error = any([
                    'Connection' in error_type,
                    'Timeout' in error_type,
                    'ProxyError' in error_type,
                ])
                if is_retry_error and retry_count < max_retries:
                    print(f" [!] 连接错误: {error_type}")
                    # Record the failure against the current proxy.
                    self.mark_proxy_failed()
                    # After 3 consecutive failures, force a fresh proxy so the
                    # next retry does not reuse a dead one.
                    if self.proxy_fail_count >= 3 and self.use_proxy:
                        # Respect the proxy-swap cap.
                        if proxy_change_count >= max_proxy_changes:
                            print(f" [X] 已达代理更换上限({max_proxy_changes}次),放弃重试")
                            return None
                        print(f" [!] 代理已失败{self.proxy_fail_count}次,强制更换新代理")
                        self.current_proxy = None
                        self.proxy_fail_count = 0
                        new_proxy = self.fetch_proxy(force_new=True)
                        if new_proxy:
                            proxy_change_count += 1
                            print(f" [✓] 已更换新代理({proxy_change_count}/{max_proxy_changes}),继续重试")
                        else:
                            print(f" [X] 无法获取新代理")
                            return None
                    retry_count += 1
                    continue
                else:
                    print(f" [X] 请求异常: {e}")
                    return None
        return None
def extract_integrated_data_for_date(self, account_id: str, days: int = 7) -> Optional[Dict]:
"""提取指定账号在指定日期的整合数据
Args:
account_id: 账号ID
days: 查询天数从target_date往前推
Returns:
整合数据
"""
import time
import random
print(f"\n{'='*70}")
print(f"开始提取账号数据: {account_id}")
print(f"目标日期: {self.target_date_str}")
print(f"{'='*70}")
if not self.set_account_cookies(account_id):
return None
result = {
'account_id': account_id,
'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'target_date': self.target_date_str,
'status': 'unknown',
'analytics': {},
'income': {},
'error_info': {}
}
# 1. 获取发文统计数据
print("\n[1/2] 获取发文统计数据...")
api_data = self.fetch_analytics_api_for_date(days=days)
if api_data:
result['analytics'] = api_data
print("[OK] 发文统计数据获取成功")
else:
print("[X] 发文统计数据获取失败")
result['error_info']['analytics'] = 'API调用失败'
# API调用间隔
api_delay = random.uniform(2, 4)
print(f"\n[间隔] 等待 {api_delay:.1f} 秒...")
time.sleep(api_delay)
# 2. 获取收入数据
print("\n[2/2] 获取收入数据...")
income_data = self.fetch_income_for_date()
if income_data:
result['income'] = income_data
print("[OK] 收入数据获取成功")
else:
print("[X] 收入数据获取失败")
result['error_info']['income'] = 'API调用失败'
# 设置状态
if result['analytics'] and result['income']:
result['status'] = 'success_all'
elif result['analytics'] or result['income']:
result['status'] = 'success_partial'
else:
result['status'] = 'failed'
return result
def extract_all_for_date(self, days: int = 7, delay_seconds: float = 3.0) -> List[Dict]:
"""提取所有账号在指定日期的数据
Args:
days: 查询天数
delay_seconds: 账号间延迟
Returns:
所有账号的数据
"""
import random
if not self.account_cookies:
print("[X] 没有可用的账号Cookie")
return []
print("\n" + "="*70)
print(f"开始提取 {len(self.account_cookies)} 个账号的数据")
print(f"目标日期: {self.target_date_str}")
print("="*70)
results = []
for idx, account_id in enumerate(self.account_cookies.keys(), 1):
print(f"\n[{idx}/{len(self.account_cookies)}] 处理账号: {account_id}")
result = self.extract_integrated_data_for_date(account_id, days=days)
if result:
results.append(result)
# 添加延迟
if idx < len(self.account_cookies):
actual_delay = delay_seconds * random.uniform(0.7, 1.3)
print(f"\n[延迟] 等待 {actual_delay:.1f} 秒后继续...")
import time
time.sleep(actual_delay)
return results
def save_results(self, results: List[Dict]):
"""保存结果到文件(同时备份带时间戳的副本)
Args:
results: 数据分析结果列表
"""
import json
import shutil
try:
# 1. 保存到主文件(不带时间戳)
with open(self.output_file, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2)
print(f"\n{'='*70}")
print(f"[OK] 数据已保存到: {self.output_file}")
# 2. 创建带时间戳的备份文件(只保留日期)
timestamp = datetime.now().strftime('%Y%m%d')
backup_filename = f"bjh_integrated_data_{timestamp}.json"
backup_file = os.path.join(self.backup_dir, backup_filename)
# 复制文件到备份目录
shutil.copy2(self.output_file, backup_file)
print(f"[OK] 备份已保存到: {backup_file}")
print(f"{'='*70}")
# 显示统计
success_count = sum(1 for r in results if r.get('status', '').startswith('success'))
print(f"\n统计信息:")
print(f" - 总账号数: {len(results)}")
print(f" - 成功获取: {success_count}")
print(f" - 失败: {len(results) - success_count}")
except Exception as e:
print(f"[X] 保存文件失败: {e}")
def main():
    """CLI entry point: parse arguments, run the crawl, print a summary.

    Returns:
        Process exit code: 0 on success, 1 on error.
    """
    parser = argparse.ArgumentParser(
        description='百家号指定日期数据抓取工具',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例用法:
python bjh_analytics_date.py 2025-12-20
python bjh_analytics_date.py 2025-12-20 --days 7
python bjh_analytics_date.py 2025-12-20 --proxy
python bjh_analytics_date.py 2025-12-20 --database
python bjh_analytics_date.py 2025-12-20 --account "乳腺专家林华" # 仅测试单个账号
"""
    )
    parser.add_argument(
        'date',
        type=str,
        help='目标日期 (格式: YYYY-MM-DD)'
    )
    parser.add_argument(
        '--days',
        type=int,
        default=7,
        help='查询天数从目标日期往前推默认7天'
    )
    # Proxy is ON by default; --no-proxy flips the same dest off.
    parser.add_argument(
        '--proxy',
        action='store_true',
        default=True,  # proxy enabled by default
        help='启用代理(默认启用)'
    )
    parser.add_argument(
        '--no-proxy',
        dest='proxy',
        action='store_false',
        help='禁用代理'
    )
    # Cookie source: database by default; --local flips to the JSON file.
    parser.add_argument(
        '--database',
        action='store_true',
        default=True,  # load cookies from the database by default
        help='从数据库加载Cookie默认启用'
    )
    parser.add_argument(
        '--local',
        dest='database',
        action='store_false',
        help='从本地JSON文件加载Cookie'
    )
    parser.add_argument(
        '--delay',
        type=float,
        default=3.0,
        help='账号间延迟时间默认3.0'
    )
    parser.add_argument(
        '--account',
        type=str,
        default=None,
        help='仅抓取指定账号(用于测试),格式:账号名称'
    )
    parser.add_argument(
        '--no-confirm',
        action='store_true',
        help='跳过确认提示,直接开始抓取(用于批量脚本)'
    )
    args = parser.parse_args()
    # Validate the date format up front for a friendly CLI error message.
    try:
        datetime.strptime(args.date, '%Y-%m-%d')
    except ValueError:
        print(f"[X] 日期格式错误: {args.date}")
        print(" 正确格式: YYYY-MM-DD (例如: 2025-12-20)")
        return 1
    print("\n" + "="*70)
    print("百家号指定日期数据抓取工具")
    print("="*70)
    print(f"目标日期: {args.date}")
    print(f"查询天数: {args.days}")
    print(f"使用代理: {'' if args.proxy else ''}")
    print(f"数据源: {'数据库' if args.database else '本地文件'}")
    print("="*70)
    try:
        # Build the crawler bound to the requested date.
        analytics = BaijiahaoDateAnalytics(
            target_date=args.date,
            use_proxy=args.proxy,
            load_from_db=args.database
        )
        if not analytics.account_cookies:
            print("\n[X] 未找到可用的账号Cookie")
            return 1
        # Single-account test mode: verify the account exists, then narrow
        # the cookie map down to just that account.
        if args.account:
            if args.account not in analytics.account_cookies:
                print(f"\n[X] 未找到指定账号: {args.account}")
                print(f"\n可用账号列表:")
                for idx, account_name in enumerate(analytics.account_cookies.keys(), 1):
                    print(f" {idx}. {account_name}")
                return 1
            analytics.account_cookies = {args.account: analytics.account_cookies[args.account]}
            print(f"\n[测试模式] 仅抓取账号: {args.account}")
        print(f"\n找到 {len(analytics.account_cookies)} 个账号")
        # Interactive confirmation unless --no-confirm was given.
        if not args.no_confirm:
            confirm = input("\n是否开始抓取? (y/n): ").strip().lower()
            if confirm != 'y':
                print("已取消")
                return 0
        # Crawl every remaining account.
        results = analytics.extract_all_for_date(
            days=args.days,
            delay_seconds=args.delay
        )
        if results:
            analytics.save_results(results)
            # Per-status breakdown for the final summary.
            success_all = sum(1 for r in results if r.get('status') == 'success_all')
            success_partial = sum(1 for r in results if r.get('status') == 'success_partial')
            failed = sum(1 for r in results if r.get('status') == 'failed')
            print(f"\n{'='*70}")
            print("数据提取统计")
            print(f"{'='*70}")
            print(f" 总账号数: {len(results)}")
            print(f" 全部成功: {success_all}")
            print(f" 部分成功: {success_partial}")
            print(f" 失败: {failed}")
            print(f"{'='*70}")
            return 0
        else:
            print("\n[X] 未获取到任何数据")
            return 1
    except Exception as e:
        print(f"\n[X] 程序执行出错: {e}")
        import traceback
        traceback.print_exc()
        return 1
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == '__main__':
    sys.exit(main())