Files
baijiahao_data_crawl/data_sync_daemon.py
“shengyudong” 322ac74336 2025-12-25 upload
2025-12-25 11:16:59 +08:00

507 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据同步守护进程
功能:
1. 24小时不间断运行
2. 在每天午夜00:00自动执行数据抓取和同步
3. 自动执行流程:
- 从百家号API抓取最新数据
- 生成CSV文件包含从数据库查询的author_id
- 将CSV数据导入到数据库
4. 支持手动触发刷新
5. 详细的日志记录
使用场景:
- 24/7运行每天凌晨自动更新数据
- 无需人工干预,自动化数据同步
"""
import sys
import os
import time
import schedule
from datetime import datetime, timedelta
from typing import Optional
# 设置UTF-8编码
if sys.platform == 'win32':
import io
if not isinstance(sys.stdout, io.TextIOWrapper) or sys.stdout.encoding != 'utf-8':
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
if not isinstance(sys.stderr, io.TextIOWrapper) or sys.stderr.encoding != 'utf-8':
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
from bjh_analytics import BaijiahaoAnalytics
from export_to_csv import DataExporter
from import_csv_to_database import CSVImporter
from log_config import setup_logger
class DataSyncDaemon:
    """Data-sync daemon: fetch Baijiahao data, export it to CSV, import the CSV into the DB.

    ``sync_data`` runs the three-step pipeline once; ``run`` schedules it every
    day at 00:00 local time via the ``schedule`` library and loops until
    interrupted with Ctrl+C.
    """

    # Base delay (seconds) for the linear retry back-off: retry k waits
    # k * retry_backoff_seconds (2s, 4s, 6s, ... with the default).
    retry_backoff_seconds = 2

    # Substrings that mark an exception as a likely proxy/connection problem,
    # matched against the exception type name and the exception message.
    _PROXY_ERROR_TYPE_MARKERS = ('Connection', 'RemoteDisconnected', 'ProxyError', 'Timeout')
    _PROXY_ERROR_MESSAGE_MARKERS = ('ConnectionError', 'Connection aborted', 'Remote end closed')

    class _StepFailed(Exception):
        """Raised inside sync_data when a pipeline step returns False."""

    def __init__(self, use_proxy: bool = False, load_from_db: bool = True, days: int = 7, max_retries: int = 3):
        """Initialise the daemon, its logger and its run statistics.

        Args:
            use_proxy: whether BaijiahaoAnalytics should use a proxy.
            load_from_db: load account cookies from the database (True) or a local file (False).
            days: how many recent days of data to fetch.
            max_retries: maximum number of retries per account after the first attempt.
        """
        self.script_dir = os.path.dirname(os.path.abspath(__file__))
        self.use_proxy = use_proxy
        self.load_from_db = load_from_db
        self.days = days
        self.max_retries = max_retries
        # Log file lives next to this script, under logs/.
        self.logger = setup_logger('data_sync_daemon', os.path.join(self.script_dir, 'logs', 'data_sync_daemon.log'))
        # Run statistics reported by print_stats().
        self.stats = {
            'total_runs': 0,
            'successful_runs': 0,
            'failed_runs': 0,
            'last_run_time': None,
            'last_success_time': None,
            'last_error': None
        }
        source_label = '数据库' if load_from_db else '本地文件'
        print("\n" + "="*70)
        print("数据同步守护进程")
        print("="*70)
        # Both branches of this conditional used to be empty strings, so the
        # proxy setting was never actually displayed.
        print(f" 使用代理: {'是' if use_proxy else '否'}")
        print(f" Cookie来源: {source_label}")
        print(f" 抓取天数: {days}")
        print(f" 错误重试: 最大{max_retries}次")
        print(f" 定时执行: 每天午夜00:00")
        print("="*70 + "\n")
        self.logger.info("="*70)
        self.logger.info("数据同步守护进程启动")
        self.logger.info(f"使用代理: {use_proxy}, Cookie来源: {source_label}, 抓取天数: {days}, 重试: {max_retries}")
        self.logger.info("="*70)

    @classmethod
    def _is_proxy_error(cls, exc) -> bool:
        """Return True if *exc* looks like a proxy/connection failure (by type name or message)."""
        error_type = type(exc).__name__
        message = str(exc)
        return (any(marker in error_type for marker in cls._PROXY_ERROR_TYPE_MARKERS)
                or any(marker in message for marker in cls._PROXY_ERROR_MESSAGE_MARKERS))

    def _fetch_account_with_retry(self, analytics, account_id) -> tuple:
        """Fetch one account's data, retrying up to ``self.max_retries`` times.

        A retry happens when ``analytics.extract_integrated_data`` returns a
        falsy value, returns a result whose ``status`` does not contain
        'success', or raises any ``Exception``. Proxy/connection errors are
        only logged at warning level instead of error level; they are retried
        the same way as other exceptions. A status of 'success_all' is a full
        success; any other status containing 'success' is a partial success
        and is not retried.

        Returns:
            (outcome, result): outcome is 'success', 'partial' or 'failed';
            result is the last non-empty result returned for this account, or
            None if no attempt returned one.
        """
        last_result = None
        for attempt in range(self.max_retries + 1):
            if attempt > 0:
                # Linear back-off between attempts.
                wait_time = attempt * self.retry_backoff_seconds
                print(f" [重试 {attempt}/{self.max_retries}] 等待 {wait_time} 秒后重试...")
                self.logger.info(f"账号 {account_id}{attempt}次重试,等待{wait_time}秒")
                time.sleep(wait_time)
                print(f" [重试 {attempt}/{self.max_retries}] 开始重新抓取...")
            is_last_attempt = attempt >= self.max_retries
            try:
                result = analytics.extract_integrated_data(account_id, days=self.days)
                # Coerce to str so an odd/None status counts as a failure instead of
                # raising TypeError on the substring test below.
                status = str(result.get('status') or '') if result else ''
            except Exception as e:
                error_type = type(e).__name__
                if self._is_proxy_error(e):
                    print(f" [!] 代理连接错误: {error_type}")
                    self.logger.warning(f"账号 {account_id} 代理连接错误: {error_type} - {e}")
                else:
                    print(f" [!] 发生异常: {error_type} - {e}")
                    self.logger.error(f"账号 {account_id} 异常: {error_type} - {e}", exc_info=True)
                if is_last_attempt:
                    print(f" [X] 抓取失败(已达最大重试次数)")
                    self.logger.error(f"账号 {account_id} 最终失败,已达最大重试次数")
                continue

            if not result:
                if is_last_attempt:
                    print(f" [X] 抓取失败: 无返回结果")
                    self.logger.error(f"账号 {account_id} 抓取失败: 无返回结果")
                continue

            last_result = result
            if status == 'success_all':
                print(f" [✓] 抓取成功")
                self.logger.info(f"账号 {account_id} 抓取成功 (完全成功)")
                return 'success', result
            if 'success' in status:
                print(f" [!] 部分成功: {status}")
                self.logger.warning(f"账号 {account_id} 部分成功: {status}")
                return 'partial', result
            if is_last_attempt:
                print(f" [X] 抓取失败: {status}")
                self.logger.error(f"账号 {account_id} 抓取失败: {status}")
        return 'failed', last_result

    def fetch_data(self) -> bool:
        """Step 1: fetch data for every account and save it via BaijiahaoAnalytics.

        Each account is fetched with retries (see _fetch_account_with_retry).
        Exactly one result per account (its last non-empty one) is collected
        and passed to ``analytics.save_results``.

        Returns:
            True if at least one account succeeded fully or partially; False if
            no account cookies were found, every account failed, or an
            unexpected error occurred.
        """
        print("\n" + "="*70)
        print("【步骤1/3】抓取数据")
        print("="*70)
        try:
            analytics = BaijiahaoAnalytics(use_proxy=self.use_proxy, load_from_db=self.load_from_db)
            if not analytics.account_cookies:
                print("[X] 未找到可用的账号Cookie跳过本次同步")
                self.logger.error("未找到可用的账号Cookie")
                return False
            account_ids = list(analytics.account_cookies.keys())
            total = len(account_ids)
            print(f"\n找到 {total} 个账号")
            self.logger.info(f"找到 {total} 个账号")

            print("\n开始抓取数据...")
            print(f" 错误重试策略:遇到代理连接错误时自动重试,最大{self.max_retries}次\n")
            self.logger.info(f"开始抓取数据,错误重试: 最大{self.max_retries}次")

            success_count = 0
            failed_accounts = []
            partial_success_accounts = []
            results = []
            for i, account_id in enumerate(account_ids, 1):
                print(f"\n[{i}/{total}] 抓取账号: {account_id}")
                print("-"*70)
                outcome, result = self._fetch_account_with_retry(analytics, account_id)
                # One entry per account: failed retries must not be appended repeatedly.
                if result is not None:
                    results.append(result)
                if outcome == 'success':
                    success_count += 1
                elif outcome == 'partial':
                    partial_success_accounts.append(account_id)
                else:
                    failed_accounts.append(account_id)

            if results:
                analytics.save_results(results)
                self.logger.info(f"数据已保存到 bjh_integrated_data.json,共 {len(results)} 个账号")

            print(f"\n数据抓取完成:")
            print(f" 完全成功: {success_count}/{total}")
            self.logger.info(f"数据抓取完成: 完全成功 {success_count}/{total}")
            if partial_success_accounts:
                # map(str, ...) so non-string account ids cannot make join() raise.
                partial_list = ', '.join(map(str, partial_success_accounts))
                print(f" 部分成功: {len(partial_success_accounts)} 个账号")
                print(f" 账号列表: {partial_list}")
                self.logger.info(f"部分成功: {len(partial_success_accounts)} 个账号 - {partial_list}")
            if failed_accounts:
                failed_list = ', '.join(map(str, failed_accounts))
                print(f" 完全失败: {len(failed_accounts)} 个账号")
                print(f" 账号列表: {failed_list}")
                self.logger.error(f"完全失败: {len(failed_accounts)} 个账号 - {failed_list}")

            # Continue the pipeline only if some account succeeded fully or partially;
            # results from failed accounts alone are not enough.
            if success_count or partial_success_accounts:
                return True
            print("\n[X] 没有成功抓取任何数据")
            self.logger.error("没有成功抓取任何数据")
            return False
        except Exception as e:
            print(f"\n[X] 数据抓取失败: {e}")
            self.logger.error(f"数据抓取失败: {e}", exc_info=True)
            return False

    def generate_csv(self) -> bool:
        """Step 2: export the fetched data to CSV files via DataExporter.

        Returns:
            True if ``export_all_tables`` reports success, otherwise False
            (including when it raises).
        """
        print("\n" + "="*70)
        print("【步骤2/3】生成CSV文件")
        print("="*70)
        try:
            # CSV mode (use_database=False). Per the message printed below, author_id
            # is looked up from the ai_authors table, presumably inside
            # DataExporter — verify there.
            exporter = DataExporter(use_database=False)
            print("\n生成三个CSV文件...")
            print(" 注意author_id 将从 ai_authors 表中查询")
            if not exporter.export_all_tables():
                print("\n[X] CSV文件生成失败")
                self.logger.error("CSV文件生成失败")
                return False
            print("\n[OK] CSV文件生成成功")
            self.logger.info("CSV文件生成成功")
            return True
        except Exception as e:
            print(f"\n[X] CSV生成失败: {e}")
            self.logger.error(f"CSV生成失败: {e}", exc_info=True)
            return False

    def import_to_database(self) -> bool:
        """Step 3: import the generated CSV files into the database via CSVImporter.

        Returns:
            True if ``import_all`` reports success, otherwise False (including
            when it raises).
        """
        print("\n" + "="*70)
        print("【步骤3/3】导入数据库")
        print("="*70)
        try:
            importer = CSVImporter()
            print("\n将CSV数据导入到数据库...")
            if not importer.import_all():
                print("\n[X] 数据库导入失败")
                self.logger.error("数据库导入失败")
                return False
            print("\n[OK] 数据库导入成功")
            self.logger.info("数据库导入成功")
            return True
        except Exception as e:
            print(f"\n[X] 数据库导入失败: {e}")
            self.logger.error(f"数据库导入失败: {e}", exc_info=True)
            return False

    def sync_data(self):
        """Run the full pipeline once: fetch -> CSV export -> database import.

        Stops at the first step that returns False. Updates ``self.stats`` and
        prints/logs the outcome. Ordinary errors (``Exception``) are caught here,
        so a failed run cannot break the scheduler loop in ``run``.
        """
        start_time = datetime.now()
        self.stats['total_runs'] += 1
        self.stats['last_run_time'] = start_time
        started = start_time.strftime('%Y-%m-%d %H:%M:%S')
        print("\n" + "="*70)
        print(f"开始数据同步 - {started}")
        print("="*70)
        self.logger.info("="*70)
        self.logger.info(f"开始数据同步 - {started}")
        self.logger.info("="*70)

        steps = (
            (self.fetch_data, "数据抓取失败"),
            (self.generate_csv, "CSV生成失败"),
            (self.import_to_database, "数据库导入失败"),
        )
        try:
            for step, failure_message in steps:
                if not step():
                    raise self._StepFailed(failure_message)

            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            self.stats['successful_runs'] += 1
            self.stats['last_success_time'] = end_time
            self.stats['last_error'] = None
            print("\n" + "="*70)
            print(f"✓ 数据同步完成 - 用时 {duration:.1f}秒")
            print(f" 完成时间: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
            print("="*70 + "\n")
            self.logger.info(f"数据同步成功 - 用时 {duration:.1f}秒")
            self.print_stats()
        except Exception as e:
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()
            self.stats['failed_runs'] += 1
            self.stats['last_error'] = str(e)
            print("\n" + "="*70)
            print(f"✗ 数据同步失败 - 用时 {duration:.1f}秒")
            print(f" 错误: {e}")
            print("="*70 + "\n")
            # A step that returned False has already logged its own details; a
            # traceback is only useful for unexpected exceptions.
            self.logger.error(f"数据同步失败: {e}", exc_info=not isinstance(e, self._StepFailed))
            self.print_stats()

    def print_stats(self):
        """Print the run statistics to stdout and log a one-line summary."""
        stats = self.stats
        print("\n" + "-"*70)
        print("运行统计:")
        print(f" 总运行次数: {stats['total_runs']}")
        print(f" 成功次数: {stats['successful_runs']}")
        print(f" 失败次数: {stats['failed_runs']}")
        for key, label in (('last_run_time', " 上次运行: "), ('last_success_time', " 上次成功: ")):
            if stats[key]:
                print(f"{label}{stats[key].strftime('%Y-%m-%d %H:%M:%S')}")
        if stats['last_error']:
            print(f" 上次错误: {stats['last_error']}")
        print("-"*70 + "\n")
        self.logger.info(f"运行统计: 总{stats['total_runs']}次, 成功{stats['successful_runs']}次, 失败{stats['failed_runs']}次")

    def get_next_midnight(self) -> datetime:
        """Return the next local midnight (00:00 tomorrow) as a naive datetime."""
        tomorrow = datetime.now() + timedelta(days=1)
        return tomorrow.replace(hour=0, minute=0, second=0, microsecond=0)

    def run(self):
        """Schedule sync_data daily at 00:00 local time and block until Ctrl+C.

        Pending jobs are checked once a minute, so a run can start up to about
        60 seconds after midnight.
        """
        print("\n" + "="*70)
        print("守护进程已启动")
        print("="*70)
        schedule.every().day.at("00:00").do(self.sync_data)

        next_run = self.get_next_midnight()
        hours_left = (next_run - datetime.now()).total_seconds() / 3600
        next_run_text = next_run.strftime('%Y-%m-%d %H:%M:%S')
        print(f"\n下次执行时间: {next_run_text}")
        print(f"距离下次执行: {hours_left:.1f} 小时")
        print("\n按 Ctrl+C 可以停止守护进程")
        print("="*70 + "\n")
        self.logger.info(f"守护进程已启动,下次执行时间: {next_run_text}")
        try:
            while True:
                schedule.run_pending()
                time.sleep(60)  # check for due jobs once a minute
        except KeyboardInterrupt:
            print("\n\n" + "="*70)
            print("守护进程已停止")
            print("="*70)
            self.print_stats()
            self.logger.info("守护进程已停止")
def _parse_int(text, default):
    """Parse *text* as an int, ignoring surrounding whitespace.

    Returns *default* when *text* is None, blank, or not a valid integer.
    """
    text = (text or '').strip()
    if not text:
        return default
    try:
        return int(text)
    except ValueError:
        return default


def _env_int(name, default):
    """Read integer environment variable *name*; use *default* if unset, blank or invalid.

    An invalid (non-blank, non-integer) value prints a warning instead of
    crashing the service at startup, matching the interactive-mode fallback.
    """
    raw = os.getenv(name, '').strip()
    value = _parse_int(raw, None)
    if value is None:
        if raw:
            print(f"[!] 环境变量 {name}={raw!r} 不是有效整数,使用默认值 {default}")
        return default
    return value


def main():
    """Entry point: collect configuration, optionally sync once, then run the daemon.

    Configuration is read from environment variables when stdin is missing or
    not a TTY, or when NON_INTERACTIVE=true; otherwise the user is prompted.
    Note: the non-interactive defaults (USE_PROXY=true, RUN_NOW=true) differ
    from the interactive defaults (no proxy, no immediate run).
    """
    print("\n" + "="*70)
    print("数据同步守护进程配置")
    print("="*70)
    # Non-interactive when running as a service (no stdin / no TTY) or when forced
    # via env. sys.stdin can be None (e.g. pythonw), so guard before isatty().
    stdin_is_tty = sys.stdin is not None and sys.stdin.isatty()
    non_interactive = not stdin_is_tty or os.getenv('NON_INTERACTIVE', '').lower() == 'true'
    if non_interactive:
        print("\n[检测到非交互模式,使用默认配置]")
        print("可通过环境变量配置:")
        print(" LOAD_FROM_DB=true/false - Cookie来源")
        print(" USE_PROXY=true/false - 是否使用代理")
        print(" DAYS=7 - 抓取天数")
        print(" MAX_RETRIES=3 - 重试次数")
        print(" RUN_NOW=true/false - 是否立即执行\n")
        load_from_db = os.getenv('LOAD_FROM_DB', 'true').lower() == 'true'
        use_proxy = os.getenv('USE_PROXY', 'true').lower() == 'true'
        days = _env_int('DAYS', 7)
        max_retries = _env_int('MAX_RETRIES', 3)
        run_now = os.getenv('RUN_NOW', 'true').lower() == 'true'
    else:
        print("\n请配置守护进程参数:\n")

        # 1. Cookie source
        print("1. Cookie来源")
        print(" 1) 从本地文件加载 (captured_account_cookies.json)")
        print(" 2) 从数据库加载 (推荐)")
        source = input("\n 输入选项 (1/2, 默认2): ").strip() or '2'
        load_from_db = (source == '2')

        # 2. Proxy
        print("\n2. 是否使用代理?")
        use_proxy_input = input(" (y/n, 默认n): ").strip().lower() or 'n'
        use_proxy = (use_proxy_input == 'y')

        # 3. Number of days to fetch (invalid input falls back to 7)
        print("\n3. 抓取最近多少天的数据?")
        days = _parse_int(input(" (默认7天): "), 7)

        # 4. Maximum retries (invalid input falls back to 3)
        print("\n4. 最大重试次数?")
        print(" (遇到代理连接错误时自动重试)")
        max_retries = _parse_int(input(" (默认3次): "), 3)

        # 5. Run one sync immediately?
        print("\n5. 是否立即执行一次同步?")
        print(" (否则等待到午夜00:00执行)")
        run_now_input = input(" (y/n, 默认n): ").strip().lower() or 'n'
        run_now = (run_now_input == 'y')

        # Confirm. The proxy and run-now lines previously printed empty strings
        # for both branches of their conditionals.
        print("\n" + "="*70)
        print("配置确认:")
        print(f" Cookie来源: {'数据库' if load_from_db else '本地文件'}")
        print(f" 使用代理: {'是' if use_proxy else '否'}")
        print(f" 抓取天数: {days}")
        print(f" 错误重试: 最大{max_retries}次")
        print(f" 立即执行: {'是' if run_now else '否'}")
        print(f" 定时执行: 每天午夜00:00")
        print("="*70)
        confirm = input("\n确认启动守护进程?(y/n): ").strip().lower()
        if confirm != 'y':
            print("\n已取消")
            return

    daemon = DataSyncDaemon(use_proxy=use_proxy, load_from_db=load_from_db, days=days, max_retries=max_retries)
    if run_now:
        print("\n立即执行一次同步...")
        daemon.sync_data()
    daemon.run()


if __name__ == '__main__':
    main()