Files
baijiahao_data_crawl/bjh_data_to_database.py
“shengyudong” 322ac74336 2025-12-25 upload
2025-12-25 11:16:59 +08:00

286 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Integrated script: crawl Baijiahao data and generate CSV files.

Steps:
1. Fetch the latest data from the Baijiahao API (post statistics +
   income data + current-week income)
2. Save it to bjh_integrated_data.json
3. Generate three CSV files:
   - ai_statistics.csv      - per-account summary statistics
   - ai_statistics_day.csv  - daily detail statistics (latest day)
   - ai_statistics_days.csv - core metrics for the latest day (post count,
     income, week-over-week change)

Notes:
- author_id is looked up from the ai_authors table by author_name
- Cookies can be loaded from the database (no manual maintenance needed)
- Optional proxy mode to avoid rate limiting
- Automatic retry on proxy/connection errors
"""
import sys
import os
import time

# Re-wrap stdout/stderr as UTF-8 on Windows, where the console encoding often
# defaults to a legacy code page and garbles Chinese output.  The hasattr()
# guard protects against streams that expose no .buffer (e.g. when stdout has
# already been replaced by an IDE or a test harness).
if sys.platform == 'win32':
    import io
    if hasattr(sys.stdout, 'buffer') and (
            not isinstance(sys.stdout, io.TextIOWrapper) or sys.stdout.encoding != 'utf-8'):
        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    if hasattr(sys.stderr, 'buffer') and (
            not isinstance(sys.stderr, io.TextIOWrapper) or sys.stderr.encoding != 'utf-8'):
        sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')

from bjh_analytics import BaijiahaoAnalytics
from export_to_csv import DataExporter
from log_config import setup_data_to_db_logger
def _crawl_one_account(analytics, account_id, days, max_retries, enable_retry, logger):
    """Crawl a single account's data, retrying on failure.

    Retries wait retry_count * 2 seconds between attempts (2s, 4s, 6s).
    Connection/proxy-looking exceptions are logged as warnings, everything
    else as errors with a traceback; both are retried the same way.

    Args:
        analytics: BaijiahaoAnalytics instance used for the fetch.
        account_id: account to crawl.
        days: number of recent days to fetch.
        max_retries: maximum retry attempts when enable_retry is True.
        enable_retry: whether failures are retried at all.
        logger: logger for progress/error records.

    Returns:
        (result, outcome): result is the data dict from the final attempt
        (or None), outcome is 'full', 'partial' or 'failed'.
    """
    retry_count = 0
    while True:
        try:
            if retry_count > 0:
                # Escalating back-off before each retry: 2s, 4s, 6s.
                wait_time = retry_count * 2
                print(f" [重试 {retry_count}/{max_retries}] 等待 {wait_time} 秒后重试...")
                logger.info(f"账号 {account_id}第{retry_count}次重试,等待{wait_time}秒")
                time.sleep(wait_time)
                print(f" [重试 {retry_count}/{max_retries}] 开始重新抓取...")
            result = analytics.extract_integrated_data(account_id, days=days)
            if result:
                status = result.get('status', '')
                if status == 'success_all':
                    print(f" [✓] 抓取成功")
                    logger.info(f"账号 {account_id} 抓取成功 (完全成功)")
                    return result, 'full'
                if 'success' in status:
                    # Partial success (e.g. posts fetched but income failed)
                    # is accepted as-is; no retry.
                    print(f" [!] 部分成功: {status}")
                    logger.warning(f"账号 {account_id} 部分成功: {status}")
                    return result, 'partial'
                if not enable_retry or retry_count >= max_retries:
                    print(f" [X] 抓取失败: {status}")
                    logger.error(f"账号 {account_id} 抓取失败: {status}")
                    return result, 'failed'
            elif not enable_retry or retry_count >= max_retries:
                # None means the fetch failed outright.
                print(f" [X] 抓取失败: 无返回结果")
                logger.error(f"账号 {account_id} 抓取失败: 无返回结果")
                return None, 'failed'
        except Exception as e:
            error_type = type(e).__name__
            # Heuristic: connection/timeout errors are treated as transient
            # proxy problems and logged less severely.
            is_proxy_error = any([
                'Connection' in error_type,
                'RemoteDisconnected' in error_type,
                'ProxyError' in error_type,
                'Timeout' in error_type,
                'ConnectionError' in str(e),
                'Connection aborted' in str(e),
                'Remote end closed' in str(e),
            ])
            if is_proxy_error:
                print(f" [!] 代理连接错误: {error_type}")
                logger.warning(f"账号 {account_id} 代理连接错误: {error_type} - {e}")
            else:
                print(f" [!] 发生异常: {error_type} - {e}")
                logger.error(f"账号 {account_id} 异常: {error_type} - {e}", exc_info=True)
            if not enable_retry or retry_count >= max_retries:
                print(f" [X] 抓取失败(已达最大重试次数)")
                logger.error(f"账号 {account_id} 最终失败,已达最大重试次数")
                return None, 'failed'
        retry_count += 1


def main():
    """Interactively crawl Baijiahao account data and export CSV files.

    Flow (all prompts read from stdin):
    1. choose cookie source, proxy mode and day range
    2. crawl every selected account (with automatic retry)
    3. optionally export the three CSV files
    """
    logger = setup_data_to_db_logger()
    logger.info("="*70)
    logger.info("百家号数据抓取并生成CSV文件 - 启动")
    logger.info("="*70)
    print("\n" + "="*70)
    print("百家号数据抓取并生成CSV文件")
    print("="*70)

    # ---- Step 1/3: configuration -------------------------------------
    print("\n【步骤1/3】配置数据抓取")
    print("="*70)
    logger.info("步骤1/3配置数据抓取")

    # Cookie source: local JSON file or database.
    print("\n请选择账号数据源:")
    print(" 1. 从本地文件加载Cookie (captured_account_cookies.json)")
    print(" 2. 从数据库加载Cookie")
    source = input("\n输入选项 (1/2, 默认1): ").strip() or '1'
    load_from_db = (source == '2')
    logger.info(f"数据源选择: {'数据库' if load_from_db else '本地文件'}")

    # Optional proxy to avoid rate limiting.
    print("\n是否启用代理?")
    use_proxy_input = input("(y/n, 默认n): ").strip().lower() or 'n'
    use_proxy = (use_proxy_input == 'y')
    logger.info(f"代理模式: {'启用' if use_proxy else '禁用'}")

    # Day range; fall back to 7 on non-numeric input.
    print("\n抓取最近多少天的数据?")
    days_input = input("(默认7天): ").strip() or '7'
    try:
        days = int(days_input)
    except ValueError:
        days = 7
    logger.info(f"抓取天数: {days}")

    # Retry is always enabled (no prompt).
    enable_retry = True
    max_retries = 3
    print(f"\n[配置] 已自动启用错误重试机制(最大{max_retries}次)")
    logger.info(f"错误重试机制: 启用,最大重试{max_retries}次")

    # ---- Step 2/3: crawl ---------------------------------------------
    print("\n【步骤2/3】抓取数据")
    print("="*70)
    logger.info("步骤2/3抓取数据")

    analytics = BaijiahaoAnalytics(use_proxy=use_proxy, load_from_db=load_from_db)
    if not analytics.account_cookies:
        print("\n[X] 未找到可用的账号Cookie无法继续")
        logger.error("未找到可用的账号Cookie")
        return
    print(f"\n找到 {len(analytics.account_cookies)} 个账号")
    logger.info(f"找到 {len(analytics.account_cookies)} 个账号")

    print("\n请选择抓取模式:")
    print(" 1. 抓取所有账号")
    print(" 2. 只抓取第一个账号(测试用)")
    mode = input("\n输入选项 (1/2, 默认1): ").strip() or '1'
    logger.info(f"抓取模式: {'所有账号' if mode == '1' else '测试单账号'}")
    if mode == '2':
        # First account only (smoke test).
        account_ids = [next(iter(analytics.account_cookies))]
    else:
        account_ids = list(analytics.account_cookies)

    print(f"\n将抓取 {len(account_ids)} 个账号的数据")
    logger.info(f"将抓取 {len(account_ids)} 个账号: {', '.join(account_ids)}")
    confirm = input("\n是否继续? (y/n): ").strip().lower()
    if confirm != 'y':
        print("\n已取消")
        logger.info("用户取消操作")
        return

    print("\n开始抓取数据...")
    print(f" 错误重试策略:遇到代理连接错误时自动重试,最大{max_retries}次\n")
    logger.info("="*70)
    logger.info("开始执行数据抓取任务")
    logger.info(f"总账号数: {len(account_ids)},错误重试: 最大{max_retries}次")
    logger.info("="*70)

    success_count = 0
    failed_accounts = []
    partial_success_accounts = []  # accounts where only part of the data came back
    results = []
    for i, account_id in enumerate(account_ids, 1):
        print(f"\n[{i}/{len(account_ids)}] 抓取账号: {account_id}")
        print("-"*70)
        result, outcome = _crawl_one_account(
            analytics, account_id, days, max_retries, enable_retry, logger)
        # Append exactly once per account, after retries are resolved.  The
        # previous version appended before checking the status, so a failed
        # attempt that was later retried successfully was saved twice.
        if result:
            results.append(result)
        if outcome == 'full':
            success_count += 1
        elif outcome == 'partial':
            partial_success_accounts.append(account_id)
        else:
            failed_accounts.append(account_id)

    if results:
        analytics.save_results(results)
        logger.info(f"数据已保存到 bjh_integrated_data.json{len(results)} 个账号")

    print(f"\n数据抓取完成:")
    print(f" 完全成功: {success_count}/{len(account_ids)}")
    logger.info(f"数据抓取完成: 完全成功 {success_count}/{len(account_ids)}")
    if partial_success_accounts:
        print(f" 部分成功: {len(partial_success_accounts)} 个账号")
        print(f" 账号列表: {', '.join(partial_success_accounts)}")
        logger.info(f"部分成功: {len(partial_success_accounts)} 个账号 - {', '.join(partial_success_accounts)}")
    if failed_accounts:
        print(f" 完全失败: {len(failed_accounts)} 个账号")
        print(f" 账号列表: {', '.join(failed_accounts)}")
        logger.error(f"完全失败: {len(failed_accounts)} 个账号 - {', '.join(failed_accounts)}")

    # ---- Step 3/3: CSV export ----------------------------------------
    print("\n【步骤3/3】生成CSV文件")
    print("="*70)
    logger.info("步骤3/3生成CSV文件")
    confirm = input("\n是否生成CSV文件? (y/n): ").strip().lower()
    if confirm != 'y':
        print("\n已跳过CSV生成")
        logger.info("用户跳过CSV生成")
        return

    # CSV-only mode; author_id is still resolved via the database.
    exporter = DataExporter(use_database=False)
    logger.info("已初始化CSV导出器CSV模式 + 数据库查询author_id")
    print("\n将生成以下三个CSV文件")
    print(" 1. ai_statistics.csv - 账号汇总统计表")
    print(" 2. ai_statistics_day.csv - 每日明细统计表")
    print(" 3. ai_statistics_days.csv - 核心指标统计表(含发文量、收益、环比)")
    print("\n注意author_id 将从 ai_authors 表中查询")
    success = exporter.export_all_tables()
    if success:
        print("\n" + "="*70)
        print("✓ 所有步骤完成!")
        print("="*70)
        logger.info("="*70)
        logger.info("所有步骤完成CSV文件已生成")
        logger.info("="*70)
    else:
        print("\n" + "="*70)
        print("✗ CSV生成失败")
        print("="*70)
        logger.error("CSV生成失败")


if __name__ == '__main__':
    main()