# ai_baijiahao/baidu_api.py

import asyncio
import json
import random
import time
from typing import Dict, Any, Optional
from urllib.parse import quote
import aiohttp
from playwright.async_api import async_playwright
from fake_useragent import UserAgent
import logging
from test2 import display_simple_data
# Module-wide logging: INFO level so per-page fetch progress and proxy
# rotation events are visible on the console.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class BaiduBJHSpider:
    """Scraper for Baijiahao (baijiahao.baidu.com) author article feeds.

    Pulls an author's dynamic/article list from the mbd.baidu.com JSONP
    endpoint.  Requests can optionally be routed through a rotating proxy
    pool, and cookies can be bootstrapped from a real Chromium session
    (Playwright) when the bare API request is blocked by anti-bot checks.
    """

    def __init__(self, use_proxy: bool = False, proxy_api_url: str = None,
                 proxy_username: str = None, proxy_password: str = None):
        """
        Args:
            use_proxy: when True, every outbound request must go through a
                proxy obtained from the pool (the local IP is never used).
            proxy_api_url: endpoint that hands out one "ip:port" per call;
                a built-in default pool URL is used when omitted.
            proxy_username: optional proxy auth user name.
            proxy_password: optional proxy auth password.
        """
        self.ua = UserAgent()
        self.use_proxy = use_proxy
        self.proxy_api_url = proxy_api_url or 'http://api.tianqiip.com/getip?secret=lu29e593&num=1&type=txt&port=1&mr=1&sign=4b81a62eaed89ba802a8f34053e2c964'
        self.proxy_username = proxy_username
        self.proxy_password = proxy_password
        self.current_proxy = None     # most recent proxy URL handed out by get_proxy()
        self.session_cookie = None    # "name=value; ..." string captured by init_browser()

    def get_proxy(self):
        """Fetch one proxy IP from the pool and remember it on the instance.

        Returns:
            The proxy URL ("http://[user:pass@]ip:port") on success, or
            None when proxying is disabled or the pool request fails.
        """
        if not self.use_proxy:
            return None
        try:
            import requests  # local import: only needed when proxying is on
            logger.info(f"从代理池获取IP: {self.proxy_api_url}")
            response = requests.get(self.proxy_api_url, timeout=5)  # short timeout: pool should answer fast
            content = response.content.decode("utf-8").strip()
            logger.info(f"提取代理IP: {content}")
            if ':' in content:
                ip, port = content.strip().split(":", 1)
                # Embed credentials in the URL when the pool requires auth.
                if self.proxy_username and self.proxy_password:
                    proxy_url = f"http://{self.proxy_username}:{self.proxy_password}@{ip}:{port}"
                    logger.info(f"代理配置成功(带认证): http://{self.proxy_username}:****@{ip}:{port}")
                else:
                    proxy_url = f"http://{ip}:{port}"
                    logger.info(f"代理配置成功: {proxy_url}")
                self.current_proxy = proxy_url
                return proxy_url
            logger.error("代理IP格式错误")
            return None
        except Exception as e:
            logger.error(f"获取代理IP失败: {e}")
            return None

    async def init_browser(self):
        """Warm up a headless Chromium session and capture its cookies.

        Visits baidu.com and then baijiahao.baidu.com so the anti-bot
        cookies get issued, stores them on ``self.session_cookie`` as a
        "name=value; ..." header string, and returns the raw cookie list.
        """
        playwright = await async_playwright().start()
        try:
            # Flags that hide the most common headless/automation fingerprints.
            browser_args = [
                '--disable-blink-features=AutomationControlled',
                '--disable-web-security',
                '--disable-features=IsolateOrigins,site-per-process',
                '--no-sandbox',
                '--disable-setuid-sandbox',
            ]
            browser = await playwright.chromium.launch(
                headless=True,
                args=browser_args
            )
            # Realistic desktop profile: viewport / UA / locale / timezone.
            context = await browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent=self.ua.random,
                locale='zh-CN',
                timezone_id='Asia/Shanghai'
            )
            await context.set_extra_http_headers({
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
            })
            page = await context.new_page()
            # Visit the Baidu home page first so the base cookies are set...
            await page.goto('https://www.baidu.com', wait_until='networkidle')
            await asyncio.sleep(random.uniform(2, 4))
            # ...then the Baijiahao site itself.
            await page.goto('https://baijiahao.baidu.com/', wait_until='networkidle')
            await asyncio.sleep(random.uniform(3, 5))
            cookies = await context.cookies()
            self.session_cookie = '; '.join(f"{c['name']}={c['value']}" for c in cookies)
            logger.info(f"获取到Cookie: {self.session_cookie[:50]}...")
            await browser.close()
            return cookies
        finally:
            # Always release the Playwright driver, even if a goto fails.
            await playwright.stop()

    def build_headers(self, referer: str = "https://baijiahao.baidu.com/") -> Dict:
        """Build browser-like request headers, attaching the session cookie if present."""
        headers = {
            'User-Agent': self.ua.random,
            'Accept': '*/*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
            'Accept-Encoding': 'gzip, deflate, br',
            'Referer': referer,
            'Connection': 'keep-alive',
            'Sec-Fetch-Dest': 'script',
            'Sec-Fetch-Mode': 'no-cors',
            'Sec-Fetch-Site': 'same-site',
            'Pragma': 'no-cache',
            'Cache-Control': 'no-cache',
        }
        if self.session_cookie:
            headers['Cookie'] = self.session_cookie
        return headers

    def generate_callback_name(self) -> str:
        """Return a time-stamped JSONP callback name (e.g. ``__jsonp1700000000000``)."""
        timestamp = int(time.time() * 1000)
        return f"__jsonp{timestamp}"

    async def fetch_data_directly(self, uk: str = "ntHidnLhrlfclJar2z8wBg", use_browser: bool = False, num: int = 10,
                                  ctime: str = None) -> Optional[Dict]:
        """Hit the JSONP feed endpoint directly, retrying up to 10 times.

        Args:
            uk: author UK identifier.
            use_browser: when True, boot a headless browser first to
                harvest cookies before issuing the API request.
            num: page size (the API only honours 10; kept for signature
                compatibility).
            ctime: pagination cursor — the ``query.ctime`` value from the
                previous response.

        Returns:
            Parsed response dict, or None after all retries fail.

        Raises:
            Exception: when proxying is enabled but no proxy IP can be
                obtained (the local IP is deliberately never used).
        """
        if use_browser:
            await self.init_browser()
        # When proxying is enabled a proxy MUST be available up front.
        if self.use_proxy:
            if not self.current_proxy:
                proxy = self.get_proxy()
                if not proxy:
                    raise Exception("启用了代理但无法获取代理IP拒绝使用本机IP")
        async with aiohttp.ClientSession() as session:
            for attempt in range(10):  # generous retry budget for pool throttling
                try:
                    callback_name = self.generate_callback_name()
                    timestamp = int(time.time() * 1000)
                    params = {
                        'tab': 'main',
                        'num': '10',  # fixed by the API
                        'uk': uk,
                        'source': 'pc',
                        'type': 'newhome',
                        'action': 'dynamic',
                        'format': 'jsonp',
                        'callback': callback_name,
                        'otherext': f'h5_{time.strftime("%Y%m%d%H%M%S")}',
                        'Tenger-Mhor': str(timestamp),
                        '_': str(timestamp)  # cache-buster
                    }
                    if ctime:
                        params['ctime'] = ctime  # pagination cursor
                    url = "https://mbd.baidu.com/webpage"
                    headers = self.build_headers()
                    logger.info(f"尝试第{attempt + 1}次请求URL: {url}")
                    request_kwargs = {
                        'params': params,
                        'headers': headers,
                        'timeout': aiohttp.ClientTimeout(total=30)
                    }
                    if self.use_proxy:
                        if not self.current_proxy:
                            raise Exception("启用了代理但当前无代理IP拒绝使用本机IP")
                        logger.info(f"使用代理: {self.current_proxy}")
                        request_kwargs['proxy'] = self.current_proxy
                    async with session.get(url, **request_kwargs) as response:
                        text = await response.text()
                        # Unwrap the JSONP envelope: callback_name(<json>)
                        if text.startswith(callback_name + '(') and text.endswith(')'):
                            json_str = text[len(callback_name) + 1:-1]
                            data = json.loads(json_str)
                            # Anti-crawler flag: rotate proxy and retry.
                            if data.get('data', {}).get('foe', {}).get('is_need_foe') == True:
                                logger.warning(f"检测到反爬标识(is_need_foe=True),尝试第{attempt + 1}")
                                if self.use_proxy:
                                    logger.info("检测到反爬立即切换代理IP无需等待")
                                    self.get_proxy()
                                if attempt < 9:  # retries remaining out of 10
                                    continue
                            return data
                except aiohttp.ClientProxyConnectionError as e:
                    # BUGFIX: this must be caught BEFORE ClientConnectorError,
                    # its parent class — in the original order this branch was
                    # unreachable dead code.
                    logger.error(f"❌ 代理连接失败 (尝试{attempt + 1}/10): {e}")
                    if self.use_proxy:
                        logger.info("🔄 代理失败立即切换代理IP无需等待")
                        self.get_proxy()
                    # Proxy errors retry immediately, without back-off.
                except aiohttp.ClientConnectorError as e:
                    logger.error(f"❌ 网络连接失败 (尝试{attempt + 1}/10): {type(e).__name__} - {str(e)[:100]}")
                    if self.use_proxy:
                        logger.info("🔄 网络错误立即切换代理IP")
                        self.get_proxy()
                except asyncio.TimeoutError:
                    logger.error(f"❌ 请求超时 (尝试{attempt + 1}/10): 代理响应超过30秒")
                    if self.use_proxy:
                        logger.info("🔄 超时立即切换代理IP")
                        self.get_proxy()
                except aiohttp.ClientResponseError as e:
                    if e.status == 407:
                        # 407 = proxy auth failure / pool throttling: back off
                        # to let the pool recover, then rotate the IP.
                        logger.warning(f"检测到407错误代理IP池限流等待10秒后重新获取IP...")
                        await asyncio.sleep(10)
                        if self.use_proxy:
                            logger.info("重新获取代理IP...")
                            self.get_proxy()
                    else:
                        logger.error(f"❌ HTTP错误 (尝试{attempt + 1}/10): {e.status}, {e.message}")
                        await asyncio.sleep(random.uniform(1, 2))
                except Exception as e:
                    logger.error(f"❌ 未知错误 (尝试{attempt + 1}/10): {type(e).__name__} - {str(e)[:100]}")
                    await asyncio.sleep(random.uniform(1, 2))
        # All 10 attempts exhausted.
        logger.error("请求失败已经重试10次仍然失败可能是IP池限流或网络问题")
        return None

    async def fetch_via_browser(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
        """Drive a real browser and sniff the feed JSONP network response.

        Slowest but most reliable path: loads the author page, scrolls a
        few times to trigger lazy loading, and captures the
        mbd.baidu.com/webpage JSONP response from the network traffic.

        Returns:
            The first captured payload dict, or None when nothing was seen.
        """
        import re
        playwright = await async_playwright().start()
        try:
            browser = await playwright.chromium.launch(
                headless=False,  # set True once debugging is done
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--no-sandbox'
                ]
            )
            context = await browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent=self.ua.random,
                locale='zh-CN'
            )
            page = await context.new_page()
            results = []

            async def handle_response(response):
                # Only the feed endpoint's JSONP responses are of interest.
                if "mbd.baidu.com/webpage" in response.url and "format=jsonp" in response.url:
                    try:
                        # BUGFIX: in the async Playwright API response.text()
                        # is a coroutine and must be awaited — the original
                        # sync handler never produced any data.
                        text = await response.text()
                        match = re.search(r'callback=([^&]+)', response.url)
                        if match:
                            callback = match.group(1)
                            if text.startswith(callback + '(') and text.endswith(')'):
                                json_str = text[len(callback) + 1:-1]
                                results.append(json.loads(json_str))
                    except Exception:
                        # Best-effort sniffing: skip malformed responses.
                        pass

            page.on("response", handle_response)
            await page.goto(f"https://baijiahao.baidu.com/u?app_id={uk}", wait_until='networkidle')
            # Scroll like a human to trigger lazy loading.
            for _ in range(3):
                await page.evaluate("window.scrollBy(0, window.innerHeight)")
                await asyncio.sleep(random.uniform(1, 2))
            await asyncio.sleep(5)  # give the last request time to land
            await browser.close()
            if results:
                return results[0]
        except Exception as e:
            logger.error(f"浏览器方式获取失败: {e}")
        finally:
            await playwright.stop()
        return None

    async def fetch_with_signature(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
        """Experimental: query the feed endpoint in plain-JSON mode.

        The real endpoint may require a signature computed in Baidu's
        JavaScript; this first pulls the dynamic config for diagnostics,
        then requests the feed with ``format=json`` (no JSONP callback).
        """
        async with aiohttp.ClientSession() as session:
            token_url = "https://mbd.baidu.com/staticx/search/dynamic/config"
            headers = {
                'User-Agent': self.ua.random,
                'Referer': 'https://baijiahao.baidu.com/',
            }
            try:
                # Fetch the config (logged for signature reverse-engineering).
                async with session.get(token_url, headers=headers) as resp:
                    config_text = await resp.text()
                    logger.info(f"配置响应: {config_text[:200]}")
                timestamp = int(time.time() * 1000)
                # Plain JSON request: no 'callback' parameter.
                params = {
                    'tab': 'main',
                    'num': '10',
                    'uk': uk,
                    'source': 'pc',
                    'type': 'newhome',
                    'action': 'dynamic',
                    'format': 'json',
                    't': str(timestamp),
                }
                url = "https://mbd.baidu.com/webpage"
                async with session.get(url, params=params, headers=headers) as response:
                    text = await response.text()
                    logger.info(f"JSON响应: {text[:500]}")
                    try:
                        return json.loads(text)
                    except ValueError:
                        # Not JSON (likely an anti-bot HTML page).
                        return None
            except Exception as e:
                logger.error(f"签名方式失败: {e}")
                return None
async def fetch_baidu_data(uk: str = "ntHidnLhrlfclJar2z8wBg", months: int = 6, use_proxy: bool = False, proxy_api_url: str = None,
                           on_page_fetched=None, start_page: int = 1, start_ctime: str = None) -> Optional[Dict]:
    """Main driver: page through a Baijiahao author feed until the target date.

    Fetches page after page via ``BaiduBJHSpider.fetch_data_directly``,
    handing each page to ``on_page_fetched`` (pages are NOT accumulated in
    memory), and stops once the last article on a page is older than
    ``months`` ago.  Supports resuming from a saved page/ctime checkpoint.

    Args:
        uk: author UK identifier.
        months: how many months back to fetch (default 6); fractions are
            allowed, e.g. 0.33 is roughly 10 days (1 month == 30 days).
        use_proxy: enable the rotating proxy pool.
        proxy_api_url: proxy pool API address; None selects the default.
        on_page_fetched: callback invoked per page with
            ``(page, items, ctime)``.
        start_page: starting page number (checkpoint resume).
        start_ctime: starting pagination cursor (checkpoint resume).

    Returns:
        A checkpoint dict ``{'last_page', 'last_ctime', 'completed'}``,
        or None when even the browser-assisted first request fails.
    """
    from datetime import datetime, timedelta
    import re
    spider = BaiduBJHSpider(use_proxy=use_proxy, proxy_api_url=proxy_api_url)
    # Cut-off date; fractional months are supported via a 30-day month.
    days = int(months * 30)
    target_date = datetime.now() - timedelta(days=days)
    # Log the window in days when it is under a month, else in months.
    if months < 1:
        logger.info(f"开始获取百家号数据(近{days}天, 目标日期: {target_date.strftime('%Y-%m-%d')})")
    else:
        logger.info(f"开始获取百家号数据(近{int(months)}个月, 目标日期: {target_date.strftime('%Y-%m-%d')})")
    # Each request returns a fixed 10 items; pages are saved via the
    # callback instead of being accumulated in a list.
    page = start_page          # may start past 1 when resuming
    current_ctime = start_ctime  # pagination cursor from a previous run
    # On resume, skip the first-page fetch entirely and rely on the saved ctime.
    if start_page > 1 and start_ctime:
        logger.info(f"断点续传:从第{start_page}页开始ctime={start_ctime}")
        data = None  # the first-page payload is not needed when resuming
    else:
        # Fast path: try the bare API first, only falling back to a
        # browser session (cookie harvest) if that fails.
        logger.info("尝试直接请求API(不启动浏览器)...")
        data = await spider.fetch_data_directly(uk, use_browser=False, ctime=current_ctime)
        # First attempt failed or returned an empty list — retry with cookies.
        if not data or not data.get('data', {}).get('list'):
            if start_page == 1:  # the browser fallback only applies to page 1
                logger.warning("直接请求失败,启动浏览器获取Cookie...")
                # Dump the failed response for diagnostics.
                if data:
                    logger.warning(f"第一次请求返回数据: {json.dumps(data, ensure_ascii=False, indent=2)}")
                else:
                    logger.warning("第一次请求返回数据: None")
                data = await spider.fetch_data_directly(uk, use_browser=True)
                if not data or not data.get('data', {}).get('list'):
                    logger.error("启动浏览器后仍然失败,放弃")
                    # Dump the final response before giving up.
                    if data:
                        logger.error(f"最终返回数据: {json.dumps(data, ensure_ascii=False, indent=2)}")
                    else:
                        logger.error("最终返回数据: None")
                    return None
    # First page succeeded (non-resume path): hand it to the callback.
    if data and data.get('data', {}).get('list'):
        items = data.get('data', {}).get('list', [])
        logger.info(f"{page}页获取成功,数据条数: {len(items)}")
        if on_page_fetched:
            on_page_fetched(page, items, current_ctime)
        # Pagination cursor lives at data.data.query.ctime in the payload.
        current_ctime = data.get('data', {}).get('query', {}).get('ctime', current_ctime)
        if current_ctime:
            logger.info(f"获取到分页参数 ctime={current_ctime}")
        else:
            logger.warning("未获取到ctime分页参数")

    def get_article_datetime(item_data: dict) -> datetime:
        """Extract an article's publication time from its itemData.

        Prefers the ``ctime`` Unix timestamp (most accurate); falls back
        to parsing the human-readable ``time`` field (relative phrases
        like "N分钟前"/"昨天" or absolute "YYYY-MM-DD[ HH:MM]" strings).
        Defaults to "now" when nothing can be parsed.
        """
        # Preferred: second-resolution Unix timestamp.
        if 'ctime' in item_data and item_data['ctime']:
            try:
                timestamp = int(item_data['ctime'])
                return datetime.fromtimestamp(timestamp)
            except:
                pass
        # Fallback: the display-time string (relative or absolute).
        time_str = item_data.get('time', '')
        if not time_str:
            return datetime.now()
        now = datetime.now()
        if '分钟前' in time_str:  # "N minutes ago"
            minutes = int(re.search(r'(\d+)', time_str).group(1))
            return now - timedelta(minutes=minutes)
        elif '小时前' in time_str:  # "N hours ago"
            hours = int(re.search(r'(\d+)', time_str).group(1))
            return now - timedelta(hours=hours)
        elif '天前' in time_str or '昨天' in time_str:  # "N days ago" / "yesterday"
            if '昨天' in time_str:
                days = 1
            else:
                days = int(re.search(r'(\d+)', time_str).group(1))
            return now - timedelta(days=days)
        elif '-' in time_str:  # absolute date formats
            try:
                return datetime.strptime(time_str, '%Y-%m-%d %H:%M')
            except:
                try:
                    return datetime.strptime(time_str, '%Y-%m-%d')
                except:
                    return now
        return now
    # Decide whether more pages are needed by checking the timestamp of
    # the LAST article on the page just fetched.
    need_more = True
    if data and data.get('data', {}).get('list'):
        items = data.get('data', {}).get('list', [])
        if items:
            last_item = items[-1]
            item_data = last_item.get('itemData', {})
            article_date = get_article_datetime(item_data)
            logger.info(f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}")
            if article_date < target_date:
                need_more = False
                if months < 1:
                    logger.info(
                        f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}, 已超出{days}天范围,停止请求")
                else:
                    logger.info(
                        f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}, 已超出{int(months)}个月范围,停止请求")
        else:
            need_more = False
    elif start_page > 1:
        # Resume path: no first-page data was fetched, so keep going.
        need_more = True
    else:
        need_more = False
    # Keep requesting pages (no page cap) until the target date is passed
    # or a page comes back empty/blocked.
    while need_more:
        page += 1
        logger.info(f"需要更多数据,请求第{page}页...")
        # Randomized 8-12s delay to avoid looking like a bot.
        delay = random.uniform(8, 12)
        logger.info(f"等待 {delay:.1f} 秒后请求...")
        await asyncio.sleep(delay)
        # Next page, using the cursor returned by the previous response.
        next_data = await spider.fetch_data_directly(uk, use_browser=False, ctime=current_ctime)
        # fetch_data_directly already retried/rotated; here we only check success.
        if not next_data or not next_data.get('data', {}).get('list'):
            # Distinguish an anti-crawl block from a simple empty page.
            if next_data and next_data.get('data', {}).get('foe', {}).get('is_need_foe') == True:
                logger.error(f"{page}页多次重试后仍然触发反爬,停止请求")
                logger.error(f"返回数据: {json.dumps(next_data, ensure_ascii=False, indent=2)}")
            else:
                logger.warning(f"{page}页无数据,停止请求")
                # Dump the full response for debugging.
                if next_data:
                    logger.warning(f"返回数据: {json.dumps(next_data, ensure_ascii=False, indent=2)}")
                else:
                    logger.warning("返回数据: None")
            break
        next_items = next_data.get('data', {}).get('list', [])
        logger.info(f"{page}页获取成功,数据条数: {len(next_items)}")
        # Persist this page via the callback.
        if on_page_fetched:
            on_page_fetched(page, next_items, current_ctime)
        # Advance the cursor (path: data.data.query.ctime).
        current_ctime = next_data.get('data', {}).get('query', {}).get('ctime', current_ctime)
        if current_ctime:
            logger.info(f"更新分页参数 ctime={current_ctime}")
        # Stop once the oldest article on this page predates the cut-off.
        if next_items:
            last_item = next_items[-1]
            item_data = last_item.get('itemData', {})
            article_date = get_article_datetime(item_data)
            logger.info(f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}")
            if article_date < target_date:
                need_more = False
                if months < 1:
                    logger.info(
                        f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}, 已超出{days}天范围,停止请求")
                else:
                    logger.info(
                        f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}, 已超出{int(months)}个月范围,停止请求")
        else:
            need_more = False
    # Final checkpoint for resuming later runs.
    result = {
        'last_page': page,
        'last_ctime': current_ctime,
        'completed': not need_more  # True when the date window was fully covered
    }
    logger.info(f"抓取完成,最后页码: {page}, ctime: {current_ctime}")
    return result
# Blocking wrapper so synchronous code can use the async fetcher.
def get_baidu_data_sync(uk: str = "ntHidnLhrlfclJar2z8wBg", months: int = 6, use_proxy: bool = False,
                        proxy_api_url: str = None, on_page_fetched=None,
                        start_page: int = 1, start_ctime: str = None) -> Optional[Dict]:
    """Run ``fetch_baidu_data`` to completion on a fresh event loop.

    Args:
        uk: author UK identifier.
        months: how many months back to fetch (default 6).
        use_proxy: enable the rotating proxy pool.
        proxy_api_url: proxy pool API address; None selects the default.
        on_page_fetched: callback invoked after each page is fetched.
        start_page: starting page number (checkpoint resume).
        start_ctime: starting pagination cursor (checkpoint resume).

    Returns:
        The checkpoint dict from ``fetch_baidu_data``, or None on failure.
    """
    coro = fetch_baidu_data(uk, months, use_proxy, proxy_api_url,
                            on_page_fetched, start_page, start_ctime)
    return asyncio.run(coro)
# Manual smoke test retained from the original version.
async def main():
    """Fetch the default author's feed and pretty-print the checkpoint."""
    data = await fetch_baidu_data()
    if not data:
        return
    print(json.dumps(data, ensure_ascii=False, indent=2))
    from test2 import display_simple_data
    display_simple_data(data)


if __name__ == "__main__":
    asyncio.run(main())