# Source: baijiahao_data_crawl/batch_import_history.py
# (header lines below were part of a web file-viewer export, replaced by this comment)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
批量历史数据导入脚本
功能
1. 按日期范围循环抓取百家号数据
2. 每次抓取后自动导出CSV
3. 自动导入数据库
4. 记录执行日志和错误信息
5. 自动重试机制针对网络代理等临时性错误
使用方法
# 基本用法
python batch_import_history.py --start 2025-12-01 --end 2025-12-25
# 跳过失败的日期继续执行
python batch_import_history.py --start 2025-12-01 --end 2025-12-25 --skip-failed
# 自定义重试次数默认3次
python batch_import_history.py --start 2025-12-01 --end 2025-12-25 --max-retries 5
# 组合使用
python batch_import_history.py --start 2025-12-01 --end 2025-12-25 --skip-failed --max-retries 5
"""
import sys
import os
import subprocess
import argparse
from datetime import datetime, timedelta
from typing import List, Tuple, Optional
import json
import time
# Force UTF-8 on the standard streams under Windows, where the console
# encoding often defaults to a legacy code page (e.g. cp936) and would
# mangle the Chinese log output below.
if sys.platform == 'win32':
    for _stream in (sys.stdout, sys.stderr):
        # reconfigure() (Python 3.7+) changes the encoding in place.
        # The original code wrapped the stream in a new TextIOWrapper,
        # which stacks wrappers on repeated runs and fails when the
        # stream has no .buffer (e.g. already replaced by an IDE).
        if getattr(_stream, 'encoding', None) != 'utf-8' and hasattr(_stream, 'reconfigure'):
            _stream.reconfigure(encoding='utf-8')
class BatchImporter:
    """Batch importer for historical data.

    For every date in [start_date, end_date] it runs three helper scripts
    in sequence (crawl -> export CSV -> import into database), with
    per-step retries for transient network/proxy errors, and writes a
    timestamped log plus a JSON result summary under ./logs.
    """

    def __init__(self, start_date: str, end_date: str, skip_failed: bool = False, max_retries: int = 3):
        """Initialize the importer.

        Args:
            start_date: Start date (YYYY-MM-DD).
            end_date: End date (YYYY-MM-DD), inclusive.
            skip_failed: Continue with the next date when one date fails.
            max_retries: Maximum retries per step (default 3).

        Raises:
            ValueError: if a date string is not valid YYYY-MM-DD.
            FileNotFoundError: if any required helper script is missing.
        """
        self.script_dir = os.path.dirname(os.path.abspath(__file__))
        self.start_date = datetime.strptime(start_date, '%Y-%m-%d')
        self.end_date = datetime.strptime(end_date, '%Y-%m-%d')
        self.skip_failed = skip_failed
        self.max_retries = max_retries
        # Helper scripts are expected to live next to this file.
        self.analytics_script = os.path.join(self.script_dir, 'bjh_analytics_date.py')
        self.export_script = os.path.join(self.script_dir, 'export_to_csv.py')
        self.import_script = os.path.join(self.script_dir, 'import_csv_to_database.py')
        # Per-run log file under ./logs. exist_ok=True avoids the
        # exists()/makedirs() race of the original code.
        self.log_dir = os.path.join(self.script_dir, 'logs')
        os.makedirs(self.log_dir, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        self.log_file = os.path.join(self.log_dir, f'batch_import_{timestamp}.log')
        # One result dict per processed date (appended by process_date).
        self.results: List[dict] = []
        # Fail fast if any helper script is missing.
        self._validate_scripts()

    def _validate_scripts(self) -> None:
        """Verify that all required helper scripts exist.

        Raises:
            FileNotFoundError: if at least one script is missing.
        """
        scripts = {
            'bjh_analytics_date.py': self.analytics_script,
            'export_to_csv.py': self.export_script,
            'import_csv_to_database.py': self.import_script,
        }
        missing_scripts = [name for name, path in scripts.items() if not os.path.exists(path)]
        if missing_scripts:
            print("[X] 缺少必要的脚本文件:")
            for script in missing_scripts:
                print(f" - {script}")
            raise FileNotFoundError("脚本文件缺失")

    def log(self, message: str, level: str = 'INFO') -> None:
        """Write a timestamped line to the console and the log file.

        Args:
            message: Log message.
            level: Log level (INFO, WARNING, ERROR).
        """
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        log_line = f"[{timestamp}] [{level}] {message}"
        print(log_line)
        # Logging must never abort the batch run, so swallow write errors
        # after reporting them on the console.
        try:
            with open(self.log_file, 'a', encoding='utf-8') as f:
                f.write(log_line + '\n')
        except Exception as e:
            print(f"[!] 写入日志文件失败: {e}")

    def get_date_list(self) -> List[str]:
        """Return every date in [start_date, end_date] as 'YYYY-MM-DD' strings.

        Returns an empty list when start_date is after end_date.
        """
        dates = []
        current = self.start_date
        while current <= self.end_date:
            dates.append(current.strftime('%Y-%m-%d'))
            current += timedelta(days=1)
        return dates

    def run_command_with_retry(self, cmd: List[str], step_name: str,
                               max_retries: Optional[int] = None) -> Tuple[bool, str]:
        """Run a command, retrying transient (network/proxy) failures.

        Args:
            cmd: Command argv list.
            step_name: Human-readable step name for logging.
            max_retries: Maximum retry count; defaults to the instance setting.

        Returns:
            (success, error_message) — error_message is '' on success.
        """
        if max_retries is None:
            max_retries = self.max_retries
        # Error substrings that indicate a transient failure worth retrying.
        # BUG FIX: the error text is compared in lowercase, so every keyword
        # must be lowercase too — mixed-case names like 'ConnectionError',
        # 'ProxyError' and 'RemoteDisconnected' could never match before and
        # such errors were wrongly treated as non-retryable.
        retryable_keywords = [
            '超时',
            'timeout',
            '连接',
            'connection',
            '代理',
            'proxy',
            '网络',
            'network',
            'remotedisconnected',
            'connectionerror',
            'proxyerror',
        ]
        retry_count = 0
        last_error = ""
        while retry_count <= max_retries:
            if retry_count > 0:
                # Linear back-off before each retry: 5s, 10s, 15s, ...
                wait_time = retry_count * 5
                self.log(f"{step_name}{retry_count}次重试,等待 {wait_time} 秒...", level='WARNING')
                time.sleep(wait_time)
            success, error = self.run_command(cmd, step_name)
            if success:
                if retry_count > 0:
                    self.log(f"{step_name} 重试成功!(第{retry_count}次重试)", level='INFO')
                return True, ""
            last_error = error
            retry_count += 1
            if retry_count <= max_retries:
                error_text = str(error).lower()
                if any(keyword in error_text for keyword in retryable_keywords):
                    self.log(f"{step_name} 出现可重试错误: {error}", level='WARNING')
                else:
                    # Non-transient failure: retrying would not help.
                    self.log(f"{step_name} 出现不可重试错误,停止重试: {error}", level='ERROR')
                    return False, error
        self.log(f"{step_name} 失败,已达最大重试次数 ({max_retries})", level='ERROR')
        return False, last_error

    def run_command(self, cmd: List[str], step_name: str) -> Tuple[bool, str]:
        """Run a command once, streaming its output to console and log.

        Args:
            cmd: Command argv list.
            step_name: Human-readable step name for logging.

        Returns:
            (success, error_message) — error_message is '' on success.
        """
        process = None
        try:
            self.log(f"执行命令: {' '.join(cmd)}")
            # stderr is merged into stdout so one reader loop sees everything.
            # bufsize=1 selects line buffering in text mode.
            process = subprocess.Popen(
                cmd,
                cwd=self.script_dir,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                encoding='utf-8',
                bufsize=1,
            )
            # Stream the child's output line by line.
            output_lines = []
            if process.stdout:
                try:
                    for line in process.stdout:
                        line = line.rstrip()
                        if line:  # skip blank lines
                            print(f" {line}")
                            output_lines.append(line)
                            # Log a heartbeat every 10 lines to keep the
                            # log file small.
                            if len(output_lines) % 10 == 0:
                                self.log(f"{step_name} 运行中... (已输出{len(output_lines)}行)")
                except Exception as e:
                    self.log(f"读取输出异常: {e}", level='WARNING')
            # NOTE(review): this timeout only starts ticking after stdout
            # reaches EOF; a child that keeps producing output is not
            # bounded by it.
            return_code = process.wait(timeout=600)  # 10-minute timeout
            full_output = '\n'.join(output_lines)
            if full_output:
                self.log(f"{step_name} 输出:\n{full_output}")
            if return_code == 0:
                self.log(f"[✓] {step_name} 执行成功", level='INFO')
                return True, ""
            error_msg = f"返回码: {return_code}"
            self.log(f"[X] {step_name} 执行失败: {error_msg}", level='ERROR')
            return False, error_msg
        except subprocess.TimeoutExpired:
            if process:
                process.kill()
                # Reap the killed child so it does not linger as a zombie
                # (the original code killed without waiting).
                process.wait()
            error_msg = "命令执行超时(>10分钟)"
            self.log(f"[X] {step_name} 失败: {error_msg}", level='ERROR')
            return False, error_msg
        except Exception as e:
            error_msg = str(e)
            self.log(f"[X] {step_name} 异常: {error_msg}", level='ERROR')
            import traceback
            self.log(f"异常堆栈:\n{traceback.format_exc()}", level='ERROR')
            return False, error_msg

    def process_date(self, date_str: str) -> bool:
        """Run the full crawl -> export -> import pipeline for one date.

        Appends a result dict to self.results regardless of outcome.

        Args:
            date_str: Date string (YYYY-MM-DD).

        Returns:
            True when all three steps succeeded.
        """
        self.log("=" * 70)
        self.log(f"开始处理日期: {date_str}")
        self.log("=" * 70)
        result = {
            'date': date_str,
            'start_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'steps': {},
            'success': False,
            'error': None,
        }

        def _fail(step_error: str) -> bool:
            # Record a failed run and stop this date's pipeline.
            result['error'] = step_error
            result['end_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            self.results.append(result)
            return False

        # Step 1/3: crawl the data (with retry).
        self.log(f"\n[步骤 1/3] 抓取 {date_str} 的数据...")
        cmd_analytics = [
            sys.executable,
            self.analytics_script,
            date_str,
            '--proxy',
            '--database',
            '--no-confirm',  # skip the interactive confirmation prompt
        ]
        success, error = self.run_command_with_retry(cmd_analytics, f"数据抓取 ({date_str})")
        result['steps']['analytics'] = {'success': success, 'error': error}
        if not success:
            return _fail(f"数据抓取失败: {error}")
        # Give the crawler a moment to finish flushing its files.
        time.sleep(2)

        # Step 2/3: export to CSV (with retry).
        self.log(f"\n[步骤 2/3] 导出CSV文件...")
        cmd_export = [
            sys.executable,
            self.export_script,
            '--mode', 'csv',
            '--no-confirm',  # skip the interactive confirmation prompt
        ]
        success, error = self.run_command_with_retry(cmd_export, f"CSV导出 ({date_str})")
        result['steps']['export'] = {'success': success, 'error': error}
        if not success:
            return _fail(f"CSV导出失败: {error}")
        time.sleep(2)

        # Step 3/3: import into the database (with retry).
        self.log(f"\n[步骤 3/3] 导入数据库...")
        cmd_import = [
            sys.executable,
            self.import_script,
        ]
        success, error = self.run_command_with_retry(cmd_import, f"数据库导入 ({date_str})")
        result['steps']['import'] = {'success': success, 'error': error}
        if not success:
            return _fail(f"数据库导入失败: {error}")

        # All three steps succeeded.
        result['success'] = True
        result['end_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.results.append(result)
        self.log(f"\n[✓] {date_str} 处理完成!")
        self.log("=" * 70 + "\n")
        return True

    def run(self) -> None:
        """Run the batch import over the whole date range (interactive)."""
        dates = self.get_date_list()
        print("\n" + "=" * 70)
        print("批量历史数据导入")
        print("=" * 70)
        print(f"开始日期: {self.start_date.strftime('%Y-%m-%d')}")
        print(f"结束日期: {self.end_date.strftime('%Y-%m-%d')}")
        print(f"总天数: {len(dates)}")
        # BUG FIX: the original printed '' for both branches, making the
        # conditional a no-op; restore the yes/no labels.
        print(f"跳过失败: {'是' if self.skip_failed else '否'}")
        print(f"最大重试次数: {self.max_retries}")
        print(f"日志文件: {self.log_file}")
        print("=" * 70)
        # Ask for confirmation before starting a potentially long run.
        confirm = input("\n是否开始执行? (y/n): ").strip().lower()
        if confirm != 'y':
            print("已取消")
            return
        self.log(f"开始批量导入: {len(dates)} 个日期")
        start_time = datetime.now()
        success_count = 0
        failed_count = 0
        for idx, date_str in enumerate(dates, 1):
            print(f"\n{'=' * 70}")
            print(f"进度: [{idx}/{len(dates)}] {date_str}")
            print(f"{'=' * 70}")
            if self.process_date(date_str):
                success_count += 1
            else:
                failed_count += 1
                if not self.skip_failed:
                    # Abort the whole batch on the first failure.
                    self.log(f"[X] 日期 {date_str} 处理失败,停止执行", level='ERROR')
                    break
                self.log(f"[!] 日期 {date_str} 处理失败,跳过继续", level='WARNING')
            # Pause between dates to avoid hammering the upstream service.
            if idx < len(dates):
                delay = 5
                self.log(f"等待 {delay} 秒后处理下一个日期...")
                time.sleep(delay)
        end_time = datetime.now()
        duration = end_time - start_time
        print("\n" + "=" * 70)
        print("批量导入完成")
        print("=" * 70)
        print(f"总耗时: {duration}")
        print(f"成功: {success_count}")
        print(f"失败: {failed_count}")
        print(f"日志文件: {self.log_file}")
        print("=" * 70)
        self.log("=" * 70)
        self.log(f"批量导入完成: 成功 {success_count} 天, 失败 {failed_count}")
        self.log(f"总耗时: {duration}")
        self.log("=" * 70)
        # Persist the per-date results as JSON.
        self._save_results()
        if failed_count > 0:
            print("\n失败的日期:")
            for r in self.results:
                if not r['success']:
                    print(f" - {r['date']}: {r.get('error', '未知错误')}")

    def _save_results(self) -> None:
        """Save the run summary and per-date results to a JSON file in log_dir."""
        try:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            result_file = os.path.join(self.log_dir, f'batch_result_{timestamp}.json')
            summary = {
                'start_date': self.start_date.strftime('%Y-%m-%d'),
                'end_date': self.end_date.strftime('%Y-%m-%d'),
                'total_dates': len(self.results),
                'success_count': sum(1 for r in self.results if r['success']),
                'failed_count': sum(1 for r in self.results if not r['success']),
                'results': self.results,
            }
            with open(result_file, 'w', encoding='utf-8') as f:
                json.dump(summary, f, ensure_ascii=False, indent=2)
            self.log(f"执行结果已保存: {result_file}")
        except Exception as e:
            self.log(f"保存执行结果失败: {e}", level='ERROR')
def main() -> int:
    """CLI entry point.

    Parses --start/--end/--skip-failed/--max-retries, validates the date
    range, then runs the batch import.

    Returns:
        Process exit code: 0 on success, 1 on invalid input or error.
    """
    parser = argparse.ArgumentParser(
        description='批量历史数据导入脚本',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例用法:
python batch_import_history.py --start 2025-12-01 --end 2025-12-25
python batch_import_history.py --start 2025-12-01 --end 2025-12-25 --skip-failed
""",
    )
    parser.add_argument(
        '--start',
        type=str,
        required=True,
        help='开始日期 (格式: YYYY-MM-DD)',
    )
    parser.add_argument(
        '--end',
        type=str,
        required=True,
        help='结束日期 (格式: YYYY-MM-DD)',
    )
    parser.add_argument(
        '--skip-failed',
        action='store_true',
        help='跳过失败的日期继续执行(默认:遇到失败停止)',
    )
    parser.add_argument(
        '--max-retries',
        type=int,
        default=3,
        help='每个步骤的最大重试次数默认3',
    )
    args = parser.parse_args()

    # Validate the date strings and their ordering before doing any work.
    try:
        start = datetime.strptime(args.start, '%Y-%m-%d')
        end = datetime.strptime(args.end, '%Y-%m-%d')
        if start > end:
            print("[X] 开始日期不能晚于结束日期")
            return 1
    except ValueError as e:
        print(f"[X] 日期格式错误: {e}")
        print(" 正确格式: YYYY-MM-DD (例如: 2025-12-01)")
        return 1

    try:
        importer = BatchImporter(
            start_date=args.start,
            end_date=args.end,
            skip_failed=args.skip_failed,
            max_retries=args.max_retries,
        )
        importer.run()
        return 0
    except Exception as e:
        # Top-level boundary: report and convert to a non-zero exit code.
        print(f"\n[X] 程序执行出错: {e}")
        import traceback
        traceback.print_exc()
        return 1


if __name__ == '__main__':
    sys.exit(main())