Files
baijiahao_data_crawl/bjh_data_daemon.py

424 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
百家号数据定时更新守护进程
24小时持续运行定期获取发文统计和收入数据
"""
import json
import sys
import os
import time
import random
from datetime import datetime, timedelta
from typing import Dict, List, Optional
# Re-wrap stdout/stderr as UTF-8 on Windows so the Chinese log/console
# messages below print correctly (Windows console default encoding is
# typically a legacy code page, not UTF-8).  Must run before any print().
if sys.platform == 'win32':
    import io
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
# 导入核心功能模块
from bjh_analytics import BaijiahaoAnalytics
from log_config import setup_bjh_daemon_logger
class BaijiahaoDataDaemon:
    """Baijiahao scheduled data-update daemon.

    Runs continuously, fetching publishing and revenue statistics for every
    configured account at a fixed interval.  Persists run statistics to a
    JSON status file next to the script, detects remote rate limiting, and
    backs off into a cooldown period after repeated rate-limited runs.
    """

    # Timestamp format shared by the status file and user-facing output.
    _TIME_FMT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, update_interval_hours: int = 1, use_proxy: bool = False, load_from_db: bool = False):
        """Initialize the daemon.

        Args:
            update_interval_hours: Hours between update passes (default 1).
            use_proxy: Route scraper requests through a proxy (default False).
            load_from_db: Load account cookies from MySQL instead of the
                local JSON file (default False).
        """
        self.update_interval_hours = update_interval_hours
        self.update_interval_seconds = update_interval_hours * 3600
        self.use_proxy = use_proxy
        self.load_from_db = load_from_db
        # Logger first so everything after this line can report progress.
        self.logger = setup_bjh_daemon_logger()
        self.logger.info(f"守护进程初始化,更新间隔: {update_interval_hours}小时, 代理模式: {use_proxy}, 数据库加载: {load_from_db}")
        # The scraper shares the daemon's proxy / cookie-source settings.
        self.analytics = BaijiahaoAnalytics(use_proxy=use_proxy, load_from_db=load_from_db)
        # Anchor the status file to the script directory so the current
        # working directory at launch time does not matter.
        self.script_dir = os.path.dirname(os.path.abspath(__file__))
        self.status_file = os.path.join(self.script_dir, "daemon_status.json")
        self.status = self.load_status()
        # Rate-limit policy: after this many consecutive rate-limited runs,
        # pause all updates for the cooldown period.
        self.max_consecutive_rate_limits = 3  # consecutive rate-limit threshold
        self.rate_limit_cooldown_hours = 2    # cooldown duration (hours)
        # Kept for backward compatibility; the authoritative value lives in
        # self.status['last_rate_limit_time'].
        self.last_rate_limit_time = None

    def _default_status(self) -> Dict:
        """Return a fresh status dict for a daemon with no saved history."""
        return {
            'last_update': None,
            'total_runs': 0,
            'successful_runs': 0,
            'failed_runs': 0,
            'rate_limited_runs': 0,
            'consecutive_rate_limits': 0,  # consecutive rate-limited runs
            'last_rate_limit_time': None,  # timestamp of last rate limit
            'in_cooldown': False,          # currently in cooldown period?
            'start_time': datetime.now().strftime(self._TIME_FMT)
        }

    def load_status(self) -> Dict:
        """Load persisted daemon status from disk.

        Returns:
            Dict: the saved status, or fresh defaults when the file is
            missing or unreadable (corruption is reported, not fatal).
        """
        try:
            if os.path.exists(self.status_file):
                with open(self.status_file, 'r', encoding='utf-8') as f:
                    return json.load(f)
        except Exception as e:
            print(f"[!] 加载状态文件失败: {e}")
        return self._default_status()

    def check_cooldown_status(self) -> tuple:
        """Check whether the daemon is inside the rate-limit cooldown window.

        Returns:
            tuple: (in_cooldown: bool, remaining_hours: float).
        """
        if not self.status.get('last_rate_limit_time'):
            return False, 0
        last_limit = datetime.strptime(
            self.status['last_rate_limit_time'],
            self._TIME_FMT
        )
        cooldown_end = last_limit + timedelta(hours=self.rate_limit_cooldown_hours)
        now = datetime.now()
        if now < cooldown_end:
            remaining = (cooldown_end - now).total_seconds() / 3600
            return True, remaining
        # Cooldown expired: clear the flags and persist the reset.
        self.status['in_cooldown'] = False
        self.status['consecutive_rate_limits'] = 0
        self.save_status()
        return False, 0

    def handle_rate_limit(self):
        """Record a rate-limited run; enter cooldown once the streak hits the threshold."""
        self.status['rate_limited_runs'] += 1
        self.status['consecutive_rate_limits'] = self.status.get('consecutive_rate_limits', 0) + 1
        self.status['last_rate_limit_time'] = datetime.now().strftime(self._TIME_FMT)
        self.logger.warning(f"检测到限流,连续次数: {self.status['consecutive_rate_limits']}/{self.max_consecutive_rate_limits}")
        # Too many consecutive rate limits: stop hammering the service.
        if self.status['consecutive_rate_limits'] >= self.max_consecutive_rate_limits:
            self.status['in_cooldown'] = True
            self.logger.error(f"连续{self.max_consecutive_rate_limits}次限流,进入冷却期 {self.rate_limit_cooldown_hours}小时")
            print(f"\n[!] 连续{self.max_consecutive_rate_limits}次遭遇限流,进入冷却期")
            print(f"[!] 将暂停 {self.rate_limit_cooldown_hours} 小时后自动恢复")
        self.save_status()

    def save_status(self):
        """Persist the status dict to disk; failures are reported but not fatal."""
        try:
            with open(self.status_file, 'w', encoding='utf-8') as f:
                json.dump(self.status, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"[!] 保存状态文件失败: {e}")

    def should_update(self) -> bool:
        """Return True when the update interval has elapsed since the last run."""
        if not self.status['last_update']:
            return True
        last_update = datetime.strptime(self.status['last_update'], self._TIME_FMT)
        time_since_update = (datetime.now() - last_update).total_seconds()
        return time_since_update >= self.update_interval_seconds

    def get_next_update_time(self) -> str:
        """Return the next scheduled update time, or "立即" when due immediately."""
        if not self.status['last_update']:
            return "立即"
        last_update = datetime.strptime(self.status['last_update'], self._TIME_FMT)
        next_update = last_update + timedelta(seconds=self.update_interval_seconds)
        return next_update.strftime(self._TIME_FMT)

    def update_data(self, days: int = 7) -> bool:
        """Run one update pass over all configured accounts.

        Args:
            days: Number of days of history to query.

        Returns:
            bool: True when data was fetched and saved, False otherwise
            (no data, cooldown skip, or an exception).
        """
        # Skip entirely while inside the rate-limit cooldown window.
        in_cooldown, remaining_hours = self.check_cooldown_status()
        if in_cooldown:
            self.logger.info(f"当前处于冷却期,剩余 {remaining_hours:.1f} 小时")
            print(f"\n[!] 当前处于冷却期,剩余 {remaining_hours:.1f} 小时")
            print("[!] 跳过本次更新,等待冷却期结束")
            return False
        self.logger.info(f"开始数据更新 - {datetime.now().strftime(self._TIME_FMT)}")
        print("\n" + "="*70)
        print(f"开始数据更新 - {datetime.now().strftime(self._TIME_FMT)}")
        print("="*70)
        self.status['total_runs'] += 1
        try:
            # A long base delay with per-request jitter mimics human pacing
            # between accounts and lowers the chance of being rate-limited.
            base_delay = 10
            print(f"\n配置信息:")
            print(f" - 查询天数: {days}")
            print(f" - 账号间隔: {base_delay}秒 ± 30% 随机波动")
            print(f" - 账号数量: {len(self.analytics.account_cookies)}")
            results = self.analytics.extract_all_integrated_data(
                days=days,
                delay_seconds=base_delay,
                stop_on_rate_limit=True  # abort the pass on sustained rate limiting
            )
            if not results:
                self.logger.warning("未获取到任何数据")
                print("\n[X] 未获取到任何数据")
                self.status['failed_runs'] += 1
                return False
            # Tally per-account outcomes.
            success_all = sum(1 for r in results if r.get('status') == 'success_all')
            success_partial = sum(1 for r in results if r.get('status') == 'success_partial')
            rate_limited = sum(1 for r in results if r.get('status') == 'rate_limited')
            failed = sum(1 for r in results if r.get('status') == 'failed')
            self.analytics.save_results(results)
            self.status['last_update'] = datetime.now().strftime(self._TIME_FMT)
            if rate_limited > 0:
                self.handle_rate_limit()
                self.logger.warning(f"检测到 {rate_limited} 个账号被限流")
                print(f"\n[!] 检测到 {rate_limited} 个账号被限流")
                print(f"[!] 连续限流次数: {self.status['consecutive_rate_limits']}/{self.max_consecutive_rate_limits}")
                # Entering cooldown likely means cookies/accounts need a
                # human look; tell the operator what to check.
                if self.status.get('in_cooldown'):
                    print("\n" + "="*70)
                    print("⚠️ 建议操作:")
                    print(" 1. 检查账号Cookie是否有效运行: python update_account_info.py")
                    print(" 2. 手动登录百家号后台查看是否被限制")
                    print(" 3. 考虑增加更新间隔时间如12-24小时")
                    print(" 4. 守护进程将在冷却期后自动恢复")
                    print("="*70)
            elif success_all > 0 or success_partial > 0:
                # Any success breaks the consecutive rate-limit streak.
                self.status['consecutive_rate_limits'] = 0
                self.status['successful_runs'] += 1
                self.logger.info(f"数据更新成功: 全部成功={success_all}, 部分成功={success_partial}")
            else:
                self.status['failed_runs'] += 1
                self.logger.error("数据更新失败")
            # Per-pass summary for the console.
            print(f"\n{'='*70}")
            print("本次更新统计")
            print(f"{'='*70}")
            print(f" 总账号数: {len(results)}")
            print(f" 全部成功: {success_all} (发文+收入)")
            print(f" 部分成功: {success_partial}")
            print(f" 限流: {rate_limited}")
            print(f" 失败: {failed}")
            print(f"{'='*70}")
            return True
        except Exception as e:
            self.logger.error(f"更新数据时发生错误: {e}", exc_info=True)
            print(f"\n[X] 更新数据时发生错误: {e}")
            self.status['failed_runs'] += 1
            return False
        finally:
            # Always persist counters, even after an exception.
            self.save_status()

    def run(self, days: int = 7):
        """Run the daemon loop until interrupted with Ctrl+C.

        Args:
            days: Number of days of history to query on each update pass.
        """
        self.logger.info(f"守护进程启动: 更新间隔={self.update_interval_hours}小时, 查询天数={days}")
        print("\n" + "="*70)
        print("百家号数据定时更新守护进程")
        print("="*70)
        print(f"\n配置:")
        print(f" - 更新间隔: 每 {self.update_interval_hours} 小时")
        print(f" - 查询天数: {days}")
        print(f" - 启动时间: {self.status['start_time']}")
        if self.status['last_update']:
            print(f" - 上次更新: {self.status['last_update']}")
        print(f"\n运行统计:")
        print(f" - 总运行次数: {self.status['total_runs']}")
        print(f" - 成功: {self.status['successful_runs']}")
        print(f" - 限流: {self.status['rate_limited_runs']}")
        print(f" - 失败: {self.status['failed_runs']}")
        print("\n" + "="*70)
        print("按 Ctrl+C 停止守护进程")
        print("="*70)
        try:
            while True:
                if self.should_update():
                    self.update_data(days=days)
                    # If rate-limited runs outnumber successes overall, add an
                    # extra randomized pause before the next pass.
                    if self.status.get('rate_limited_runs', 0) > self.status.get('successful_runs', 0):
                        extra_wait = random.uniform(1800, 3600)  # 30-60 minutes
                        print(f"\n[!] 检测到频繁限流,额外等待 {extra_wait/60:.1f} 分钟")
                        time.sleep(extra_wait)
                next_update_time = self.get_next_update_time()
                if next_update_time == "立即":
                    # BUGFIX: no last_update yet (first pass skipped during
                    # cooldown, or it failed before recording a timestamp).
                    # The original `continue` busy-looped, instantly re-running
                    # the whole pass; sleep briefly before retrying.
                    time.sleep(60)
                    continue
                next_update_dt = datetime.strptime(next_update_time, self._TIME_FMT)
                seconds_until_update = (next_update_dt - datetime.now()).total_seconds()
                if seconds_until_update > 0:
                    print(f"\n下次更新时间: {next_update_time}")
                    print(f"等待中... ({seconds_until_update/3600:.1f} 小时)")
                    # Sleep in 5-minute slices so progress can be reported.
                    check_interval = 300
                    elapsed = 0
                    while elapsed < seconds_until_update:
                        sleep_time = min(check_interval, seconds_until_update - elapsed)
                        time.sleep(sleep_time)
                        elapsed += sleep_time
                        remaining = seconds_until_update - elapsed
                        if remaining > 0:
                            # BUGFIX: the original f-string never closed the
                            # parenthesis around the clock time.
                            print(f" 剩余时间: {remaining/3600:.1f} 小时 ({datetime.now().strftime('%H:%M:%S')})")
        except KeyboardInterrupt:
            # Graceful shutdown: print a summary and persist final counters.
            self.logger.info("守护进程收到停止信号")
            print("\n\n" + "="*70)
            print("守护进程已停止")
            print("="*70)
            print(f"\n运行总结:")
            print(f" - 启动时间: {self.status['start_time']}")
            print(f" - 停止时间: {datetime.now().strftime(self._TIME_FMT)}")
            print(f" - 总运行次数: {self.status['total_runs']}")
            print(f" - 成功: {self.status['successful_runs']}")
            print(f" - 限流: {self.status['rate_limited_runs']}")
            print(f" - 失败: {self.status['failed_runs']}")
            if self.status['last_update']:
                print(f" - 最后更新: {self.status['last_update']}")
            print("="*70 + "\n")
            self.logger.info(f"守护进程运行总结: 总运行={self.status['total_runs']}, 成功={self.status['successful_runs']}, 限流={self.status['rate_limited_runs']}, 失败={self.status['failed_runs']}")
            self.save_status()
def main():
    """Interactive entry point: collect settings, confirm, start the daemon."""
    banner = "=" * 70
    print("\n" + banner)
    print("百家号数据定时更新守护进程")
    print(banner)
    print("\n请配置守护进程参数:\n")
    # Update interval (hours); any non-positive or non-numeric input -> 1.
    raw_interval = input("1. 更新间隔小时默认1小时: ").strip()
    update_interval = int(raw_interval) if raw_interval.isdigit() and int(raw_interval) > 0 else 1
    # History window (days); any non-positive or non-numeric input -> 7.
    raw_days = input("2. 查询天数默认7天: ").strip()
    days = int(raw_days) if raw_days.isdigit() and int(raw_days) > 0 else 7
    # Cookie source: local JSON file (default) or MySQL database.
    print("\n3. 选择Cookie数据源:")
    print(" 1) 本地JSON文件")
    print(" 2) MySQL数据库")
    source_choice = input(" 请选择 (1/2, 默认1): ").strip() or '1'
    load_from_db = source_choice == '2'
    # Proxy toggle; anything other than 'y' disables it.
    raw_proxy = input("\n4. 是否使用代理y/n默认n: ").strip().lower()
    use_proxy = raw_proxy == 'y'
    daemon = BaijiahaoDataDaemon(update_interval_hours=update_interval, use_proxy=use_proxy, load_from_db=load_from_db)
    # Refuse to start without at least one account cookie.
    if not daemon.analytics.account_cookies:
        print("\n[X] 未找到账号Cookie请先运行Cookie捕获工具")
        return
    account_count = len(daemon.analytics.account_cookies)
    print(f"\n找到 {account_count} 个账号")
    # Echo the configuration back for a final confirmation.
    print("\n" + banner)
    print("配置确认:")
    print(f" - 更新间隔: 每 {update_interval} 小时")
    print(f" - 查询天数: {days}")
    print(f" - 数据源: {'数据库' if load_from_db else '本地文件'}")
    print(f" - 代理模式: {'已启用' if use_proxy else '未启用'}")
    print(f" - 账号数量: {account_count}")
    print(f" - 每次更新预计耗时: {account_count * 12 / 60:.1f} 分钟")
    print(banner)
    if input("\n确认启动守护进程?(y/n): ").strip().lower() != 'y':
        print("\n已取消")
        return
    daemon.run(days=days)
# Script entry point: run the interactive configuration flow only when
# executed directly, not when imported as a module.
if __name__ == '__main__':
    main()