# Baidu Baijiahao (百家号) author-feed spider.
import asyncio
|
|||
|
|
import json
|
|||
|
|
import random
|
|||
|
|
import time
|
|||
|
|
from typing import Dict, Any, Optional
|
|||
|
|
from urllib.parse import quote
|
|||
|
|
|
|||
|
|
import aiohttp
|
|||
|
|
from playwright.async_api import async_playwright
|
|||
|
|
from fake_useragent import UserAgent
|
|||
|
|
import logging
|
|||
|
|
|
|||
|
|
from test2 import display_simple_data
|
|||
|
|
|
|||
|
|
# Module-level logging: INFO and above to the default stderr handler.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class BaiduBJHSpider:
    """Scraper for Baidu Baijiahao author feeds.

    Talks to the JSONP feed endpoint at ``mbd.baidu.com/webpage``.  Can
    optionally route requests through a rotating proxy pool and optionally
    prime cookies with a real Chromium session (Playwright) when direct
    requests are blocked.
    """

    def __init__(self, use_proxy: bool = False, proxy_api_url: Optional[str] = None, proxy_username: Optional[str] = None, proxy_password: Optional[str] = None):
        """Create a spider.

        Args:
            use_proxy: route HTTP requests through proxies from the pool API.
            proxy_api_url: proxy-pool endpoint that returns ``ip:port`` as
                plain text; a hard-coded tianqiip.com URL is used when omitted.
            proxy_username: optional proxy auth user name.
            proxy_password: optional proxy auth password.
        """
        self.ua = UserAgent()  # random desktop User-Agent source (fake_useragent)
        self.use_proxy = use_proxy
        self.proxy_api_url = proxy_api_url or 'http://api.tianqiip.com/getip?secret=lu29e593&num=1&type=txt&port=1&mr=1&sign=4b81a62eaed89ba802a8f34053e2c964'
        self.proxy_username = proxy_username
        self.proxy_password = proxy_password
        self.current_proxy = None  # last proxy URL obtained from the pool, or None
        self.session_cookie = None  # "name=value; ..." string captured by init_browser()

    def get_proxy(self) -> Optional[str]:
        """Fetch one proxy IP from the pool.

        Returns an ``http://[user:pass@]ip:port`` URL (also stored on
        ``self.current_proxy``) or None on failure / when proxies are off.

        NOTE(review): this uses the blocking ``requests`` library and is
        called from coroutines below, which stalls the event loop for up to
        the 5 s timeout — consider an aiohttp call instead.
        """
        if not self.use_proxy:
            return None

        try:
            import requests  # local import: requests is only needed when proxies are on
            logger.info(f"从代理池获取IP: {self.proxy_api_url}")
            response = requests.get(self.proxy_api_url, timeout=5)  # short timeout to fail fast
            content = response.content.decode("utf-8").strip()
            logger.info(f"提取代理IP: {content}")

            # Pool replies with plain "ip:port" text on success.
            if ':' in content:
                ip, port = content.strip().split(":", 1)

                # Embed credentials in the proxy URL when auth is configured.
                if self.proxy_username and self.proxy_password:
                    proxy_url = f"http://{self.proxy_username}:{self.proxy_password}@{ip}:{port}"
                    logger.info(f"代理配置成功(带认证): http://{self.proxy_username}:****@{ip}:{port}")
                else:
                    proxy_url = f"http://{ip}:{port}"
                    logger.info(f"代理配置成功: {proxy_url}")

                self.current_proxy = proxy_url
                return proxy_url
            else:
                logger.error("代理IP格式错误")
                return None
        except Exception as e:
            logger.error(f"获取代理IP失败: {e}")
            return None

    async def init_browser(self):
        """Launch headless Chromium to collect real Baidu cookies.

        Visits baidu.com and then baijiahao.baidu.com with human-like
        pauses, stores the cookies as a header-ready string on
        ``self.session_cookie``, and returns the raw cookie list.
        """
        playwright = await async_playwright().start()

        # Browser flags that reduce automation fingerprinting.
        browser_args = [
            '--disable-blink-features=AutomationControlled',
            '--disable-web-security',
            '--disable-features=IsolateOrigins,site-per-process',
            '--no-sandbox',
            '--disable-setuid-sandbox',
        ]

        # Launch the browser.
        browser = await playwright.chromium.launch(
            headless=True,  # set to False to watch the session while debugging
            args=browser_args
        )

        # Context that looks like a desktop user located in China.
        context = await browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            user_agent=self.ua.random,
            locale='zh-CN',
            timezone_id='Asia/Shanghai'
        )

        # Extra headers typical of an interactive browser session.
        await context.set_extra_http_headers({
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        })

        page = await context.new_page()

        # Visit the Baidu homepage first to pick up baseline cookies.
        await page.goto('https://www.baidu.com', wait_until='networkidle')
        await asyncio.sleep(random.uniform(2, 4))  # human-like pause

        # Then the Baijiahao site itself.
        await page.goto('https://baijiahao.baidu.com/', wait_until='networkidle')
        await asyncio.sleep(random.uniform(3, 5))

        # Serialize cookies into a single "name=value; ..." header string.
        cookies = await context.cookies()
        self.session_cookie = '; '.join([f"{c['name']}={c['value']}" for c in cookies])

        logger.info(f"获取到Cookie: {self.session_cookie[:50]}...")

        await browser.close()
        await playwright.stop()

        return cookies

    def build_headers(self, referer: str = "https://baijiahao.baidu.com/") -> Dict:
        """Build request headers for the feed endpoint.

        Includes the browser-captured cookie string when one is available.
        """
        # NOTE(review): timestamp is computed but never used in this method.
        timestamp = int(time.time() * 1000)

        headers = {
            'User-Agent': self.ua.random,
            'Accept': '*/*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
            'Accept-Encoding': 'gzip, deflate, br',
            'Referer': referer,
            'Connection': 'keep-alive',
            'Sec-Fetch-Dest': 'script',
            'Sec-Fetch-Mode': 'no-cors',
            'Sec-Fetch-Site': 'same-site',
            'Pragma': 'no-cache',
            'Cache-Control': 'no-cache',
        }

        # Attach the cookie jar captured by init_browser(), if any.
        if self.session_cookie:
            headers['Cookie'] = self.session_cookie

        return headers

    def generate_callback_name(self) -> str:
        """Generate a pseudo-random JSONP callback name from the clock."""
        timestamp = int(time.time() * 1000)
        return f"__jsonp{timestamp}"

    async def fetch_data_directly(self, uk: str = "ntHidnLhrlfclJar2z8wBg", use_browser: bool = False, num: int = 10,
                                  ctime: Optional[str] = None) -> Optional[Dict]:
        """Request the feed endpoint directly, retrying up to 10 times.

        Args:
            uk: author UK identifier.
            use_browser: when True, run init_browser() first to obtain
                cookies; default False avoids launching a browser.
            num: requested item count; the API effectively fixes this at 10.
            ctime: pagination cursor — the ``query.ctime`` value returned by
                the previous page's response.

        Returns:
            Parsed JSONP payload dict, or None after exhausting retries.

        Raises:
            Exception: when proxies are enabled but no proxy IP can be
                obtained (the spider refuses to fall back to the local IP).
        """
        # Only launch the browser for cookies when explicitly requested.
        if use_browser:
            await self.init_browser()

        # With proxies enabled, a proxy IP is mandatory before any request.
        if self.use_proxy:
            if not self.current_proxy:
                proxy = self.get_proxy()
                if not proxy:
                    raise Exception("启用了代理但无法获取代理IP,拒绝使用本机IP")

        async with aiohttp.ClientSession() as session:
            # 10 attempts to ride out proxy-pool throttling and bot checks.
            for attempt in range(10):
                try:
                    callback_name = self.generate_callback_name()
                    timestamp = int(time.time() * 1000)

                    # Query-string parameters for the JSONP feed endpoint.
                    params = {
                        'tab': 'main',
                        'num': '10',  # API is fixed at 10 items per page
                        'uk': uk,
                        'source': 'pc',
                        'type': 'newhome',
                        'action': 'dynamic',
                        'format': 'jsonp',
                        'callback': callback_name,
                        'otherext': f'h5_{time.strftime("%Y%m%d%H%M%S")}',
                        'Tenger-Mhor': str(timestamp),
                        '_': str(timestamp)  # cache-busting timestamp
                    }

                    # Pagination cursor from the previous page, when present.
                    if ctime:
                        params['ctime'] = ctime

                    url = "https://mbd.baidu.com/webpage"
                    headers = self.build_headers()

                    logger.info(f"尝试第{attempt + 1}次请求,URL: {url}")

                    # Per-request settings; total timeout caps at 30 s.
                    request_kwargs = {
                        'params': params,
                        'headers': headers,
                        'timeout': aiohttp.ClientTimeout(total=30)
                    }

                    # Re-check the proxy invariant on every attempt.
                    if self.use_proxy:
                        if not self.current_proxy:
                            raise Exception("启用了代理但当前无代理IP,拒绝使用本机IP")
                        logger.info(f"使用代理: {self.current_proxy}")
                        request_kwargs['proxy'] = self.current_proxy

                    async with session.get(url, **request_kwargs) as response:
                        text = await response.text()
                        # Unwrap the JSONP envelope: callback_name( ... )
                        # (a non-matching body simply falls through to retry).
                        if text.startswith(callback_name + '(') and text.endswith(')'):
                            json_str = text[len(callback_name) + 1:-1]
                            data = json.loads(json_str)

                            # Anti-bot flag in the payload means we were detected.
                            if data.get('data', {}).get('foe', {}).get('is_need_foe') == True:
                                logger.warning(f"检测到反爬标识(is_need_foe=True),尝试第{attempt + 1}次")
                                # Rotate the proxy IP immediately and retry.
                                if self.use_proxy:
                                    logger.info("检测到反爬,立即切换代理IP(无需等待)")
                                    self.get_proxy()
                                if attempt < 9:  # retries remaining out of 10
                                    continue

                            return data

                except aiohttp.ClientConnectorError as e:
                    logger.error(f"❌ 网络连接失败 (尝试{attempt + 1}/10): {type(e).__name__} - {str(e)[:100]}")
                    if self.use_proxy:
                        logger.info("🔄 网络错误,立即切换代理IP")
                        self.get_proxy()
                except asyncio.TimeoutError as e:
                    logger.error(f"❌ 请求超时 (尝试{attempt + 1}/10): 代理响应超过30秒")
                    if self.use_proxy:
                        logger.info("🔄 超时,立即切换代理IP")
                        self.get_proxy()
                # NOTE(review): ClientProxyConnectionError subclasses
                # ClientConnectorError in aiohttp, so this branch appears
                # unreachable — the handler above catches it first. Verify
                # against the installed aiohttp version.
                except aiohttp.ClientProxyConnectionError as e:
                    logger.error(f"❌ 代理连接失败 (尝试{attempt + 1}/10): {e}")
                    # Proxy failure: fetch a fresh IP and retry without delay.
                    if self.use_proxy:
                        logger.info("🔄 代理失败,立即切换代理IP(无需等待)")
                        self.get_proxy()
                # NOTE(review): aiohttp raises ClientResponseError only when
                # raise_for_status is enabled, which this session does not
                # set — this 407 handling may never trigger as written.
                except aiohttp.ClientResponseError as e:
                    # 407: proxy auth failure / pool throttling.
                    if e.status == 407:
                        logger.warning(f"检测到407错误(代理IP池限流),等待10秒后重新获取IP...")
                        await asyncio.sleep(10)  # give the pool time to recover
                        if self.use_proxy:
                            logger.info("重新获取代理IP...")
                            self.get_proxy()
                    else:
                        logger.error(f"❌ HTTP错误 (尝试{attempt + 1}/10): {e.status}, {e.message}")
                        await asyncio.sleep(random.uniform(1, 2))
                except Exception as e:
                    logger.error(f"❌ 未知错误 (尝试{attempt + 1}/10): {type(e).__name__} - {str(e)[:100]}")
                    await asyncio.sleep(random.uniform(1, 2))  # brief backoff

        # All 10 attempts failed.
        logger.error("请求失败:已经重试10次仍然失败,可能是IP池限流或网络问题")
        return None

    async def fetch_via_browser(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
        """Drive a real browser to the author page and sniff the feed call.

        Opens the Baijiahao author page, scrolls to trigger the JSONP feed
        request, and captures the first matching response payload.

        Returns the first captured payload dict, or None.
        """
        playwright = await async_playwright().start()

        try:
            browser = await playwright.chromium.launch(
                headless=False,  # NOTE(review): launches a visible window; likely a leftover debug setting
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--no-sandbox'
                ]
            )

            context = await browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent=self.ua.random,
                locale='zh-CN'
            )

            page = await context.new_page()

            # Collect payloads from network responses matching the feed URL.
            results = []

            def handle_response(response):
                if "mbd.baidu.com/webpage" in response.url and "format=jsonp" in response.url:
                    try:
                        # NOTE(review): in Playwright's async API response.text()
                        # returns a coroutine; calling it from this sync handler
                        # yields an un-awaited coroutine, the startswith() below
                        # raises, and the bare except swallows it — so results
                        # likely stays empty. Verify and await the body properly.
                        text = response.text()
                        if "callback=" in response.url:
                            # Recover the callback name from the request URL.
                            import re
                            match = re.search(r'callback=([^&]+)', response.url)
                            if match:
                                callback = match.group(1)
                                if text.startswith(callback + '(') and text.endswith(')'):
                                    json_str = text[len(callback) + 1:-1]
                                    data = json.loads(json_str)
                                    results.append(data)
                    except:
                        pass

            page.on("response", handle_response)

            # Load the author's Baijiahao page.
            await page.goto(f"https://baijiahao.baidu.com/u?app_id={uk}", wait_until='networkidle')

            # Scroll like a user to trigger lazy feed loading.
            for _ in range(3):
                await page.evaluate("window.scrollBy(0, window.innerHeight)")
                await asyncio.sleep(random.uniform(1, 2))

            # Allow in-flight responses to land.
            await asyncio.sleep(5)

            await browser.close()

            if results:
                return results[0]

        except Exception as e:
            logger.error(f"浏览器方式获取失败: {e}")
        finally:
            await playwright.stop()

        return None

    async def fetch_with_signature(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
        """Experimental: probe the endpoint with plain-JSON parameters.

        The real endpoint may require a signed parameter produced by site
        JavaScript; this method first fetches a config URL for context, then
        tries ``format=json`` (no JSONP callback).  Returns parsed JSON or
        None.
        """
        async with aiohttp.ClientSession() as session:
            # Fetch the dynamic-config endpoint first (token/context probe).
            token_url = "https://mbd.baidu.com/staticx/search/dynamic/config"

            headers = {
                'User-Agent': self.ua.random,
                'Referer': 'https://baijiahao.baidu.com/',
            }

            try:
                # Pull the config payload (logged for inspection only).
                async with session.get(token_url, headers=headers) as resp:
                    config_text = await resp.text()
                    logger.info(f"配置响应: {config_text[:200]}")

                # Build the full request.
                timestamp = int(time.time() * 1000)
                params = {
                    'tab': 'main',
                    'num': '10',
                    'uk': uk,
                    'source': 'pc',
                    'type': 'newhome',
                    'action': 'dynamic',
                    'format': 'json',
                    't': str(timestamp),
                    'callback': f'__jsonp{timestamp}',
                }

                # Force plain-JSON mode: drop the JSONP callback entirely.
                params['format'] = 'json'
                del params['callback']

                url = "https://mbd.baidu.com/webpage"

                async with session.get(url, params=params, headers=headers) as response:
                    text = await response.text()
                    logger.info(f"JSON响应: {text[:500]}")

                    try:
                        return json.loads(text)
                    except:
                        return None

            except Exception as e:
                logger.error(f"签名方式失败: {e}")
                return None
|
|||
|
|
async def fetch_baidu_data(uk: str = "ntHidnLhrlfclJar2z8wBg", months: int = 6, use_proxy: bool = False, proxy_api_url: Optional[str] = None,
                           on_page_fetched=None, start_page: int = 1, start_ctime: Optional[str] = None) -> Optional[Dict]:
    """Main driver: page through an author's feed until a time cutoff.

    Args:
        uk: author UK identifier.
        months: how many months back to fetch; fractions allowed
            (e.g. 0.33 ≈ 10 days; converted as months * 30 days).
        use_proxy: enable the rotating proxy pool.
        proxy_api_url: proxy pool endpoint; None uses the built-in default.
        on_page_fetched: callback invoked per page as
            ``(page, items, ctime)`` — pages are streamed out, not
            accumulated in memory.
        start_page: starting page number, for resuming an interrupted run.
        start_ctime: starting pagination cursor, for resuming.

    Returns:
        dict with 'last_page', 'last_ctime' and 'completed' (checkpoint
        info for resuming), or None when even the browser-assisted first
        request fails.
    """
    from datetime import datetime, timedelta
    import re

    spider = BaiduBJHSpider(use_proxy=use_proxy, proxy_api_url=proxy_api_url)

    # Cutoff date: months are approximated as 30-day blocks so fractional
    # months work.
    days = int(months * 30)
    target_date = datetime.now() - timedelta(days=days)

    # Log the window in days for sub-month requests, in months otherwise.
    if months < 1:
        logger.info(f"开始获取百家号数据(近{days}天, 目标日期: {target_date.strftime('%Y-%m-%d')})")
    else:
        logger.info(f"开始获取百家号数据(近{int(months)}个月, 目标日期: {target_date.strftime('%Y-%m-%d')})")

    # Pages are delivered through on_page_fetched; nothing is accumulated here.
    page = start_page  # may start past 1 when resuming
    current_ctime = start_ctime  # pagination cursor carried between requests

    # Resuming: skip the first-page fetch and reuse the saved cursor.
    if start_page > 1 and start_ctime:
        logger.info(f"断点续传:从第{start_page}页开始,ctime={start_ctime}")
        data = None  # first-page payload not needed when resuming
    else:
        # Fast path: hit the API directly; only fall back to a browser on failure.
        logger.info("尝试直接请求API(不启动浏览器)...")
        data = await spider.fetch_data_directly(uk, use_browser=False, ctime=current_ctime)

        # Direct request failed — retry once with browser-harvested cookies.
        if not data or not data.get('data', {}).get('list'):
            if start_page == 1:  # browser retry only makes sense for page 1
                logger.warning("直接请求失败,启动浏览器获取Cookie...")
                # Dump what the failed request returned, for diagnosis.
                if data:
                    logger.warning(f"第一次请求返回数据: {json.dumps(data, ensure_ascii=False, indent=2)}")
                else:
                    logger.warning("第一次请求返回数据: None")

                data = await spider.fetch_data_directly(uk, use_browser=True)

                if not data or not data.get('data', {}).get('list'):
                    logger.error("启动浏览器后仍然失败,放弃")
                    # Dump the final payload before giving up.
                    if data:
                        logger.error(f"最终返回数据: {json.dumps(data, ensure_ascii=False, indent=2)}")
                    else:
                        logger.error("最终返回数据: None")
                    return None

    # First page succeeded (non-resume path only — data is None when resuming).
    if data and data.get('data', {}).get('list'):
        items = data.get('data', {}).get('list', [])
        logger.info(f"第{page}页获取成功,数据条数: {len(items)}")

        # Stream the first page out through the callback.
        if on_page_fetched:
            on_page_fetched(page, items, current_ctime)

        # Pagination cursor lives at data.data.query.ctime in the payload.
        current_ctime = data.get('data', {}).get('query', {}).get('ctime', current_ctime)
        if current_ctime:
            logger.info(f"获取到分页参数 ctime={current_ctime}")
        else:
            logger.warning("未获取到ctime分页参数")

    def get_article_datetime(item_data: dict) -> datetime:
        """Extract an article's datetime from its itemData dict.

        Prefers the 'ctime' Unix timestamp; falls back to parsing the
        human-readable 'time' field (relative phrases or absolute dates).
        NOTE(review): all datetimes here are naive local times — fine as
        long as ctime and the local clock share a timezone; confirm.
        """
        # Preferred: 'ctime' as a seconds-resolution Unix timestamp.
        if 'ctime' in item_data and item_data['ctime']:
            try:
                timestamp = int(item_data['ctime'])
                return datetime.fromtimestamp(timestamp)
            except:
                pass

        # Fallback: the display 'time' string (relative or absolute).
        time_str = item_data.get('time', '')
        if not time_str:
            return datetime.now()

        now = datetime.now()
        if '分钟前' in time_str:  # "N minutes ago"
            minutes = int(re.search(r'(\d+)', time_str).group(1))
            return now - timedelta(minutes=minutes)
        elif '小时前' in time_str:  # "N hours ago"
            hours = int(re.search(r'(\d+)', time_str).group(1))
            return now - timedelta(hours=hours)
        elif '天前' in time_str or '昨天' in time_str:  # "N days ago" / "yesterday"
            if '昨天' in time_str:
                days = 1
            else:
                days = int(re.search(r'(\d+)', time_str).group(1))
            return now - timedelta(days=days)
        elif '-' in time_str:  # absolute date formats
            try:
                return datetime.strptime(time_str, '%Y-%m-%d %H:%M')
            except:
                try:
                    return datetime.strptime(time_str, '%Y-%m-%d')
                except:
                    return now
        return now

    # Decide whether to keep paging from the last article's timestamp.
    need_more = True
    if data and data.get('data', {}).get('list'):
        items = data.get('data', {}).get('list', [])
        if items:
            last_item = items[-1]
            item_data = last_item.get('itemData', {})
            article_date = get_article_datetime(item_data)
            logger.info(f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}")
            # Oldest item already past the cutoff — no further pages needed.
            if article_date < target_date:
                need_more = False
                if months < 1:
                    logger.info(
                        f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}, 已超出{days}天范围,停止请求")
                else:
                    logger.info(
                        f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}, 已超出{int(months)}个月范围,停止请求")
        else:
            need_more = False
    elif start_page > 1:
        # Resume mode: no first-page data, assume more pages exist.
        need_more = True
    else:
        need_more = False

    # Keep requesting pages until the cutoff date or the feed runs dry
    # (no hard page limit).
    while need_more:
        page += 1
        logger.info(f"需要更多数据,请求第{page}页...")

        # Randomized 8-12 s delay between pages to look less bot-like.
        delay = random.uniform(8, 12)
        logger.info(f"等待 {delay:.1f} 秒后请求...")
        await asyncio.sleep(delay)

        # Request the next page using the cursor from the previous response.
        next_data = await spider.fetch_data_directly(uk, use_browser=False, ctime=current_ctime)

        # fetch_data_directly already handled retries/anti-bot; only check outcome.
        if not next_data or not next_data.get('data', {}).get('list'):
            # Distinguish anti-bot block from a genuinely empty feed.
            if next_data and next_data.get('data', {}).get('foe', {}).get('is_need_foe') == True:
                logger.error(f"第{page}页多次重试后仍然触发反爬,停止请求")
                logger.error(f"返回数据: {json.dumps(next_data, ensure_ascii=False, indent=2)}")
            else:
                logger.warning(f"第{page}页无数据,停止请求")
                # Dump the full payload for debugging.
                if next_data:
                    logger.warning(f"返回数据: {json.dumps(next_data, ensure_ascii=False, indent=2)}")
                else:
                    logger.warning("返回数据: None")
            break

        next_items = next_data.get('data', {}).get('list', [])
        logger.info(f"第{page}页获取成功,数据条数: {len(next_items)}")

        # Stream this page out through the callback.
        if on_page_fetched:
            on_page_fetched(page, next_items, current_ctime)

        # Advance the cursor (path: data.data.query.ctime).
        current_ctime = next_data.get('data', {}).get('query', {}).get('ctime', current_ctime)
        if current_ctime:
            logger.info(f"更新分页参数 ctime={current_ctime}")

        # Re-check the cutoff against this page's oldest article.
        if next_items:
            last_item = next_items[-1]
            item_data = last_item.get('itemData', {})
            article_date = get_article_datetime(item_data)
            logger.info(f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}")

            if article_date < target_date:
                need_more = False
                if months < 1:
                    logger.info(
                        f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}, 已超出{days}天范围,停止请求")
                else:
                    logger.info(
                        f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}, 已超出{int(months)}个月范围,停止请求")
        else:
            need_more = False

    # Checkpoint info for resuming an interrupted crawl.
    result = {
        'last_page': page,
        'last_ctime': current_ctime,
        'completed': not need_more  # True when the cutoff was reached
    }

    logger.info(f"抓取完成,最后页码: {page}, ctime: {current_ctime}")
    return result
|||
|
|
|
|||
|
|
# Synchronous wrapper so the async pipeline can be used from blocking code.
def get_baidu_data_sync(uk: str = "ntHidnLhrlfclJar2z8wBg", months: int = 6, use_proxy: bool = False,
                        proxy_api_url: str = None, on_page_fetched=None,
                        start_page: int = 1, start_ctime: str = None) -> Optional[Dict]:
    """Blocking facade over :func:`fetch_baidu_data`.

    Args:
        uk: author UK identifier.
        months: how many months back to fetch (default 6).
        use_proxy: enable the rotating proxy pool.
        proxy_api_url: proxy pool endpoint; None uses the built-in default.
        on_page_fetched: callback invoked once per fetched page.
        start_page: starting page number (resume support).
        start_ctime: starting pagination cursor (resume support).

    Returns:
        The checkpoint dict produced by fetch_baidu_data, or None.
    """
    crawl = fetch_baidu_data(uk, months, use_proxy, proxy_api_url,
                             on_page_fetched, start_page, start_ctime)
    return asyncio.run(crawl)
|
|
|||
|
|
|
|||
|
|
# Retained as a manual test entry point.
async def main():
    """Fetch the default author's feed, print it, then render a summary."""
    data = await fetch_baidu_data()
    if not data:
        return
    print(json.dumps(data, ensure_ascii=False, indent=2))
    from test2 import display_simple_data
    display_simple_data(data)
|
|
|
|||
|
|
|
|||
|
|
# Script entry point: run the manual test coroutine.
if __name__ == "__main__":
    asyncio.run(main())
|