Files
baijiahao_data_crawl/data_sync_daemon.py

680 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据同步守护进程
功能:
1. 24小时不间断运行仅在工作时间8:00-24:00执行任务
2. 每隔1小时自动执行数据抓取和同步
3. 自动执行流程:
- 从百家号API抓取最新数据
- 生成CSV文件包含从数据库查询的author_id
- 将CSV数据导入到数据库
4. 支持手动触发刷新
5. 详细的日志记录
6. 非工作时间0:00-8:00自动休眠减少API请求压力
使用场景:
- 24/7运行在工作时间8:00-24:00每隔1小时自动更新数据
- 无需人工干预,自动化数据同步
- 避免在夜间时段进行数据抓取
"""
import sys
import os
import time
import schedule
from datetime import datetime, timedelta
from typing import Optional
# 设置UTF-8编码
if sys.platform == 'win32':
import io
if not isinstance(sys.stdout, io.TextIOWrapper) or sys.stdout.encoding != 'utf-8':
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
if not isinstance(sys.stderr, io.TextIOWrapper) or sys.stderr.encoding != 'utf-8':
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
from bjh_analytics import BaijiahaoAnalytics
from export_to_csv import DataExporter
from import_csv_to_database import CSVImporter
from log_config import setup_logger
# 导入数据验证与短信告警模块
try:
from data_validation_with_sms import DataValidationWithSMS
VALIDATION_AVAILABLE = True
except ImportError:
print("[!] 数据验证模块未找到,验证功能将不可用")
VALIDATION_AVAILABLE = False
class DataSyncDaemon:
"""数据同步守护进程"""
def __init__(self, use_proxy: bool = False, load_from_db: bool = True, days: int = 7, max_retries: int = 3, enable_validation: bool = True):
"""初始化守护进程
Args:
use_proxy: 是否使用代理
load_from_db: 是否从数据库加载Cookie
days: 抓取最近多少天的数据
max_retries: 最大重试次数
enable_validation: 是否启用数据验证与短信告警
"""
self.script_dir = os.path.dirname(os.path.abspath(__file__))
self.use_proxy = use_proxy
self.load_from_db = load_from_db
self.days = days
self.max_retries = max_retries
self.enable_validation = enable_validation and VALIDATION_AVAILABLE
# 工作时间配置8:00-24:00
self.work_start_hour = 8
self.work_end_hour = 24
# 初始化日志
self.logger = setup_logger('data_sync_daemon', os.path.join(self.script_dir, 'logs', 'data_sync_daemon.log'))
# 创建验证报告目录
self.validation_reports_dir = os.path.join(self.script_dir, 'validation_reports')
if not os.path.exists(self.validation_reports_dir):
os.makedirs(self.validation_reports_dir)
self.logger.info(f"创建验证报告目录: {self.validation_reports_dir}")
# 统计信息
self.stats = {
'total_runs': 0,
'successful_runs': 0,
'failed_runs': 0,
'last_run_time': None,
'last_success_time': None,
'last_error': None
}
print("\n" + "="*70)
print("数据同步守护进程")
print("="*70)
print(f" 使用代理: {'' if use_proxy else ''}")
print(f" Cookie来源: {'数据库' if load_from_db else '本地文件'}")
print(f" 抓取天数: {days}")
print(f" 工作时间: {self.work_start_hour}:00 - {self.work_end_hour}:00")
print(f" 错误重试: 最大{max_retries}")
print(f" 定时执行: 每隔1小时")
print(f" 数据验证: {'已启用' if self.enable_validation else '已禁用'}")
if self.enable_validation:
print(f" 短信告警: 验证失败时发送 (错误代码2222)")
print("="*70 + "\n")
self.logger.info("="*70)
self.logger.info("数据同步守护进程启动")
self.logger.info(f"使用代理: {use_proxy}, Cookie来源: {'数据库' if load_from_db else '本地文件'}, 抓取天数: {days}, 工作时间: {self.work_start_hour}:00-{self.work_end_hour}:00, 重试: {max_retries}次, 验证: {'已启用' if self.enable_validation else '已禁用'}")
self.logger.info("="*70)
def fetch_data(self) -> bool:
"""步骤1抓取数据"""
print("\n" + "="*70)
print("【步骤1/3】抓取数据")
print("="*70)
try:
# 初始化分析器
analytics = BaijiahaoAnalytics(use_proxy=self.use_proxy, load_from_db=self.load_from_db)
if not analytics.account_cookies:
print("[X] 未找到可用的账号Cookie跳过本次同步")
self.logger.error("未找到可用的账号Cookie")
return False
account_ids = list(analytics.account_cookies.keys())
print(f"\n找到 {len(account_ids)} 个账号")
self.logger.info(f"找到 {len(account_ids)} 个账号")
# 抓取所有账号数据
print("\n开始抓取数据...")
print(f" 错误重试策略:遇到代理连接错误时自动重试,最大{self.max_retries}\n")
self.logger.info(f"开始抓取数据,错误重试: 最大{self.max_retries}")
success_count = 0
failed_accounts = []
partial_success_accounts = []
results = []
for i, account_id in enumerate(account_ids, 1):
print(f"\n[{i}/{len(account_ids)}] 抓取账号: {account_id}")
print("-"*70)
# 重试机制
retry_count = 0
result = None
last_error = None
while retry_count <= self.max_retries:
try:
# 如果是重试,先等待一段时间
if retry_count > 0:
wait_time = retry_count * 2 # 递增等待时间2秒、4秒、6秒
print(f" [重试 {retry_count}/{self.max_retries}] 等待 {wait_time} 秒后重试...")
self.logger.info(f"账号 {account_id}{retry_count}次重试,等待{wait_time}")
time.sleep(wait_time)
print(f" [重试 {retry_count}/{self.max_retries}] 开始重新抓取...")
result = analytics.extract_integrated_data(account_id, days=self.days)
if result:
results.append(result)
status = result.get('status', '')
if status == 'success_all':
success_count += 1
print(f" [✓] 抓取成功")
self.logger.info(f"账号 {account_id} 抓取成功 (完全成功)")
break # 成功,跳出重试循环
elif 'success' in status:
# 部分成功
partial_success_accounts.append(account_id)
print(f" [!] 部分成功: {status}")
self.logger.warning(f"账号 {account_id} 部分成功: {status}")
break # 部分成功也跳出重试循环
else:
# 失败
last_error = f"状态: {status}"
if retry_count >= self.max_retries:
failed_accounts.append(account_id)
print(f" [X] 抓取失败: {status}")
self.logger.error(f"账号 {account_id} 抓取失败: {status}")
break
else:
retry_count += 1
continue
else:
# 返回None表示完全失败
last_error = "返回结果为空"
if retry_count >= self.max_retries:
failed_accounts.append(account_id)
print(f" [X] 抓取失败: 无返回结果")
self.logger.error(f"账号 {account_id} 抓取失败: 无返回结果")
break
else:
retry_count += 1
continue
except Exception as e:
last_error = str(e)
error_type = type(e).__name__
# 判断是否是代理相关错误
is_proxy_error = any([
'Connection' in error_type,
'RemoteDisconnected' in error_type,
'ProxyError' in error_type,
'Timeout' in error_type,
'ConnectionError' in str(e),
'Connection aborted' in str(e),
'Remote end closed' in str(e),
])
if is_proxy_error:
print(f" [!] 代理连接错误: {error_type}")
self.logger.warning(f"账号 {account_id} 代理连接错误: {error_type} - {last_error}")
else:
print(f" [!] 发生异常: {error_type} - {e}")
self.logger.error(f"账号 {account_id} 异常: {error_type} - {e}", exc_info=True)
if retry_count >= self.max_retries:
failed_accounts.append(account_id)
print(f" [X] 抓取失败(已达最大重试次数)")
self.logger.error(f"账号 {account_id} 最终失败,已达最大重试次数")
break
else:
retry_count += 1
continue
# 保存到文件
if results:
analytics.save_results(results)
self.logger.info(f"数据已保存到 bjh_integrated_data.json{len(results)} 个账号")
print(f"\n数据抓取完成:")
print(f" 完全成功: {success_count}/{len(account_ids)}")
self.logger.info(f"数据抓取完成: 完全成功 {success_count}/{len(account_ids)}")
if partial_success_accounts:
print(f" 部分成功: {len(partial_success_accounts)} 个账号")
print(f" 账号列表: {', '.join(partial_success_accounts)}")
self.logger.info(f"部分成功: {len(partial_success_accounts)} 个账号 - {', '.join(partial_success_accounts)}")
if failed_accounts:
print(f" 完全失败: {len(failed_accounts)} 个账号")
print(f" 账号列表: {', '.join(failed_accounts)}")
self.logger.error(f"完全失败: {len(failed_accounts)} 个账号 - {', '.join(failed_accounts)}")
# 只要有成功或部分成功的数据,就继续执行
if results:
return True
else:
print("\n[X] 没有成功抓取任何数据")
self.logger.error("没有成功抓取任何数据")
return False
except Exception as e:
print(f"\n[X] 数据抓取失败: {e}")
self.logger.error(f"数据抓取失败: {e}", exc_info=True)
return False
def generate_csv(self) -> bool:
"""步骤2生成CSV文件"""
print("\n" + "="*70)
print("【步骤2/3】生成CSV文件")
print("="*70)
try:
# 初始化导出器CSV模式连接数据库查询author_id
exporter = DataExporter(use_database=False)
print("\n生成三个CSV文件...")
print(" 注意author_id 将从 ai_authors 表中查询")
# 执行CSV导出
success = exporter.export_all_tables()
if success:
print("\n[OK] CSV文件生成成功")
self.logger.info("CSV文件生成成功")
return True
else:
print("\n[X] CSV文件生成失败")
self.logger.error("CSV文件生成失败")
return False
except Exception as e:
print(f"\n[X] CSV生成失败: {e}")
self.logger.error(f"CSV生成失败: {e}", exc_info=True)
return False
def import_to_database(self) -> bool:
"""步骤3导入数据库"""
print("\n" + "="*70)
print("【步骤3/3】导入数据库")
print("="*70)
try:
# 创建导入器
importer = CSVImporter()
print("\n将CSV数据导入到数据库...")
# 导入所有CSV文件
success = importer.import_all()
if success:
print("\n[OK] 数据库导入成功")
self.logger.info("数据库导入成功")
return True
else:
print("\n[X] 数据库导入失败")
self.logger.error("数据库导入失败")
return False
except Exception as e:
print(f"\n[X] 数据库导入失败: {e}")
self.logger.error(f"数据库导入失败: {e}", exc_info=True)
return False
def validate_data(self) -> bool:
"""步顷4数据验证与短信告警"""
if not self.enable_validation:
print("\n[跳过] 数据验证功能未启用")
self.logger.info("跳过数据验证(功能未启用)")
return True
print("\n" + "="*70)
print("【步顷4/4】数据验证与短信告警")
print("="*70)
try:
# 等待3秒确保数据库写入完成
print("\n等待3秒确保数据写入完成...")
self.logger.info("等待3秒以确保数据库写入完成")
time.sleep(3)
print("\n执行数据验证...")
self.logger.info("开始执行数据验证")
# 创建验证器(验证昨天的数据)
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
validator = DataValidationWithSMS(date_str=yesterday)
# 执行验证JSON + CSV + Database
passed = validator.run_validation(
sources=['json', 'csv', 'database'],
table='ai_statistics'
)
# 生成验证报告
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
report_file = os.path.join(
self.validation_reports_dir,
f'validation_report_{timestamp}.txt'
)
validator.validator.generate_report(report_file)
if passed:
print("\n[✓] 数据验证通过")
self.logger.info("数据验证通过")
return True
else:
print("\n[X] 数据验证失败,准备发送短信告警")
self.logger.error("数据验证失败")
# 生成错误摘要
error_summary = validator.generate_error_summary()
self.logger.error(f"错误摘要: {error_summary}")
# 发送短信告警错误代码2222
sms_sent = validator.send_sms_alert("2222", error_summary)
if sms_sent:
print("[✓] 告警短信已发送")
self.logger.info("告警短信发送成功")
else:
print("[X] 告警短信发送失败")
self.logger.error("告警短信发送失败")
print(f"\n详细报告: {report_file}")
# 验证失败不阻止后续流程但返回True表示步骤完成
return True
except Exception as e:
print(f"\n[X] 数据验证异常: {e}")
self.logger.error(f"数据验证异常: {e}", exc_info=True)
# 验证异常不影响整体流程
return True
def sync_data(self):
"""执行完整的数据同步流程"""
start_time = datetime.now()
self.stats['total_runs'] += 1
self.stats['last_run_time'] = start_time
print("\n" + "="*70)
print(f"开始数据同步 - {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
print("="*70)
self.logger.info("="*70)
self.logger.info(f"开始数据同步 - {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
self.logger.info("="*70)
try:
# 步骤1抓取数据
if not self.fetch_data():
raise Exception("数据抓取失败")
# 步骤2生成CSV
if not self.generate_csv():
raise Exception("CSV生成失败")
# 步顷3导入数据库
if not self.import_to_database():
raise Exception("数据库导入失败")
# 步顷4数据验证与短信告警
if not self.validate_data():
# 验证失败不阻止整体流程,只记录警告
self.logger.warning("数据验证步骤未成功完成")
# 成功
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
self.stats['successful_runs'] += 1
self.stats['last_success_time'] = end_time
self.stats['last_error'] = None
print("\n" + "="*70)
print(f"✓ 数据同步完成 - 用时 {duration:.1f}")
print(f" 完成时间: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
print("="*70 + "\n")
self.logger.info(f"数据同步成功 - 用时 {duration:.1f}")
self.print_stats()
except Exception as e:
# 失败
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
self.stats['failed_runs'] += 1
self.stats['last_error'] = str(e)
print("\n" + "="*70)
print(f"✗ 数据同步失败 - 用时 {duration:.1f}")
print(f" 错误: {e}")
print("="*70 + "\n")
self.logger.error(f"数据同步失败: {e}", exc_info=True)
self.print_stats()
def print_stats(self):
"""打印统计信息"""
print("\n" + "-"*70)
print("运行统计:")
print(f" 总运行次数: {self.stats['total_runs']}")
print(f" 成功次数: {self.stats['successful_runs']}")
print(f" 失败次数: {self.stats['failed_runs']}")
if self.stats['last_run_time']:
print(f" 上次运行: {self.stats['last_run_time'].strftime('%Y-%m-%d %H:%M:%S')}")
if self.stats['last_success_time']:
print(f" 上次成功: {self.stats['last_success_time'].strftime('%Y-%m-%d %H:%M:%S')}")
if self.stats['last_error']:
print(f" 上次错误: {self.stats['last_error']}")
print("-"*70 + "\n")
self.logger.info(f"运行统计: 总{self.stats['total_runs']}次, 成功{self.stats['successful_runs']}次, 失败{self.stats['failed_runs']}")
def is_work_time(self) -> tuple:
"""
检查当前是否在工作时间内8:00-24:00
Returns:
tuple: (是否在工作时间内, 距离下次工作时间的秒数)
"""
now = datetime.now()
current_hour = now.hour
# 在工作时间内8:00-23:59
if self.work_start_hour <= current_hour < self.work_end_hour:
return True, 0
# 不在工作时间内,计算到下个工作时间的秒数
if current_hour < self.work_start_hour:
# 今天还没到工作时间
next_work_time = now.replace(hour=self.work_start_hour, minute=0, second=0, microsecond=0)
else:
# 今天已过工作时间,等待明天
next_work_time = (now + timedelta(days=1)).replace(hour=self.work_start_hour, minute=0, second=0, microsecond=0)
seconds_until_work = (next_work_time - now).total_seconds()
return False, seconds_until_work
def get_next_run_time(self) -> datetime:
"""获取下一次执行时间1小时后"""
now = datetime.now()
next_run = now + timedelta(hours=1)
return next_run
def run(self):
"""启动守护进程"""
print("\n" + "="*70)
print("守护进程已启动")
print("="*70)
# 设置定时任务每隔1小时执行
schedule.every(1).hours.do(self.sync_data)
# 计算下次执行时间
next_run = self.get_next_run_time()
time_until_next = (next_run - datetime.now()).total_seconds()
print(f"\n执行间隔: 每隔1小时")
print(f"工作时间: {self.work_start_hour}:00 - {self.work_end_hour}:00非工作时间自动休眠")
print(f"下次执行时间: {next_run.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"距离下次执行: {time_until_next/60:.1f} 分钟")
print("\n按 Ctrl+C 可以停止守护进程")
print("="*70 + "\n")
self.logger.info(f"守护进程已启动,执行间隔: 每隔1小时工作时间: {self.work_start_hour}:00-{self.work_end_hour}:00下次执行时间: {next_run.strftime('%Y-%m-%d %H:%M:%S')}")
try:
while True:
# 检查是否在工作时间内
is_work, seconds_until_work = self.is_work_time()
if not is_work:
# 不在工作时间内,等待至工作时间
next_work_time = datetime.now() + timedelta(seconds=seconds_until_work)
self.logger.info(f"当前非工作时间,等待至 {next_work_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"\n[休眠] 当前不在工作时间内({self.work_start_hour}:00-{self.work_end_hour}:00")
print(f"[休眠] 下次工作时间: {next_work_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"[休眠] 等待 {seconds_until_work/3600:.1f} 小时...")
# 每30分钟检查一次
check_interval = 1800
elapsed = 0
while elapsed < seconds_until_work:
sleep_time = min(check_interval, seconds_until_work - elapsed)
time.sleep(sleep_time)
elapsed += sleep_time
remaining = seconds_until_work - elapsed
if remaining > 0:
print(f" 距离工作时间还有: {remaining/3600:.1f} 小时 ({datetime.now().strftime('%H:%M:%S')})")
continue
# 在工作时间内,执行定时任务
schedule.run_pending()
time.sleep(60) # 每分钟检查一次
except KeyboardInterrupt:
print("\n\n" + "="*70)
print("守护进程已停止")
print("="*70)
self.print_stats()
self.logger.info("守护进程已停止")
def main():
"""主程序入口"""
print("\n" + "="*70)
print("数据同步守护进程配置")
print("="*70)
# 检查是否为非交互模式systemd服务或环境变量
non_interactive = not sys.stdin.isatty() or os.getenv('NON_INTERACTIVE', '').lower() == 'true'
if non_interactive:
# 非交互模式:使用环境变量或默认值
print("\n[检测到非交互模式,使用默认配置]")
print("可通过环境变量配置:")
print(" LOAD_FROM_DB=true/false - Cookie来源")
print(" USE_PROXY=true/false - 是否使用代理")
print(" DAYS=7 - 抓取天数")
print(" MAX_RETRIES=3 - 重试次数")
print(" RUN_NOW=true/false - 是否立即执行")
print(" ENABLE_VALIDATION=true/false - 是否启用验证\n")
load_from_db = os.getenv('LOAD_FROM_DB', 'true').lower() == 'true'
use_proxy = os.getenv('USE_PROXY', 'true').lower() == 'true'
days = int(os.getenv('DAYS', '7'))
max_retries = int(os.getenv('MAX_RETRIES', '3'))
run_now = os.getenv('RUN_NOW', 'true').lower() == 'true'
enable_validation = os.getenv('ENABLE_VALIDATION', 'true').lower() == 'true'
else:
# 交互模式:显示菜单
# 配置选项
print("\n请配置守护进程参数:\n")
# 1. Cookie来源
print("1. Cookie来源")
print(" 1) 从本地文件加载 (captured_account_cookies.json)")
print(" 2) 从数据库加载 (推荐)")
source = input("\n 输入选项 (1/2, 默认2): ").strip() or '2'
load_from_db = (source == '2')
# 2. 是否使用代理
print("\n2. 是否使用代理?")
use_proxy_input = input(" (y/n, 默认n): ").strip().lower() or 'n'
use_proxy = (use_proxy_input == 'y')
# 3. 抓取天数
print("\n3. 抓取最近多少天的数据?")
days_input = input(" (默认7天): ").strip() or '7'
try:
days = int(days_input)
except ValueError:
days = 7
# 4. 错误重试次数(默认启用)
print("\n4. 最大重试次数?")
print(" (遇到代理连接错误时自动重试)")
retries_input = input(" (默认3次): ").strip() or '3'
try:
max_retries = int(retries_input)
except ValueError:
max_retries = 3
# 5. 是否启用数据验证
print("\n5. 是否启用数据验证与短信告警?")
print(" (每次同步后自动验证数据失败时发送短信2222)")
enable_validation_input = input(" (y/n, 默认y): ").strip().lower() or 'y'
enable_validation = (enable_validation_input == 'y')
# 6. 是否立即执行一次
print("\n6. 是否立即执行一次同步?")
print(" (否则等待到下一个整点小时执行)")
run_now_input = input(" (y/n, 默认n): ").strip().lower() or 'n'
run_now = (run_now_input == 'y')
# 确认配置
print("\n" + "="*70)
print("配置确认:")
print(f" Cookie来源: {'数据库' if load_from_db else '本地文件'}")
print(f" 使用代理: {'' if use_proxy else ''}")
print(f" 抓取天数: {days}")
print(f" 工作时间: 8:00 - 24:00非工作时间自动休眠")
print(f" 错误重试: 最大{max_retries}")
print(f" 数据验证: {'已启用' if enable_validation else '已禁用'}")
if enable_validation:
print(f" 短信告警: 验证失败时发送 (错误代码2222)")
print(f" 立即执行: {'' if run_now else ''}")
print(f" 定时执行: 每隔1小时")
print("="*70)
confirm = input("\n确认启动守护进程?(y/n): ").strip().lower()
if confirm != 'y':
print("\n已取消")
return
# 创建守护进程
daemon = DataSyncDaemon(
use_proxy=use_proxy,
load_from_db=load_from_db,
days=days,
max_retries=max_retries,
enable_validation=enable_validation
)
# 如果选择立即执行,先执行一次
if run_now:
print("\n立即执行一次同步...")
daemon.sync_data()
# 启动守护进程
daemon.run()
if __name__ == '__main__':
main()