#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
批量历史数据导入脚本
功能:
1. 按日期范围循环抓取百家号数据
2. 每次抓取后自动导出CSV
3. 自动导入数据库
4. 记录执行日志和错误信息
5. 自动重试机制(针对网络、代理等临时性错误)
使用方法:
# 基本用法
python batch_import_history.py --start 2025-12-01 --end 2025-12-25
# 跳过失败的日期继续执行
python batch_import_history.py --start 2025-12-01 --end 2025-12-25 --skip-failed
# 自定义重试次数默认3次
python batch_import_history.py --start 2025-12-01 --end 2025-12-25 --max-retries 5
# 组合使用
python batch_import_history.py --start 2025-12-01 --end 2025-12-25 --skip-failed --max-retries 5
"""
import sys
import os
import subprocess
import argparse
from datetime import datetime, timedelta
from typing import List, Tuple, Optional
import json
import time
# On Windows the console may default to a legacy code page; rewrap
# stdout/stderr as UTF-8 text streams so Chinese output is not mangled.
if sys.platform == 'win32':
    import io
    for _stream_name in ('stdout', 'stderr'):
        _stream = getattr(sys, _stream_name)
        if not isinstance(_stream, io.TextIOWrapper) or _stream.encoding != 'utf-8':
            setattr(sys, _stream_name, io.TextIOWrapper(_stream.buffer, encoding='utf-8'))
class BatchImporter:
    """Batch importer: crawls Baijiahao data per date, exports CSV, imports to DB."""

    def __init__(self, start_date: str, end_date: str, skip_failed: bool = False, max_retries: int = 3):
        """Initialize the importer.

        Args:
            start_date: Start date (YYYY-MM-DD).
            end_date: End date (YYYY-MM-DD), inclusive.
            skip_failed: If True, keep going when a date fails.
            max_retries: Maximum retries per step (default 3).

        Raises:
            ValueError: If a date string is not in YYYY-MM-DD format.
            FileNotFoundError: If a required worker script is missing.
        """
        self.script_dir = os.path.dirname(os.path.abspath(__file__))
        self.start_date = datetime.strptime(start_date, '%Y-%m-%d')
        self.end_date = datetime.strptime(end_date, '%Y-%m-%d')
        self.skip_failed = skip_failed
        self.max_retries = max_retries
        # Worker scripts invoked as subprocesses, one per pipeline step.
        self.analytics_script = os.path.join(self.script_dir, 'bjh_analytics_date.py')
        self.export_script = os.path.join(self.script_dir, 'export_to_csv.py')
        self.import_script = os.path.join(self.script_dir, 'import_csv_to_database.py')
        # One timestamped log file per run. exist_ok=True avoids the race
        # between the original exists() check and makedirs().
        self.log_dir = os.path.join(self.script_dir, 'logs')
        os.makedirs(self.log_dir, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.log_file = os.path.join(self.log_dir, f'batch_import_{timestamp}.log')
        # Per-date execution records, appended by process_date().
        self.results = []
        # Fail fast if any worker script is missing.
        self._validate_scripts()
def _validate_scripts(self):
"""验证所需脚本文件是否存在"""
scripts = {
'bjh_analytics_date.py': self.analytics_script,
'export_to_csv.py': self.export_script,
'import_csv_to_database.py': self.import_script
}
missing_scripts = []
for name, path in scripts.items():
if not os.path.exists(path):
missing_scripts.append(name)
if missing_scripts:
print(f"[X] 缺少必要的脚本文件:")
for script in missing_scripts:
print(f" - {script}")
raise FileNotFoundError("脚本文件缺失")
def log(self, message: str, level: str = 'INFO'):
    """Write a timestamped line to the console and append it to the run's log file.

    Args:
        message: Text to record.
        level: Severity tag (INFO, WARNING, ERROR).
    """
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    entry = f"[{stamp}] [{level}] {message}"
    print(entry)
    # Best effort: a logging failure must never abort the batch run.
    try:
        with open(self.log_file, 'a', encoding='utf-8') as fh:
            fh.write(entry + '\n')
    except Exception as exc:
        print(f"[!] 写入日志文件失败: {exc}")
def get_date_list(self) -> List[str]:
    """Return every date from start_date through end_date (inclusive) as YYYY-MM-DD strings."""
    total_days = (self.end_date - self.start_date).days + 1
    return [
        (self.start_date + timedelta(days=offset)).strftime('%Y-%m-%d')
        for offset in range(total_days)
    ]
def run_command_with_retry(self, cmd: List[str], step_name: str, max_retries: Optional[int] = None) -> Tuple[bool, str]:
    """Run a command, retrying only on transient (network/proxy/timeout) errors.

    Args:
        cmd: Command argv list.
        step_name: Human-readable step name used in log messages.
        max_retries: Max retry count; defaults to the instance setting.

    Returns:
        (success, error_message) — error_message is '' on success.
    """
    if max_retries is None:
        max_retries = self.max_retries
    # Substrings that mark an error as transient and worth retrying.
    # BUG FIX: the error text is lower-cased before matching, so these must
    # be lower-case too — the original list contained 'RemoteDisconnected',
    # 'ConnectionError', 'ProxyError', which could never match.
    retryable_keywords = [
        '超时', 'timeout',
        '连接', 'connection',
        '代理', 'proxy',
        '网络', 'network',
        'remotedisconnected',
        'connectionerror',
        'proxyerror',
    ]
    retry_count = 0
    last_error = ""
    while retry_count <= max_retries:
        if retry_count > 0:
            # Linear back-off before each retry: 5s, 10s, 15s, ...
            wait_time = retry_count * 5
            self.log(f"{step_name}{retry_count}次重试,等待 {wait_time} 秒...", level='WARNING')
            time.sleep(wait_time)
        success, error = self.run_command(cmd, step_name)
        if success:
            if retry_count > 0:
                self.log(f"{step_name} 重试成功!(第{retry_count}次重试)", level='INFO')
            return True, ""
        last_error = error
        retry_count += 1
        if retry_count <= max_retries:
            error_text = str(error).lower()
            if any(keyword in error_text for keyword in retryable_keywords):
                self.log(f"{step_name} 出现可重试错误: {error}", level='WARNING')
            else:
                # Non-transient failure: retrying would not help, stop now.
                self.log(f"{step_name} 出现不可重试错误,停止重试: {error}", level='ERROR')
                return False, error
    # Exhausted all retries.
    self.log(f"{step_name} 失败,已达最大重试次数 ({max_retries})", level='ERROR')
    return False, last_error
def run_command(self, cmd: List[str], step_name: str) -> Tuple[bool, str]:
    """Run a command once, streaming its output to the console and the log.

    Args:
        cmd: Command argv list.
        step_name: Human-readable step name used in log messages.

    Returns:
        (success, error_message) — error_message is '' on success.
    """
    process = None
    try:
        self.log(f"执行命令: {' '.join(cmd)}")
        # Merge stderr into stdout so a single reader sees everything;
        # bufsize=1 gives line buffering in text mode for near-real-time
        # streaming. (Dropped universal_newlines=True: redundant with text=True.)
        process = subprocess.Popen(
            cmd,
            cwd=self.script_dir,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding='utf-8',
            bufsize=1,
        )
        # Stream output line by line as the child produces it.
        output_lines = []
        if process.stdout:
            try:
                for line in process.stdout:
                    line = line.rstrip()
                    if line:  # skip blank lines
                        print(f" {line}")
                        output_lines.append(line)
                        # Heartbeat every 10 lines keeps the log file compact.
                        if len(output_lines) % 10 == 0:
                            self.log(f"{step_name} 运行中... (已输出{len(output_lines)}行)")
            except Exception as e:
                self.log(f"读取输出异常: {e}", level='WARNING')
        # stdout is already drained, so this normally returns immediately.
        return_code = process.wait(timeout=600)  # 10-minute cap
        full_output = '\n'.join(output_lines)
        if full_output:
            self.log(f"{step_name} 输出:\n{full_output}")
        if return_code == 0:
            self.log(f"[✓] {step_name} 执行成功", level='INFO')
            return True, ""
        error_msg = f"返回码: {return_code}"
        self.log(f"[X] {step_name} 执行失败: {error_msg}", level='ERROR')
        return False, error_msg
    except subprocess.TimeoutExpired:
        if process:
            process.kill()
            process.wait()  # reap the killed child so it doesn't linger as a zombie
        error_msg = "命令执行超时(>10分钟)"
        self.log(f"[X] {step_name} 失败: {error_msg}", level='ERROR')
        return False, error_msg
    except Exception as e:
        error_msg = str(e)
        self.log(f"[X] {step_name} 异常: {error_msg}", level='ERROR')
        import traceback
        self.log(f"异常堆栈:\n{traceback.format_exc()}", level='ERROR')
        return False, error_msg
def process_date(self, date_str: str, step_delay: float = 2.0) -> bool:
    """Run the crawl → CSV export → DB import pipeline for one date.

    Args:
        date_str: Date to process (YYYY-MM-DD).
        step_delay: Seconds to wait between steps so the previous step's
            files are fully flushed (default 2.0, matching the original
            hard-coded pause; parameterized mainly for testing).

    Returns:
        True if all three steps succeeded. A result record is appended to
        self.results either way.
    """
    self.log("="*70)
    self.log(f"开始处理日期: {date_str}")
    self.log("="*70)
    result = {
        'date': date_str,
        'start_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'steps': {},
        'success': False,
        'error': None
    }
    # (result key, banner, step name, error prefix, argv) for each step.
    # '--no-confirm' skips the interactive prompts of the worker scripts.
    steps = [
        ('analytics', f"\n[步骤 1/3] 抓取 {date_str} 的数据...", f"数据抓取 ({date_str})", "数据抓取失败",
         [sys.executable, self.analytics_script, date_str, '--proxy', '--database', '--no-confirm']),
        ('export', f"\n[步骤 2/3] 导出CSV文件...", f"CSV导出 ({date_str})", "CSV导出失败",
         [sys.executable, self.export_script, '--mode', 'csv', '--no-confirm']),
        ('import', f"\n[步骤 3/3] 导入数据库...", f"数据库导入 ({date_str})", "数据库导入失败",
         [sys.executable, self.import_script]),
    ]
    for index, (key, banner, step_name, error_prefix, cmd) in enumerate(steps):
        if index > 0:
            # Give the previous step time to finish writing its files.
            time.sleep(step_delay)
        self.log(banner)
        success, error = self.run_command_with_retry(cmd, step_name)
        result['steps'][key] = {'success': success, 'error': error}
        if not success:
            # Record the failure and stop the pipeline for this date.
            result['error'] = f"{error_prefix}: {error}"
            result['end_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            self.results.append(result)
            return False
    # All three steps succeeded.
    result['success'] = True
    result['end_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    self.results.append(result)
    self.log(f"\n[✓] {date_str} 处理完成!")
    self.log("="*70 + "\n")
    return True
def run(self):
    """Show a summary, ask for confirmation on stdin, then process every date.

    Processes dates in order; on failure either stops (default) or, with
    skip_failed, logs and continues. Saves a JSON result summary at the end.
    """
    dates = self.get_date_list()
    print("\n" + "="*70)
    print("批量历史数据导入")
    print("="*70)
    print(f"开始日期: {self.start_date.strftime('%Y-%m-%d')}")
    print(f"结束日期: {self.end_date.strftime('%Y-%m-%d')}")
    print(f"总天数: {len(dates)}")
    # BUG FIX: both branches of this conditional printed an empty string
    # (the 是/否 characters were lost); restore the yes/no display.
    print(f"跳过失败: {'是' if self.skip_failed else '否'}")
    print(f"最大重试次数: {self.max_retries}")
    print(f"日志文件: {self.log_file}")
    print("="*70)
    # Interactive confirmation before any work starts.
    confirm = input("\n是否开始执行? (y/n): ").strip().lower()
    if confirm != 'y':
        print("已取消")
        return
    self.log(f"开始批量导入: {len(dates)} 个日期")
    start_time = datetime.now()
    success_count = 0
    failed_count = 0
    for idx, date_str in enumerate(dates, 1):
        print(f"\n{'='*70}")
        print(f"进度: [{idx}/{len(dates)}] {date_str}")
        print(f"{'='*70}")
        if self.process_date(date_str):
            success_count += 1
        else:
            failed_count += 1
            if not self.skip_failed:
                # Default behavior: abort the whole batch on first failure.
                self.log(f"[X] 日期 {date_str} 处理失败,停止执行", level='ERROR')
                break
            self.log(f"[!] 日期 {date_str} 处理失败,跳过继续", level='WARNING')
        # Pause between dates to avoid hammering the upstream service.
        if idx < len(dates):
            delay = 5
            self.log(f"等待 {delay} 秒后处理下一个日期...")
            time.sleep(delay)
    end_time = datetime.now()
    duration = end_time - start_time
    print("\n" + "="*70)
    print("批量导入完成")
    print("="*70)
    print(f"总耗时: {duration}")
    print(f"成功: {success_count}")
    print(f"失败: {failed_count}")
    print(f"日志文件: {self.log_file}")
    print("="*70)
    self.log("="*70)
    self.log(f"批量导入完成: 成功 {success_count} 天, 失败 {failed_count}")
    self.log(f"总耗时: {duration}")
    self.log("="*70)
    # Persist the per-date results as JSON next to the log.
    self._save_results()
    if failed_count > 0:
        print("\n失败的日期:")
        for r in self.results:
            if not r['success']:
                print(f" - {r['date']}: {r.get('error', '未知错误')}")
def _save_results(self):
"""保存执行结果到JSON文件"""
try:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
result_file = os.path.join(self.log_dir, f'batch_result_{timestamp}.json')
summary = {
'start_date': self.start_date.strftime('%Y-%m-%d'),
'end_date': self.end_date.strftime('%Y-%m-%d'),
'total_dates': len(self.results),
'success_count': sum(1 for r in self.results if r['success']),
'failed_count': sum(1 for r in self.results if not r['success']),
'results': self.results
}
with open(result_file, 'w', encoding='utf-8') as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
self.log(f"执行结果已保存: {result_file}")
except Exception as e:
self.log(f"保存执行结果失败: {e}", level='ERROR')
def main():
    """CLI entry point: parse arguments, validate the date range, run the batch import.

    Returns:
        0 on success, 1 on validation or runtime error (process exit code).
    """
    parser = argparse.ArgumentParser(
        description='批量历史数据导入脚本',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例用法:
python batch_import_history.py --start 2025-12-01 --end 2025-12-25
python batch_import_history.py --start 2025-12-01 --end 2025-12-25 --skip-failed
"""
    )
    parser.add_argument('--start', type=str, required=True,
                        help='开始日期 (格式: YYYY-MM-DD)')
    parser.add_argument('--end', type=str, required=True,
                        help='结束日期 (格式: YYYY-MM-DD)')
    parser.add_argument('--skip-failed', action='store_true',
                        help='跳过失败的日期继续执行(默认:遇到失败停止)')
    parser.add_argument('--max-retries', type=int, default=3,
                        help='每个步骤的最大重试次数默认3')
    args = parser.parse_args()
    # Validate the date range before doing any work.
    try:
        parsed_start = datetime.strptime(args.start, '%Y-%m-%d')
        parsed_end = datetime.strptime(args.end, '%Y-%m-%d')
        if parsed_start > parsed_end:
            print("[X] 开始日期不能晚于结束日期")
            return 1
    except ValueError as e:
        print(f"[X] 日期格式错误: {e}")
        print(" 正确格式: YYYY-MM-DD (例如: 2025-12-01)")
        return 1
    try:
        importer = BatchImporter(
            start_date=args.start,
            end_date=args.end,
            skip_failed=args.skip_failed,
            max_retries=args.max_retries,
        )
        importer.run()
        return 0
    except Exception as e:
        # Top-level boundary: report and convert to a non-zero exit code.
        print(f"\n[X] 程序执行出错: {e}")
        import traceback
        traceback.print_exc()
        return 1


if __name__ == '__main__':
    sys.exit(main())