Files
baijiahao_data_crawl/bjh_articles_crawler.py
“shengyudong” 322ac74336 2025-12-25 upload
2025-12-25 11:16:59 +08:00

719 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Baijiahao article data crawler.

Fetches every article published today for each configured account and
writes the results to a CSV file.
"""
import json
import sys
import os
import requests
import csv
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import re

# Force stdout/stderr to UTF-8 so non-ASCII console output works on Windows
if sys.platform == 'win32':
    import io
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')

# Suppress SSL warnings; the crawler deliberately disables verification
# on its requests.Session (see BaijiahaoArticlesCrawler.__init__)
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Project-local logging and database helpers
from log_config import setup_logger
from database_config import DatabaseManager, DB_CONFIG
class BaijiahaoArticlesCrawler:
    """Baijiahao article data crawler.

    Loads per-account cookies from one of three sources (database, JSON
    file, or legacy JSON file) and crawls today's articles per account.
    """
    def __init__(self, load_from_db: bool = True, load_from_json: bool = False):
        """Initialize the crawler.

        Args:
            load_from_db: Load account cookies from the database (default True).
            load_from_json: Load cookies from the captured JSON file, for
                testing (default False). Takes precedence over load_from_db.
        """
        self.base_url = "https://baijiahao.baidu.com"
        self.session = requests.Session()
        # TLS verification intentionally disabled (warnings silenced at import time)
        self.session.verify = False
        # File-based logger; console output disabled to avoid duplicating print()
        self.logger = setup_logger(
            name='bjh_articles_crawler',
            log_file='logs/bjh_articles_crawler.log',
            error_log_file='logs/bjh_articles_crawler_error.log',
            level=20,  # INFO
            backup_count=30,
            console_output=False  # avoid duplicating print output on the console
        )
        # Cookie-source selection flags
        self.load_from_db = load_from_db
        self.load_from_json = load_from_json
        self.db_manager = None
        if load_from_json:
            print("[配置] 已启用JSON文件加载模式测试模式")
            self.logger.info("已启用JSON文件加载模式测试模式")
        elif load_from_db:
            self.db_manager = DatabaseManager(DB_CONFIG)
            print("[配置] 已启用数据库加载模式")
            self.logger.info("已启用数据库加载模式")
        # Directory containing this script (anchors all data-file paths)
        self.script_dir = os.path.dirname(os.path.abspath(__file__))
        # Captured-cookie JSON file used by test/legacy modes
        self.captured_cookies_file = os.path.join(self.script_dir, "captured_account_cookies.json")
        if self.load_from_json:
            # Test mode: load from JSON file (resolves author_id via DB when available)
            self.account_cookies = self.load_cookies_from_json_file()
        elif self.load_from_db:
            # Production mode: load from the database
            self.account_cookies = self.load_cookies_from_database()
        else:
            # Legacy mode: raw JSON file contents, no author_id resolution
            self.account_cookies = self.load_captured_cookies()
        # Output CSV path, stamped with today's date
        self.output_csv = os.path.join(self.script_dir, f"bjh_articles_{datetime.now().strftime('%Y%m%d')}.csv")
def cookie_string_to_dict(self, cookie_string: str) -> Dict:
"""将Cookie字符串转换为字典格式"""
cookie_dict = {}
if not cookie_string:
return cookie_dict
for item in cookie_string.split(';'):
item = item.strip()
if '=' in item:
key, value = item.split('=', 1)
cookie_dict[key.strip()] = value.strip()
return cookie_dict
def load_cookies_from_database(self) -> Dict:
"""从数据库加载Cookie数据"""
try:
if not self.db_manager:
print("[X] 数据库管理器未初始化")
return {}
sql = """
SELECT id, author_name, app_id, toutiao_cookie, department_name
FROM ai_authors
WHERE channel = 1
AND status = 'active'
AND toutiao_cookie IS NOT NULL
AND toutiao_cookie != ''
ORDER BY id
"""
results = self.db_manager.execute_query(sql)
if not results:
print("[X] 数据库中未找到可用的账号Cookie")
return {}
account_cookies = {}
for row in results:
author_id = row['id']
author_name = row['author_name']
app_id = row['app_id'] or ''
cookie_string = row['toutiao_cookie']
domain = row['department_name'] or '其它'
cookies = self.cookie_string_to_dict(cookie_string)
if not cookies:
continue
account_cookies[author_name] = {
'author_id': author_id,
'app_id': app_id,
'nick': author_name,
'domain': domain,
'cookies': cookies,
'source': 'database'
}
self.logger.info(f"从数据库加载了 {len(account_cookies)} 个账号的Cookie")
print(f"[OK] 从数据库加载了 {len(account_cookies)} 个账号的Cookie")
return account_cookies
except Exception as e:
self.logger.error(f"从数据库加载Cookie失败: {e}", exc_info=True)
print(f"[X] 从数据库加载Cookie失败: {e}")
return {}
def load_captured_cookies(self) -> Dict:
"""从本地JSON文件加载已捕获的Cookie数据兼容旧模式"""
try:
with open(self.captured_cookies_file, 'r', encoding='utf-8') as f:
data = json.load(f)
print(f"[OK] 从文件加载了 {len(data)} 个账号的Cookie")
return data
except FileNotFoundError:
print(f"[X] 未找到Cookie文件: {self.captured_cookies_file}")
return {}
except Exception as e:
print(f"[X] 加载Cookie失败: {e}")
return {}
    def load_cookies_from_json_file(self) -> Dict:
        """Load account cookies from the captured JSON file (test mode).

        Reads captured_account_cookies.json and converts every entry
        into the same shape produced by load_cookies_from_database(),
        guaranteeing an author_id field. When a database manager is
        available the author_id is resolved from ai_authors; otherwise
        it defaults to 0.

        Returns:
            Mapping of account name -> account record dict, or {} on error.
        """
        try:
            with open(self.captured_cookies_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not data:
                print("[X] JSON文件中没有账号数据")
                return {}
            # Convert entries, adding the author_id field
            account_cookies = {}
            for account_name, account_info in data.items():
                # Pull the fields out of the JSON record
                nick = account_info.get('nick', account_name)
                app_id = account_info.get('app_id', '')
                cookies = account_info.get('cookies', {})
                domain = account_info.get('domain', '其它')
                if not cookies:
                    continue
                # Try to resolve author_id from the database (if enabled)
                author_id = 0
                if self.db_manager:
                    try:
                        sql = "SELECT id FROM ai_authors WHERE author_name = %s AND channel = 1 LIMIT 1"
                        result = self.db_manager.execute_query(sql, (nick,), fetch_one=True)
                        if result:
                            author_id = result['id']
                            self.logger.info(f"[{nick}] 从数据库查询到 author_id={author_id}")
                    except Exception as e:
                        # Lookup failure is non-fatal; author_id stays 0
                        self.logger.warning(f"[{nick}] 查询author_id失败: {e}")
                # Build the normalized account record
                account_cookies[account_name] = {
                    'author_id': author_id,  # resolved from DB, or 0 by default
                    'app_id': app_id,
                    'nick': nick,
                    'domain': domain,
                    'cookies': cookies,
                    'source': 'json_file'  # marks the record's origin
                }
            self.logger.info(f"从JSON文件加载了 {len(account_cookies)} 个账号的Cookie")
            print(f"[OK] 从JSON文件加载了 {len(account_cookies)} 个账号的Cookie")
            # List the loaded accounts on the console
            print("\n可用账号列表:")
            for idx, (name, info) in enumerate(account_cookies.items(), 1):
                author_id_str = f"ID:{info['author_id']}" if info['author_id'] > 0 else "ID:未查询"
                print(f" {idx}. {info['nick']} ({author_id_str}) - {info['domain']}")
            print()
            return account_cookies
        except FileNotFoundError:
            print(f"[X] 未找到Cookie文件: {self.captured_cookies_file}")
            print(" 请先运行一键捕获Cookie工具")
            return {}
        except json.JSONDecodeError as e:
            print(f"[X] JSON文件格式错误: {e}")
            self.logger.error(f"JSON文件格式错误: {e}")
            return {}
        except Exception as e:
            print(f"[X] 加载JSON文件失败: {e}")
            self.logger.error(f"加载JSON文件失败: {e}", exc_info=True)
            return {}
def set_account_cookies(self, account_data: Dict) -> bool:
"""设置当前账号的Cookie"""
try:
cookies = account_data.get('cookies', {})
if not cookies:
return False
self.session.cookies.clear()
for key, value in cookies.items():
self.session.cookies.set(key, value, domain='.baidu.com')
return True
except Exception as e:
self.logger.error(f"设置Cookie失败: {e}")
return False
    def fetch_articles_list(self, account_data: Dict, page: int = 1, page_size: int = 100,
                            filter_status: Optional[str] = None) -> Optional[Dict]:
        """Fetch one page of today's article list for an account.

        Args:
            account_data: Account record (cookies, app_id, optional token).
            page: 1-based page number.
            page_size: Page size, default 100.
            filter_status: Optional status filter, e.g. 'published' or
                'draft' (accepted but currently unused by this body).

        Returns:
            The decoded JSON response when errno == 0, otherwise None.
        """
        try:
            author_name = account_data.get('nick', '未知')
            # Install this account's cookies on the shared session
            if not self.set_account_cookies(account_data):
                print(f" [X] 设置Cookie失败")
                return None
            # Date window [today, tomorrow) so only today's articles match
            today = datetime.now().date()
            start_date = today.strftime('%Y-%m-%d')
            end_date = (today + timedelta(days=1)).strftime('%Y-%m-%d')  # exclusive upper bound
            # List endpoint of the Baijiahao PC UI
            api_url = f"{self.base_url}/pcui/article/lists"
            params = {
                'currentPage': page,
                'pageSize': page_size,
                'search': '',
                'type': '',  # empty = all article types
                'collection': '',
                'startDate': start_date,  # from today
                'endDate': end_date,  # up to (not including) tomorrow
                'clearBeforeFetch': 'false',
                'dynamic': '0'
            }
            # Browser-like headers the PC UI endpoint expects
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
                'Accept': 'application/json, text/plain, */*',
                'sec-ch-ua': '"Not?A_Brand";v="8", "Chromium";v="108"',
                'sec-ch-ua-mobile': '?0',
                'sec-ch-ua-platform': '"Windows"',
                'Sec-Fetch-Site': 'same-origin',
                'Sec-Fetch-Mode': 'cors',
                'Sec-Fetch-Dest': 'empty',
                'Referer': f'{self.base_url}/builder/rc/content',
                'Accept-Language': 'zh-CN'
            }
            # Reuse the token captured from a previous response, if any
            if 'token' in account_data:
                headers['token'] = account_data['token']
            print(f" [>] 正在获取第{page}页文章列表...")
            self.logger.info(f"[{author_name}] 请求文章列表 - 第{page}页,日期范围: {start_date} ~ {end_date}")
            response = self.session.get(
                api_url,
                headers=headers,
                params=params,
                timeout=15
            )
            if response.status_code == 200:
                data = response.json()
                # errno == 0 marks a successful API response
                errno = data.get('errno', -1)
                if errno == 0:
                    # Persist the refreshed token for the next request
                    if 'Token' in response.headers:
                        account_data['token'] = response.headers['Token']
                    return data
                else:
                    errmsg = data.get('errmsg', '未知错误')
                    print(f" [X] API错误: {errmsg}")
                    self.logger.error(f"[{author_name}] API错误: errno={errno}, errmsg={errmsg}")
                    return None
            else:
                print(f" [X] HTTP错误: {response.status_code}")
                self.logger.error(f"[{author_name}] HTTP错误: {response.status_code}")
                return None
        except Exception as e:
            self.logger.error(f"获取文章列表异常: {e}", exc_info=True)
            print(f" [X] 获取文章列表异常: {e}")
            return None
    def fetch_article_detail(self, account_data: Dict, article_id: str, max_retries: int = 3) -> Optional[Dict]:
        """Fetch the full detail of one article, retrying transient failures.

        HTTP errors, timeouts and connection errors are retried (HTTP
        errors with a growing 2s/4s backoff, the rest with a fixed 2s
        wait); API-level errors and unexpected exceptions abort at once.

        Args:
            account_data: Account record (cookies, app_id, optional token).
            article_id: Article ID (feed_id).
            max_retries: Maximum number of attempts, default 3.

        Returns:
            The 'article' object from the response, or None on failure.
        """
        author_name = account_data.get('nick', '未知')
        for retry in range(max_retries):
            try:
                # Re-install cookies on every attempt
                if not self.set_account_cookies(account_data):
                    return None
                # Edit endpoint doubles as the detail endpoint
                api_url = f"{self.base_url}/pcui/article/edit"
                params = {
                    'type': 'events',
                    'feed_id': article_id,
                    'copy_new_nid': article_id
                }
                # Browser-like headers the PC UI endpoint expects
                headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
                    'Accept': 'application/json, text/plain, */*',
                    'sec-ch-ua': '"Not?A_Brand";v="8", "Chromium";v="108"',
                    'sec-ch-ua-mobile': '?0',
                    'sec-ch-ua-platform': '"Windows"',
                    'Sec-Fetch-Site': 'same-origin',
                    'Sec-Fetch-Mode': 'cors',
                    'Sec-Fetch-Dest': 'empty',
                    'Referer': f'{self.base_url}/builder/rc/edit?type=events&app_id={account_data.get("app_id", "")}&feed_id={article_id}',
                    'Accept-Language': 'zh-CN'
                }
                # Reuse the token captured from a previous response, if any
                if 'token' in account_data:
                    headers['token'] = account_data['token']
                self.logger.info(f"[{author_name}] 获取文章详情: article_id={article_id}")
                response = self.session.get(
                    api_url,
                    headers=headers,
                    params=params,
                    timeout=10  # 10-second timeout per attempt
                )
                if response.status_code == 200:
                    data = response.json()
                    errno = data.get('errno', -1)
                    if errno == 0:
                        # Persist the refreshed token for the next request
                        if 'Token' in response.headers:
                            account_data['token'] = response.headers['Token']
                        return data.get('data', {}).get('article', {})
                    else:
                        # API-level errors are not retried
                        errmsg = data.get('errmsg', '未知错误')
                        self.logger.error(f"[{author_name}] 文章详情API错误: errno={errno}, errmsg={errmsg}")
                        return None
                else:
                    self.logger.error(f"[{author_name}] 文章详情HTTP错误: {response.status_code}")
                    # HTTP error: back off (2s, 4s, ...) and retry if attempts remain
                    if retry < max_retries - 1:
                        wait_time = (retry + 1) * 2
                        self.logger.warning(f"[{author_name}] HTTP错误{wait_time}秒后重试...")
                        time.sleep(wait_time)
                        continue
                    return None
            except requests.exceptions.Timeout:
                self.logger.warning(f"[{author_name}] 请求超时,第{retry + 1}次尝试")
                if retry < max_retries - 1:
                    time.sleep(2)
                    continue
                else:
                    self.logger.error(f"[{author_name}] 请求超时,已达最大重试次数")
                    return None
            except requests.exceptions.ConnectionError as e:
                self.logger.warning(f"[{author_name}] 连接错误: {e},第{retry + 1}次尝试")
                if retry < max_retries - 1:
                    time.sleep(2)
                    continue
                else:
                    self.logger.error(f"[{author_name}] 连接错误,已达最大重试次数")
                    return None
            except Exception as e:
                # Unexpected errors abort immediately (no retry)
                self.logger.error(f"获取文章详情异常: {e}", exc_info=True)
                return None
        return None
def extract_title_from_content(self, content: str) -> str:
"""从正文中提取标题(第一行)"""
if not content:
return ""
lines = content.strip().split('\n')
if lines:
title = lines[0].strip()
# 移除可能的标签
title = re.sub(r'#.*?#', '', title).strip()
return title
return ""
def parse_article_data(self, article: Dict, account_data: Dict) -> Optional[Dict]:
"""解析文章数据映射到ai_articles表结构
Args:
article: 文章详情数据
account_data: 账号数据
Returns:
解析后的文章数据字典
"""
try:
# 提取基础信息
article_id = article.get('article_id', '')
content = article.get('content', '')
title = self.extract_title_from_content(content)
# 提取作者信息
author_name = account_data.get('nick', '')
app_id = account_data.get('app_id', '')
author_id = account_data.get('author_id', 0)
# 提取分类
category = article.get('category_v4', '')
# 提取时间
created_at = article.get('created_at', '')
commit_at = article.get('commit_at', '')
publish_time = commit_at or created_at
# 提取封面图片
cover_images_str = article.get('cover_images', '[]')
try:
cover_images = json.loads(cover_images_str)
image_count = len(cover_images) if isinstance(cover_images, list) else 0
except:
image_count = 0
# 计算字数
word_count = len(content) if content else 0
# 提取审核状态
audit_status = article.get('audit_status_info', {})
status_map = {
0: 'draft', # 草稿
1: 'published', # 已发布
2: 'pending_review', # 待审核
3: 'rejected', # 审核拒绝
4: 'failed' # 发布失败
}
auditing_status = article.get('auditing_status', 0)
status = status_map.get(auditing_status, 'draft')
# 构建返回数据
return {
'baijiahao_id': article_id,
'author_id': author_id,
'author_name': author_name,
'title': title,
'content': content,
'category': category,
'channel': 1, # 1=百家号
'status': status,
'word_count': word_count,
'image_count': image_count,
'publish_time': publish_time,
'created_at': created_at,
'baijiahao_status': audit_status.get('quality_status', ''),
}
except Exception as e:
self.logger.error(f"解析文章数据失败: {e}", exc_info=True)
return None
    def crawl_account_articles(self, account_name: str, account_data: Dict) -> List[Dict]:
        """Crawl all of today's articles for a single account.

        Pages through the article list, fetches each article's detail,
        and parses it into the ai_articles row shape. Requests are
        throttled (0.5s between details, 1s between pages).

        Args:
            account_name: Display name of the account.
            account_data: Account record (cookies, app_id, ...).

        Returns:
            List of parsed article dicts (possibly empty).
        """
        print(f"\n[账号] {account_name}")
        print("="*70)
        all_articles = []
        page = 1
        while True:
            # Fetch one page of the article list; stop on failure
            result = self.fetch_articles_list(account_data, page=page, page_size=100)
            if not result:
                break
            data = result.get('data', {})
            articles_list = data.get('list', [])
            if not articles_list:
                print(f" [i] 第{page}页无数据,停止翻页")
                break
            print(f" [OK] 第{page}页获取到 {len(articles_list)} 篇文章")
            # Fetch and parse the detail of every listed article
            for idx, article_brief in enumerate(articles_list, 1):
                article_id = article_brief.get('article_id', '')
                if not article_id:
                    continue
                print(f" [{idx}/{len(articles_list)}] 正在获取文章详情: {article_id}")
                article_detail = self.fetch_article_detail(account_data, article_id)
                if article_detail:
                    parsed_data = self.parse_article_data(article_detail, account_data)
                    if parsed_data:
                        all_articles.append(parsed_data)
                        print(f" 标题: {parsed_data['title']}")
                        print(f" 状态: {parsed_data['status']}")
                # Throttle per-article detail requests
                time.sleep(0.5)
            # Stop when the API reports no further pages
            has_more = data.get('has_more', False)
            if not has_more:
                print(f" [i] 已到最后一页")
                break
            page += 1
            time.sleep(1)
        print(f"\n[OK] 账号 {account_name} 共抓取 {len(all_articles)} 篇文章")
        return all_articles
def save_to_csv(self, articles: List[Dict]) -> bool:
"""保存文章数据到CSV文件
Args:
articles: 文章数据列表
Returns:
是否成功
"""
try:
if not articles:
print("[!] 没有文章数据需要保存")
return False
# CSV表头对应ai_articles表的主要字段
fieldnames = [
'baijiahao_id',
'author_id',
'author_name',
'title',
'content',
'category',
'channel',
'status',
'word_count',
'image_count',
'publish_time',
'created_at',
'baijiahao_status'
]
print(f"\n[>] 正在保存到CSV文件: {self.output_csv}")
with open(self.output_csv, 'w', encoding='utf-8-sig', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for article in articles:
# 只写入指定的字段
row = {key: article.get(key, '') for key in fieldnames}
writer.writerow(row)
print(f"[OK] 成功保存 {len(articles)} 篇文章到CSV文件")
self.logger.info(f"成功保存 {len(articles)} 篇文章到 {self.output_csv}")
return True
except Exception as e:
print(f"[X] 保存CSV失败: {e}")
self.logger.error(f"保存CSV失败: {e}", exc_info=True)
return False
    def run(self):
        """Run the crawler over every loaded account and save one CSV.

        Iterates all accounts, aggregates today's parsed articles, and
        writes them to the date-stamped CSV file. A failure on one
        account is logged and does not stop the remaining accounts.
        """
        print("\n" + "="*70)
        print("百家号文章数据抓取工具")
        print("="*70)
        print(f"抓取日期: {datetime.now().strftime('%Y-%m-%d')}")
        print(f"账号数量: {len(self.account_cookies)}")
        print("="*70)
        if not self.account_cookies:
            print("[X] 没有可用的账号Cookie退出")
            return
        all_articles = []
        # Crawl each account independently; one failure must not abort the run
        for account_name, account_data in self.account_cookies.items():
            try:
                articles = self.crawl_account_articles(account_name, account_data)
                all_articles.extend(articles)
            except Exception as e:
                print(f"[X] 抓取账号 {account_name} 失败: {e}")
                self.logger.error(f"抓取账号 {account_name} 失败: {e}", exc_info=True)
                continue
        # Persist everything collected today (skip the write when empty)
        if all_articles:
            self.save_to_csv(all_articles)
        else:
            print("\n[!] 今日暂无文章数据")
        print("\n" + "="*70)
        print(f"抓取完成!共抓取 {len(all_articles)} 篇文章")
        print("="*70)
if __name__ == '__main__':
    # Default mode: load cookies from the database
    # crawler = BaijiahaoArticlesCrawler(load_from_db=True)
    # Test mode: load cookies from the JSON file (resolves author_id when possible)
    crawler = BaijiahaoArticlesCrawler(load_from_json=True)
    # Legacy mode: load cookies from the JSON file without author_id lookup
    # crawler = BaijiahaoArticlesCrawler(load_from_db=False)
    crawler.run()