Files
baijiahao_data_crawl/bjh_articles_crawler.py

719 lines
27 KiB
Python
Raw Permalink Normal View History

2025-12-25 11:16:59 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Baijiahao (百家号) article data crawler.

Fetches all articles published today for each configured account and
writes them to a date-stamped CSV file.
"""
import json
import sys
import os
import requests
import csv
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import re

# Force UTF-8 on stdout/stderr so non-ASCII console output prints correctly
# on Windows (whose default console code page is often not UTF-8).
if sys.platform == 'win32':
    import io
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')

# Suppress SSL warnings: requests are made with verify=False (see __init__).
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Project-local logging and database configuration.
from log_config import setup_logger
from database_config import DatabaseManager, DB_CONFIG
class BaijiahaoArticlesCrawler:
"""百家号文章数据抓取器"""
    def __init__(self, load_from_db: bool = True, load_from_json: bool = False):
        """Initialize the crawler.

        Args:
            load_from_db: Load account cookies from the database (default True).
            load_from_json: Load cookies from the local JSON file, for testing
                (default False). Takes precedence over ``load_from_db``.
        """
        self.base_url = "https://baijiahao.baidu.com"
        self.session = requests.Session()
        # SSL verification is disabled for all requests on this session.
        self.session.verify = False
        # File-based logging; console output is disabled so log lines do not
        # duplicate the print() progress output.
        self.logger = setup_logger(
            name='bjh_articles_crawler',
            log_file='logs/bjh_articles_crawler.log',
            error_log_file='logs/bjh_articles_crawler_error.log',
            level=20,  # logging.INFO
            backup_count=30,
            console_output=False
        )
        # Cookie-source configuration: JSON file (test mode) wins over DB.
        self.load_from_db = load_from_db
        self.load_from_json = load_from_json
        self.db_manager = None
        if load_from_json:
            print("[配置] 已启用JSON文件加载模式测试模式")
            self.logger.info("已启用JSON文件加载模式测试模式")
        elif load_from_db:
            self.db_manager = DatabaseManager(DB_CONFIG)
            print("[配置] 已启用数据库加载模式")
            self.logger.info("已启用数据库加载模式")
        # Resolve file paths relative to this script's own directory.
        self.script_dir = os.path.dirname(os.path.abspath(__file__))
        self.captured_cookies_file = os.path.join(self.script_dir, "captured_account_cookies.json")
        if self.load_from_json:
            # Test mode: load from the captured-cookies JSON file.
            self.account_cookies = self.load_cookies_from_json_file()
        elif self.load_from_db:
            # Production mode: load from the database.
            self.account_cookies = self.load_cookies_from_database()
        else:
            # Legacy mode: load the raw captured-cookies file as-is.
            self.account_cookies = self.load_captured_cookies()
        # Output CSV path, stamped with today's date (YYYYMMDD).
        self.output_csv = os.path.join(self.script_dir, f"bjh_articles_{datetime.now().strftime('%Y%m%d')}.csv")
def cookie_string_to_dict(self, cookie_string: str) -> Dict:
"""将Cookie字符串转换为字典格式"""
cookie_dict = {}
if not cookie_string:
return cookie_dict
for item in cookie_string.split(';'):
item = item.strip()
if '=' in item:
key, value = item.split('=', 1)
cookie_dict[key.strip()] = value.strip()
return cookie_dict
    def load_cookies_from_database(self) -> Dict:
        """Load account cookies from the ``ai_authors`` table.

        Selects active Baijiahao accounts (channel = 1) with a non-empty
        cookie string and returns a mapping of author_name -> account record
        (author_id, app_id, nick, domain, parsed cookies, source).
        Returns {} on any failure.
        """
        try:
            if not self.db_manager:
                print("[X] 数据库管理器未初始化")
                return {}
            # NOTE(review): the column is named toutiao_cookie but is used
            # here as the Baijiahao cookie for channel = 1 rows — confirm.
            sql = """
            SELECT id, author_name, app_id, toutiao_cookie, department_name
            FROM ai_authors
            WHERE channel = 1
            AND status = 'active'
            AND toutiao_cookie IS NOT NULL
            AND toutiao_cookie != ''
            ORDER BY id
            """
            results = self.db_manager.execute_query(sql)
            if not results:
                print("[X] 数据库中未找到可用的账号Cookie")
                return {}
            account_cookies = {}
            for row in results:
                author_id = row['id']
                author_name = row['author_name']
                app_id = row['app_id'] or ''
                cookie_string = row['toutiao_cookie']
                domain = row['department_name'] or '其它'
                cookies = self.cookie_string_to_dict(cookie_string)
                if not cookies:
                    # Skip rows whose cookie string parsed to nothing.
                    continue
                account_cookies[author_name] = {
                    'author_id': author_id,
                    'app_id': app_id,
                    'nick': author_name,
                    'domain': domain,
                    'cookies': cookies,
                    'source': 'database'
                }
            self.logger.info(f"从数据库加载了 {len(account_cookies)} 个账号的Cookie")
            print(f"[OK] 从数据库加载了 {len(account_cookies)} 个账号的Cookie")
            return account_cookies
        except Exception as e:
            self.logger.error(f"从数据库加载Cookie失败: {e}", exc_info=True)
            print(f"[X] 从数据库加载Cookie失败: {e}")
            return {}
def load_captured_cookies(self) -> Dict:
"""从本地JSON文件加载已捕获的Cookie数据兼容旧模式"""
try:
with open(self.captured_cookies_file, 'r', encoding='utf-8') as f:
data = json.load(f)
print(f"[OK] 从文件加载了 {len(data)} 个账号的Cookie")
return data
except FileNotFoundError:
print(f"[X] 未找到Cookie文件: {self.captured_cookies_file}")
return {}
except Exception as e:
print(f"[X] 加载Cookie失败: {e}")
return {}
    def load_cookies_from_json_file(self) -> Dict:
        """Load account cookies from captured_account_cookies.json (test mode).

        Converts each entry into the same record shape produced by
        load_cookies_from_database, so ``author_id`` is always present.
        When a database manager is available, author_id is looked up from
        the ai_authors table; otherwise it defaults to 0.

        Returns:
            Mapping of account name -> account record, or {} on any failure.
        """
        try:
            with open(self.captured_cookies_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not data:
                print("[X] JSON文件中没有账号数据")
                return {}
            # Normalize every entry into the database-loader record format.
            account_cookies = {}
            for account_name, account_info in data.items():
                nick = account_info.get('nick', account_name)
                app_id = account_info.get('app_id', '')
                cookies = account_info.get('cookies', {})
                domain = account_info.get('domain', '其它')
                if not cookies:
                    # Skip accounts that have no captured cookies.
                    continue
                # Best-effort author_id lookup (only when a DB manager exists).
                author_id = 0
                if self.db_manager:
                    try:
                        sql = "SELECT id FROM ai_authors WHERE author_name = %s AND channel = 1 LIMIT 1"
                        result = self.db_manager.execute_query(sql, (nick,), fetch_one=True)
                        if result:
                            author_id = result['id']
                            self.logger.info(f"[{nick}] 从数据库查询到 author_id={author_id}")
                    except Exception as e:
                        # Lookup failure is non-fatal; keep author_id = 0.
                        self.logger.warning(f"[{nick}] 查询author_id失败: {e}")
                account_cookies[account_name] = {
                    'author_id': author_id,  # DB lookup result, or 0 when unknown
                    'app_id': app_id,
                    'nick': nick,
                    'domain': domain,
                    'cookies': cookies,
                    'source': 'json_file'  # marks where this record came from
                }
            self.logger.info(f"从JSON文件加载了 {len(account_cookies)} 个账号的Cookie")
            print(f"[OK] 从JSON文件加载了 {len(account_cookies)} 个账号的Cookie")
            # Print a human-readable summary of the loaded accounts.
            print("\n可用账号列表:")
            for idx, (name, info) in enumerate(account_cookies.items(), 1):
                author_id_str = f"ID:{info['author_id']}" if info['author_id'] > 0 else "ID:未查询"
                print(f" {idx}. {info['nick']} ({author_id_str}) - {info['domain']}")
            print()
            return account_cookies
        except FileNotFoundError:
            print(f"[X] 未找到Cookie文件: {self.captured_cookies_file}")
            print(" 请先运行一键捕获Cookie工具")
            return {}
        except json.JSONDecodeError as e:
            print(f"[X] JSON文件格式错误: {e}")
            self.logger.error(f"JSON文件格式错误: {e}")
            return {}
        except Exception as e:
            print(f"[X] 加载JSON文件失败: {e}")
            self.logger.error(f"加载JSON文件失败: {e}", exc_info=True)
            return {}
def set_account_cookies(self, account_data: Dict) -> bool:
"""设置当前账号的Cookie"""
try:
cookies = account_data.get('cookies', {})
if not cookies:
return False
self.session.cookies.clear()
for key, value in cookies.items():
self.session.cookies.set(key, value, domain='.baidu.com')
return True
except Exception as e:
self.logger.error(f"设置Cookie失败: {e}")
return False
    def fetch_articles_list(self, account_data: Dict, page: int = 1, page_size: int = 100,
                            filter_status: Optional[str] = None) -> Optional[Dict]:
        """Fetch one page of today's article list for an account.

        Args:
            account_data: Account record (nick / app_id / cookies / token).
            page: Page number, starting at 1.
            page_size: Articles per page (default 100).
            filter_status: Status filter, e.g. 'published' or 'draft'.
                NOTE(review): accepted but never forwarded to the API request
                — confirm whether it should map to a query parameter.

        Returns:
            The decoded JSON response when errno == 0, otherwise None.
        """
        try:
            author_name = account_data.get('nick', '未知')
            # Install this account's cookies on the shared session.
            if not self.set_account_cookies(account_data):
                print(f" [X] 设置Cookie失败")
                return None
            # Restrict the listing to today's articles: [today, tomorrow).
            today = datetime.now().date()
            start_date = today.strftime('%Y-%m-%d')
            end_date = (today + timedelta(days=1)).strftime('%Y-%m-%d')  # exclusive upper bound
            api_url = f"{self.base_url}/pcui/article/lists"
            params = {
                'currentPage': page,
                'pageSize': page_size,
                'search': '',
                'type': '',  # empty string = all article types
                'collection': '',
                'startDate': start_date,  # today (inclusive)
                'endDate': end_date,      # tomorrow (exclusive)
                'clearBeforeFetch': 'false',
                'dynamic': '0'
            }
            # Browser-like headers mimicking the editor UI's XHR request.
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
                'Accept': 'application/json, text/plain, */*',
                'sec-ch-ua': '"Not?A_Brand";v="8", "Chromium";v="108"',
                'sec-ch-ua-mobile': '?0',
                'sec-ch-ua-platform': '"Windows"',
                'Sec-Fetch-Site': 'same-origin',
                'Sec-Fetch-Mode': 'cors',
                'Sec-Fetch-Dest': 'empty',
                'Referer': f'{self.base_url}/builder/rc/content',
                'Accept-Language': 'zh-CN'
            }
            # Send the token captured from a previous response, if any.
            if 'token' in account_data:
                headers['token'] = account_data['token']
            print(f" [>] 正在获取第{page}页文章列表...")
            self.logger.info(f"[{author_name}] 请求文章列表 - 第{page}页,日期范围: {start_date} ~ {end_date}")
            response = self.session.get(
                api_url,
                headers=headers,
                params=params,
                timeout=15
            )
            if response.status_code == 200:
                data = response.json()
                errno = data.get('errno', -1)
                if errno == 0:
                    # Rotate the token: the server returns a fresh one per request.
                    if 'Token' in response.headers:
                        account_data['token'] = response.headers['Token']
                    return data
                else:
                    errmsg = data.get('errmsg', '未知错误')
                    print(f" [X] API错误: {errmsg}")
                    self.logger.error(f"[{author_name}] API错误: errno={errno}, errmsg={errmsg}")
                    return None
            else:
                print(f" [X] HTTP错误: {response.status_code}")
                self.logger.error(f"[{author_name}] HTTP错误: {response.status_code}")
                return None
        except Exception as e:
            self.logger.error(f"获取文章列表异常: {e}", exc_info=True)
            print(f" [X] 获取文章列表异常: {e}")
            return None
    def fetch_article_detail(self, account_data: Dict, article_id: str, max_retries: int = 3) -> Optional[Dict]:
        """Fetch one article's full detail via the editor 'edit' endpoint.

        Retries on timeout, connection error, and non-200 HTTP responses with
        a short backoff; API-level errors (errno != 0) are NOT retried.

        Args:
            account_data: Account record (nick / app_id / cookies / token).
            article_id: Article ID (feed_id).
            max_retries: Maximum number of attempts (default 3).

        Returns:
            The 'article' object from the response data, or None on failure.
        """
        author_name = account_data.get('nick', '未知')
        for retry in range(max_retries):
            try:
                # Re-install this account's cookies on every attempt.
                if not self.set_account_cookies(account_data):
                    return None
                api_url = f"{self.base_url}/pcui/article/edit"
                params = {
                    'type': 'events',
                    'feed_id': article_id,
                    'copy_new_nid': article_id
                }
                # Browser-like headers mimicking the editor page's XHR request.
                headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
                    'Accept': 'application/json, text/plain, */*',
                    'sec-ch-ua': '"Not?A_Brand";v="8", "Chromium";v="108"',
                    'sec-ch-ua-mobile': '?0',
                    'sec-ch-ua-platform': '"Windows"',
                    'Sec-Fetch-Site': 'same-origin',
                    'Sec-Fetch-Mode': 'cors',
                    'Sec-Fetch-Dest': 'empty',
                    'Referer': f'{self.base_url}/builder/rc/edit?type=events&app_id={account_data.get("app_id", "")}&feed_id={article_id}',
                    'Accept-Language': 'zh-CN'
                }
                # Send the token captured from a previous response, if any.
                if 'token' in account_data:
                    headers['token'] = account_data['token']
                self.logger.info(f"[{author_name}] 获取文章详情: article_id={article_id}")
                response = self.session.get(
                    api_url,
                    headers=headers,
                    params=params,
                    timeout=10  # 10-second timeout per attempt
                )
                if response.status_code == 200:
                    data = response.json()
                    errno = data.get('errno', -1)
                    if errno == 0:
                        # Rotate the token for the next request.
                        if 'Token' in response.headers:
                            account_data['token'] = response.headers['Token']
                        return data.get('data', {}).get('article', {})
                    else:
                        errmsg = data.get('errmsg', '未知错误')
                        self.logger.error(f"[{author_name}] 文章详情API错误: errno={errno}, errmsg={errmsg}")
                        return None
                else:
                    self.logger.error(f"[{author_name}] 文章详情HTTP错误: {response.status_code}")
                    # Retry HTTP errors with a linearly growing backoff (2s, 4s, ...).
                    if retry < max_retries - 1:
                        wait_time = (retry + 1) * 2
                        self.logger.warning(f"[{author_name}] HTTP错误{wait_time}秒后重试...")
                        time.sleep(wait_time)
                        continue
                    return None
            except requests.exceptions.Timeout:
                self.logger.warning(f"[{author_name}] 请求超时,第{retry + 1}次尝试")
                if retry < max_retries - 1:
                    time.sleep(2)
                    continue
                else:
                    self.logger.error(f"[{author_name}] 请求超时,已达最大重试次数")
                    return None
            except requests.exceptions.ConnectionError as e:
                self.logger.warning(f"[{author_name}] 连接错误: {e},第{retry + 1}次尝试")
                if retry < max_retries - 1:
                    time.sleep(2)
                    continue
                else:
                    self.logger.error(f"[{author_name}] 连接错误,已达最大重试次数")
                    return None
            except Exception as e:
                # Anything unexpected: log with traceback and give up (no retry).
                self.logger.error(f"获取文章详情异常: {e}", exc_info=True)
                return None
        return None
def extract_title_from_content(self, content: str) -> str:
"""从正文中提取标题(第一行)"""
if not content:
return ""
lines = content.strip().split('\n')
if lines:
title = lines[0].strip()
# 移除可能的标签
title = re.sub(r'#.*?#', '', title).strip()
return title
return ""
def parse_article_data(self, article: Dict, account_data: Dict) -> Optional[Dict]:
"""解析文章数据映射到ai_articles表结构
Args:
article: 文章详情数据
account_data: 账号数据
Returns:
解析后的文章数据字典
"""
try:
# 提取基础信息
article_id = article.get('article_id', '')
content = article.get('content', '')
title = self.extract_title_from_content(content)
# 提取作者信息
author_name = account_data.get('nick', '')
app_id = account_data.get('app_id', '')
author_id = account_data.get('author_id', 0)
# 提取分类
category = article.get('category_v4', '')
# 提取时间
created_at = article.get('created_at', '')
commit_at = article.get('commit_at', '')
publish_time = commit_at or created_at
# 提取封面图片
cover_images_str = article.get('cover_images', '[]')
try:
cover_images = json.loads(cover_images_str)
image_count = len(cover_images) if isinstance(cover_images, list) else 0
except:
image_count = 0
# 计算字数
word_count = len(content) if content else 0
# 提取审核状态
audit_status = article.get('audit_status_info', {})
status_map = {
0: 'draft', # 草稿
1: 'published', # 已发布
2: 'pending_review', # 待审核
3: 'rejected', # 审核拒绝
4: 'failed' # 发布失败
}
auditing_status = article.get('auditing_status', 0)
status = status_map.get(auditing_status, 'draft')
# 构建返回数据
return {
'baijiahao_id': article_id,
'author_id': author_id,
'author_name': author_name,
'title': title,
'content': content,
'category': category,
'channel': 1, # 1=百家号
'status': status,
'word_count': word_count,
'image_count': image_count,
'publish_time': publish_time,
'created_at': created_at,
'baijiahao_status': audit_status.get('quality_status', ''),
}
except Exception as e:
self.logger.error(f"解析文章数据失败: {e}", exc_info=True)
return None
    def crawl_account_articles(self, account_name: str, account_data: Dict) -> List[Dict]:
        """Crawl all of today's articles for one account.

        Pages through the article list (100 per page), fetches each article's
        detail, and parses it into the ai_articles row format.

        Args:
            account_name: Display name of the account.
            account_data: Account record (cookies / app_id / author_id / ...).

        Returns:
            List of parsed article dicts (possibly empty).
        """
        print(f"\n[账号] {account_name}")
        print("="*70)
        all_articles = []
        page = 1
        while True:
            result = self.fetch_articles_list(account_data, page=page, page_size=100)
            if not result:
                # List request failed — stop paging for this account.
                break
            data = result.get('data', {})
            articles_list = data.get('list', [])
            if not articles_list:
                print(f" [i] 第{page}页无数据,停止翻页")
                break
            print(f" [OK] 第{page}页获取到 {len(articles_list)} 篇文章")
            # Fetch and parse the detail of every article on this page.
            for idx, article_brief in enumerate(articles_list, 1):
                article_id = article_brief.get('article_id', '')
                if not article_id:
                    continue
                print(f" [{idx}/{len(articles_list)}] 正在获取文章详情: {article_id}")
                article_detail = self.fetch_article_detail(account_data, article_id)
                if article_detail:
                    parsed_data = self.parse_article_data(article_detail, account_data)
                    if parsed_data:
                        all_articles.append(parsed_data)
                        print(f" 标题: {parsed_data['title']}")
                        print(f" 状态: {parsed_data['status']}")
                # Throttle detail requests to avoid hammering the API.
                time.sleep(0.5)
            # Stop when the server reports no further pages.
            has_more = data.get('has_more', False)
            if not has_more:
                print(f" [i] 已到最后一页")
                break
            page += 1
            time.sleep(1)  # pause between list pages
        print(f"\n[OK] 账号 {account_name} 共抓取 {len(all_articles)} 篇文章")
        return all_articles
def save_to_csv(self, articles: List[Dict]) -> bool:
"""保存文章数据到CSV文件
Args:
articles: 文章数据列表
Returns:
是否成功
"""
try:
if not articles:
print("[!] 没有文章数据需要保存")
return False
# CSV表头对应ai_articles表的主要字段
fieldnames = [
'baijiahao_id',
'author_id',
'author_name',
'title',
'content',
'category',
'channel',
'status',
'word_count',
'image_count',
'publish_time',
'created_at',
'baijiahao_status'
]
print(f"\n[>] 正在保存到CSV文件: {self.output_csv}")
with open(self.output_csv, 'w', encoding='utf-8-sig', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for article in articles:
# 只写入指定的字段
row = {key: article.get(key, '') for key in fieldnames}
writer.writerow(row)
print(f"[OK] 成功保存 {len(articles)} 篇文章到CSV文件")
self.logger.info(f"成功保存 {len(articles)} 篇文章到 {self.output_csv}")
return True
except Exception as e:
print(f"[X] 保存CSV失败: {e}")
self.logger.error(f"保存CSV失败: {e}", exc_info=True)
return False
    def run(self):
        """Run the crawl: iterate all accounts, then write one combined CSV.

        A failure on one account is logged and skipped so the remaining
        accounts are still crawled.
        """
        print("\n" + "="*70)
        print("百家号文章数据抓取工具")
        print("="*70)
        print(f"抓取日期: {datetime.now().strftime('%Y-%m-%d')}")
        print(f"账号数量: {len(self.account_cookies)}")
        print("="*70)
        if not self.account_cookies:
            print("[X] 没有可用的账号Cookie退出")
            return
        all_articles = []
        # Crawl every configured account, accumulating all parsed articles.
        for account_name, account_data in self.account_cookies.items():
            try:
                articles = self.crawl_account_articles(account_name, account_data)
                all_articles.extend(articles)
            except Exception as e:
                # Keep going: one bad account must not abort the whole run.
                print(f"[X] 抓取账号 {account_name} 失败: {e}")
                self.logger.error(f"抓取账号 {account_name} 失败: {e}", exc_info=True)
                continue
        # Write a single combined CSV for all accounts.
        if all_articles:
            self.save_to_csv(all_articles)
        else:
            print("\n[!] 今日暂无文章数据")
        print("\n" + "="*70)
        print(f"抓取完成!共抓取 {len(all_articles)} 篇文章")
        print("="*70)
if __name__ == '__main__':
    # Default (production) mode: load account cookies from the database.
    # crawler = BaijiahaoArticlesCrawler(load_from_db=True)

    # Test mode: load cookies from the JSON file (also looks up author_id
    # in the database when a DB manager is available).
    crawler = BaijiahaoArticlesCrawler(load_from_json=True)

    # Legacy mode: load cookies from the JSON file without author_id lookup.
    # crawler = BaijiahaoArticlesCrawler(load_from_db=False)
    crawler.run()