# Baidu Baijiahao (百家号) author-feed spider.
# Fetches article lists from the mbd.baidu.com JSONP endpoint, optionally
# through a rotating proxy pool, with a Playwright browser fallback for
# Cookie acquisition.
import asyncio
import json
import random
import time
from typing import Dict, Any, Optional
from urllib.parse import quote

import aiohttp
from playwright.async_api import async_playwright
from fake_useragent import UserAgent
import logging

# Project-local helper used by main() to pretty-print fetched data.
from test2 import display_simple_data

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
||
class BaiduBJHSpider:
    """Scraper for Baidu Baijiahao author feeds (JSONP endpoint + proxy pool)."""

    # Default extraction endpoint of the tianqiip proxy pool.
    _DEFAULT_PROXY_API = 'http://api.tianqiip.com/getip?secret=lu29e593&num=1&type=txt&port=1&mr=1&sign=4b81a62eaed89ba802a8f34053e2c964'

    def __init__(self, use_proxy: bool = False, proxy_api_url: str = None, proxy_username: str = None, proxy_password: str = None):
        """Create a spider.

        Args:
            use_proxy: route requests through a proxy from the pool.
            proxy_api_url: proxy-pool extraction API; None uses the default.
            proxy_username: optional proxy auth user.
            proxy_password: optional proxy auth password.
        """
        self.ua = UserAgent()  # random User-Agent generator
        self.use_proxy = use_proxy
        self.proxy_api_url = proxy_api_url if proxy_api_url else self._DEFAULT_PROXY_API
        self.proxy_username = proxy_username
        self.proxy_password = proxy_password
        self.current_proxy = None      # "http://[user:pass@]ip:port" once fetched
        self.session_cookie = None     # Cookie header string captured by init_browser()
||
|
||
def get_proxy(self):
|
||
"""从代理池获取一个代理IP"""
|
||
if not self.use_proxy:
|
||
return None
|
||
|
||
try:
|
||
import requests
|
||
logger.info(f"从代理池获取IP: {self.proxy_api_url}")
|
||
response = requests.get(self.proxy_api_url, timeout=5) # 优化超时为5秒
|
||
content = response.content.decode("utf-8").strip()
|
||
logger.info(f"提取代理IP: {content}")
|
||
|
||
if ':' in content:
|
||
ip, port = content.strip().split(":", 1)
|
||
|
||
# 如果有认证信息,添加到代理URL中
|
||
if self.proxy_username and self.proxy_password:
|
||
proxy_url = f"http://{self.proxy_username}:{self.proxy_password}@{ip}:{port}"
|
||
logger.info(f"代理配置成功(带认证): http://{self.proxy_username}:****@{ip}:{port}")
|
||
else:
|
||
proxy_url = f"http://{ip}:{port}"
|
||
logger.info(f"代理配置成功: {proxy_url}")
|
||
|
||
self.current_proxy = proxy_url
|
||
return proxy_url
|
||
else:
|
||
logger.error("代理IP格式错误")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"获取代理IP失败: {e}")
|
||
return None
|
||
|
||
async def init_browser(self):
    """Launch headless Chromium, visit Baidu pages, and cache the cookies.

    Warms up on baidu.com, then visits baijiahao.baidu.com, and stores the
    collected cookies on the instance as a "name=value; ..." Cookie string.

    Returns:
        The raw cookie list from the browser context.
    """
    pw = await async_playwright().start()

    # Flags that soften common automation fingerprints.
    launch_flags = [
        '--disable-blink-features=AutomationControlled',
        '--disable-web-security',
        '--disable-features=IsolateOrigins,site-per-process',
        '--no-sandbox',
        '--disable-setuid-sandbox',
    ]

    chromium = await pw.chromium.launch(
        headless=True,  # run without a visible window
        args=launch_flags,
    )

    # Desktop-looking context pinned to a Chinese locale/timezone.
    ctx = await chromium.new_context(
        viewport={'width': 1920, 'height': 1080},
        user_agent=self.ua.random,
        locale='zh-CN',
        timezone_id='Asia/Shanghai',
    )

    # Extra headers a real browser would send.
    await ctx.set_extra_http_headers({
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    })

    tab = await ctx.new_page()

    # Visit the Baidu homepage first to pick up baseline cookies...
    await tab.goto('https://www.baidu.com', wait_until='networkidle')
    await asyncio.sleep(random.uniform(2, 4))

    # ...then the Baijiahao site itself.
    await tab.goto('https://baijiahao.baidu.com/', wait_until='networkidle')
    await asyncio.sleep(random.uniform(3, 5))

    # Serialize cookies into a single Cookie header value.
    cookie_list = await ctx.cookies()
    self.session_cookie = '; '.join(f"{c['name']}={c['value']}" for c in cookie_list)

    logger.info(f"获取到Cookie: {self.session_cookie[:50]}...")

    await chromium.close()
    await pw.stop()

    return cookie_list
|
||
|
||
def build_headers(self, referer: str = "https://baijiahao.baidu.com/") -> Dict:
|
||
"""构建请求头"""
|
||
timestamp = int(time.time() * 1000)
|
||
|
||
headers = {
|
||
'User-Agent': self.ua.random,
|
||
'Accept': '*/*',
|
||
'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
|
||
'Accept-Encoding': 'gzip, deflate, br',
|
||
'Referer': referer,
|
||
'Connection': 'keep-alive',
|
||
'Sec-Fetch-Dest': 'script',
|
||
'Sec-Fetch-Mode': 'no-cors',
|
||
'Sec-Fetch-Site': 'same-site',
|
||
'Pragma': 'no-cache',
|
||
'Cache-Control': 'no-cache',
|
||
}
|
||
|
||
if self.session_cookie:
|
||
headers['Cookie'] = self.session_cookie
|
||
|
||
return headers
|
||
|
||
def generate_callback_name(self) -> str:
|
||
"""生成随机的callback函数名"""
|
||
timestamp = int(time.time() * 1000)
|
||
return f"__jsonp{timestamp}"
|
||
|
||
async def fetch_data_directly(self, uk: str = "ntHidnLhrlfclJar2z8wBg", use_browser: bool = False, num: int = 10,
                              ctime: str = None) -> Optional[Dict]:
    """Hit the mbd.baidu.com JSONP endpoint directly, retrying up to 10 times.

    Args:
        uk: author UK identifier.
        use_browser: when True, launch a browser first to harvest Cookies.
        num: requested item count; the API ignores it and always serves 10.
        ctime: pagination cursor — the query.ctime value from the previous page.

    Returns:
        Parsed JSONP payload dict, or None after all retries fail.

    Raises:
        Exception: when use_proxy is set but no proxy IP can be obtained
            (the spider refuses to fall back to the local IP).
    """
    # Only spin up the (slow) browser when explicitly requested.
    if use_browser:
        await self.init_browser()

    # With proxying enabled, a proxy MUST be available up front — never leak
    # the local IP.
    if self.use_proxy:
        if not self.current_proxy:
            proxy = self.get_proxy()
            if not proxy:
                raise Exception("启用了代理但无法获取代理IP,拒绝使用本机IP")

    async with aiohttp.ClientSession() as session:
        for attempt in range(10):  # 10 retries to ride out pool throttling
            try:
                callback_name = self.generate_callback_name()
                timestamp = int(time.time() * 1000)

                # Query-string parameters of the JSONP endpoint.
                params = {
                    'tab': 'main',
                    'num': '10',  # the API always serves 10 per page
                    'uk': uk,
                    'source': 'pc',
                    'type': 'newhome',
                    'action': 'dynamic',
                    'format': 'jsonp',
                    'callback': callback_name,
                    'otherext': f'h5_{time.strftime("%Y%m%d%H%M%S")}',
                    'Tenger-Mhor': str(timestamp),
                    '_': str(timestamp)  # cache-busting timestamp
                }

                # Pagination cursor from the previous page, if any.
                if ctime:
                    params['ctime'] = ctime

                url = "https://mbd.baidu.com/webpage"
                headers = self.build_headers()

                logger.info(f"尝试第{attempt + 1}次请求,URL: {url}")

                request_kwargs = {
                    'params': params,
                    'headers': headers,
                    'timeout': aiohttp.ClientTimeout(total=30)
                }

                # Never fire a request without a proxy when proxying is on.
                if self.use_proxy:
                    if not self.current_proxy:
                        raise Exception("启用了代理但当前无代理IP,拒绝使用本机IP")
                    logger.info(f"使用代理: {self.current_proxy}")
                    request_kwargs['proxy'] = self.current_proxy

                async with session.get(url, **request_kwargs) as response:
                    text = await response.text()
                    # Strip the JSONP wrapper: "callback(...)" -> "...".
                    if text.startswith(callback_name + '(') and text.endswith(')'):
                        json_str = text[len(callback_name) + 1:-1]
                        data = json.loads(json_str)

                        # Server-side anti-crawl flag?
                        if data.get('data', {}).get('foe', {}).get('is_need_foe') == True:
                            logger.warning(f"检测到反爬标识(is_need_foe=True),尝试第{attempt + 1}次")
                            # Rotate the proxy IP immediately and retry.
                            if self.use_proxy:
                                logger.info("检测到反爬,立即切换代理IP(无需等待)")
                                self.get_proxy()
                            if attempt < 9:  # retries remain (10 total)
                                continue

                        return data

            # BUGFIX: ClientProxyConnectionError is a subclass of
            # ClientConnectorError, so it must be caught FIRST — previously
            # this handler was unreachable.
            except aiohttp.ClientProxyConnectionError as e:
                logger.error(f"❌ 代理连接失败 (尝试{attempt + 1}/10): {e}")
                # Bad proxy: fetch a fresh one and retry without backoff.
                if self.use_proxy:
                    logger.info("🔄 代理失败,立即切换代理IP(无需等待)")
                    self.get_proxy()
            except aiohttp.ClientConnectorError as e:
                logger.error(f"❌ 网络连接失败 (尝试{attempt + 1}/10): {type(e).__name__} - {str(e)[:100]}")
                if self.use_proxy:
                    logger.info("🔄 网络错误,立即切换代理IP")
                    self.get_proxy()
            except asyncio.TimeoutError as e:
                logger.error(f"❌ 请求超时 (尝试{attempt + 1}/10): 代理响应超过30秒")
                if self.use_proxy:
                    logger.info("🔄 超时,立即切换代理IP")
                    self.get_proxy()
            except aiohttp.ClientResponseError as e:
                # 407: proxy auth failure / pool throttling.
                if e.status == 407:
                    logger.warning(f"检测到407错误(代理IP池限流),等待10秒后重新获取IP...")
                    await asyncio.sleep(10)  # give the pool time to recover
                    if self.use_proxy:
                        logger.info("重新获取代理IP...")
                        self.get_proxy()
                    # fall through to the next attempt
                else:
                    logger.error(f"❌ HTTP错误 (尝试{attempt + 1}/10): {e.status}, {e.message}")
                    await asyncio.sleep(random.uniform(1, 2))
            except Exception as e:
                logger.error(f"❌ 未知错误 (尝试{attempt + 1}/10): {type(e).__name__} - {str(e)[:100]}")
                await asyncio.sleep(random.uniform(1, 2))  # short backoff

        # All 10 attempts failed.
        logger.error("请求失败:已经重试10次仍然失败,可能是IP池限流或网络问题")
        return None
|
||
|
||
async def fetch_via_browser(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
    """Drive a real browser and capture the JSONP response off the wire.

    Most reliable (and slowest) path: loads the author page, scrolls like a
    user, and sniffs any mbd.baidu.com/webpage JSONP responses.

    Args:
        uk: author UK identifier.

    Returns:
        The first captured payload dict, or None.
    """
    import re  # hoisted out of the per-response handler

    playwright = await async_playwright().start()

    try:
        browser = await playwright.chromium.launch(
            headless=True,  # set to False when debugging locally
            args=[
                '--disable-blink-features=AutomationControlled',
                '--no-sandbox'
            ]
        )

        context = await browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            user_agent=self.ua.random,
            locale='zh-CN'
        )

        page = await context.new_page()

        # Captured JSONP payloads, appended by the response listener.
        results = []

        # BUGFIX: response.text() is a coroutine in Playwright's async API;
        # the old sync handler never awaited it, so the startswith() check
        # raised (swallowed by a bare except) and no payload was ever kept.
        async def handle_response(response):
            if "mbd.baidu.com/webpage" in response.url and "format=jsonp" in response.url:
                try:
                    text = await response.text()
                    # Pull the callback name from the request URL so the
                    # JSONP wrapper can be stripped.
                    match = re.search(r'callback=([^&]+)', response.url)
                    if match:
                        callback = match.group(1)
                        if text.startswith(callback + '(') and text.endswith(')'):
                            json_str = text[len(callback) + 1:-1]
                            results.append(json.loads(json_str))
                except Exception:
                    # Best-effort sniffing: ignore undecodable responses.
                    pass

        page.on("response", handle_response)

        # Open the author page.
        await page.goto(f"https://baijiahao.baidu.com/u?app_id={uk}", wait_until='networkidle')

        # Scroll a few screens like a human would.
        for _ in range(3):
            await page.evaluate("window.scrollBy(0, window.innerHeight)")
            await asyncio.sleep(random.uniform(1, 2))

        # Give late XHRs time to land.
        await asyncio.sleep(5)

        await browser.close()

        if results:
            return results[0]

    except Exception as e:
        logger.error(f"浏览器方式获取失败: {e}")
    finally:
        await playwright.stop()

    return None
|
||
|
||
async def fetch_with_signature(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
    """Experimental: fetch the feed as plain JSON (no JSONP wrapper).

    Baidu's endpoint may require signed parameters; the signing algorithm
    lives in their JavaScript and is not reproduced here — this method only
    probes the config endpoint and then tries format=json.

    Args:
        uk: author UK identifier.

    Returns:
        The parsed JSON dict, or None on any failure.
    """
    async with aiohttp.ClientSession() as session:
        # Config endpoint that may hand out tokens (currently only logged).
        token_url = "https://mbd.baidu.com/staticx/search/dynamic/config"

        headers = {
            'User-Agent': self.ua.random,
            'Referer': 'https://baijiahao.baidu.com/',
        }

        try:
            async with session.get(token_url, headers=headers) as resp:
                config_text = await resp.text()
                logger.info(f"配置响应: {config_text[:200]}")

            # Build the JSON request directly. (The previous version added a
            # JSONP callback and format=jsonp, then overwrote/deleted them —
            # dead work, removed.)
            timestamp = int(time.time() * 1000)
            params = {
                'tab': 'main',
                'num': '10',
                'uk': uk,
                'source': 'pc',
                'type': 'newhome',
                'action': 'dynamic',
                'format': 'json',
                't': str(timestamp),
            }

            url = "https://mbd.baidu.com/webpage"

            async with session.get(url, params=params, headers=headers) as response:
                text = await response.text()
                logger.info(f"JSON响应: {text[:500]}")

                try:
                    return json.loads(text)
                except ValueError:
                    # Not JSON (e.g. an HTML block page).
                    return None

        except Exception as e:
            logger.error(f"签名方式失败: {e}")
            return None
|
||
|
||
|
||
async def fetch_baidu_data(uk: str = "ntHidnLhrlfclJar2z8wBg", months: int = 6, use_proxy: bool = False, proxy_api_url: str = None,
                           on_page_fetched=None, start_page: int = 1, start_ctime: str = None) -> Optional[Dict]:
    """Main entry point: page through an author's Baijiahao feed.

    Args:
        uk: author UK identifier.
        months: look-back window in months (fractions allowed, e.g. 0.33 ≈ 10 days).
        use_proxy: enable the rotating proxy pool.
        proxy_api_url: proxy extraction API; None uses the built-in default.
        on_page_fetched: per-page callback, signature (page, items, ctime) -> None.
        start_page: first page number (checkpoint resume).
        start_ctime: pagination cursor to resume from (checkpoint resume).

    Returns:
        {'last_page': int, 'last_ctime': str, 'completed': bool} for resuming
        later, or None when even the browser-assisted first request fails.
    """
    from datetime import datetime, timedelta
    import re

    spider = BaiduBJHSpider(use_proxy=use_proxy, proxy_api_url=proxy_api_url)

    # Cut-off date; months may be fractional (1 month ~ 30 days).
    days = int(months * 30)
    target_date = datetime.now() - timedelta(days=days)

    # Log the window in days or whole months, whichever reads better.
    if months < 1:
        logger.info(f"开始获取百家号数据(近{days}天, 目标日期: {target_date.strftime('%Y-%m-%d')})")
    else:
        logger.info(f"开始获取百家号数据(近{int(months)}个月, 目标日期: {target_date.strftime('%Y-%m-%d')})")

    # Pages are fixed at 10 items each; pages are handed to the callback as
    # they arrive rather than accumulated in memory.
    page = start_page            # may start past 1 when resuming
    current_ctime = start_ctime  # pagination cursor carried between requests

    # When resuming past page 1, skip the first-page fetch and reuse the
    # saved cursor.
    if start_page > 1 and start_ctime:
        logger.info(f"断点续传:从第{start_page}页开始,ctime={start_ctime}")
        data = None  # the first-page payload is not needed
    else:
        # Cheap path first: direct API call, no browser.
        logger.info("尝试直接请求API(不启动浏览器)...")
        data = await spider.fetch_data_directly(uk, use_browser=False, ctime=current_ctime)

        # On failure, retry once with a browser session to obtain Cookies.
        if not data or not data.get('data', {}).get('list'):
            if start_page == 1:  # the browser retry only applies to page 1
                logger.warning("直接请求失败,启动浏览器获取Cookie...")
                # Dump what the first attempt returned, for diagnosis.
                if data:
                    logger.warning(f"第一次请求返回数据: {json.dumps(data, ensure_ascii=False, indent=2)}")
                else:
                    logger.warning("第一次请求返回数据: None")

                data = await spider.fetch_data_directly(uk, use_browser=True)

                if not data or not data.get('data', {}).get('list'):
                    logger.error("启动浏览器后仍然失败,放弃")
                    # Dump the final payload before giving up.
                    if data:
                        logger.error(f"最终返回数据: {json.dumps(data, ensure_ascii=False, indent=2)}")
                    else:
                        logger.error("最终返回数据: None")
                    return None

    # First page succeeded (non-resume path): hand it to the callback.
    if data and data.get('data', {}).get('list'):
        items = data.get('data', {}).get('list', [])
        logger.info(f"第{page}页获取成功,数据条数: {len(items)}")

        # Persist the first page via the callback.
        if on_page_fetched:
            on_page_fetched(page, items, current_ctime)

        # The pagination cursor lives at data.data.query.ctime.
        current_ctime = data.get('data', {}).get('query', {}).get('ctime', current_ctime)
        if current_ctime:
            logger.info(f"获取到分页参数 ctime={current_ctime}")
        else:
            logger.warning("未获取到ctime分页参数")

    # Article-time extraction prefers the ctime Unix timestamp — more
    # accurate than the human-readable field.
    def get_article_datetime(item_data: dict) -> datetime:
        """Extract the publish time from an itemData dict.

        Prefers the 'ctime' Unix timestamp (seconds); falls back to parsing
        the 'time' field, which may be relative ("N分钟前", "昨天") or an
        absolute "Y-m-d [H:M]" string.
        """
        # Preferred: second-resolution Unix timestamp.
        if 'ctime' in item_data and item_data['ctime']:
            try:
                timestamp = int(item_data['ctime'])
                return datetime.fromtimestamp(timestamp)
            except:
                pass

        # Fallback: the display-time string (relative or absolute).
        time_str = item_data.get('time', '')
        if not time_str:
            return datetime.now()

        now = datetime.now()
        if '分钟前' in time_str:  # "N minutes ago"
            minutes = int(re.search(r'(\d+)', time_str).group(1))
            return now - timedelta(minutes=minutes)
        elif '小时前' in time_str:  # "N hours ago"
            hours = int(re.search(r'(\d+)', time_str).group(1))
            return now - timedelta(hours=hours)
        elif '天前' in time_str or '昨天' in time_str:  # "N days ago" / "yesterday"
            if '昨天' in time_str:
                days = 1
            else:
                days = int(re.search(r'(\d+)', time_str).group(1))
            return now - timedelta(days=days)
        elif '-' in time_str:  # absolute date formats
            try:
                return datetime.strptime(time_str, '%Y-%m-%d %H:%M')
            except:
                try:
                    return datetime.strptime(time_str, '%Y-%m-%d')
                except:
                    return now
        return now

    # Decide from the last article's date whether more pages are needed.
    need_more = True
    if data and data.get('data', {}).get('list'):
        items = data.get('data', {}).get('list', [])
        if items:
            last_item = items[-1]
            item_data = last_item.get('itemData', {})
            article_date = get_article_datetime(item_data)
            logger.info(f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}")
            if article_date < target_date:
                need_more = False
                if months < 1:
                    logger.info(
                        f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}, 已超出{days}天范围,停止请求")
                else:
                    logger.info(
                        f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}, 已超出{int(months)}个月范围,停止请求")
        else:
            need_more = False
    elif start_page > 1:
        # Resuming: assume there is more to fetch.
        need_more = True
    else:
        need_more = False

    # Keep paging until the cut-off date or the feed runs dry (no page cap).
    while need_more:
        page += 1
        logger.info(f"需要更多数据,请求第{page}页...")

        # Randomized 8-12s delay to look less like a bot.
        delay = random.uniform(8, 12)
        logger.info(f"等待 {delay:.1f} 秒后请求...")
        await asyncio.sleep(delay)

        # Next page, cursored by the previous response's ctime.
        next_data = await spider.fetch_data_directly(uk, use_browser=False, ctime=current_ctime)

        # fetch_data_directly already handled anti-crawl detection and
        # retries; here we only check whether it ultimately succeeded.
        if not next_data or not next_data.get('data', {}).get('list'):
            # Distinguish anti-crawl blocks from an exhausted feed.
            if next_data and next_data.get('data', {}).get('foe', {}).get('is_need_foe') == True:
                logger.error(f"第{page}页多次重试后仍然触发反爬,停止请求")
                logger.error(f"返回数据: {json.dumps(next_data, ensure_ascii=False, indent=2)}")
            else:
                logger.warning(f"第{page}页无数据,停止请求")
                # Dump the full payload for debugging.
                if next_data:
                    logger.warning(f"返回数据: {json.dumps(next_data, ensure_ascii=False, indent=2)}")
                else:
                    logger.warning("返回数据: None")
            break

        next_items = next_data.get('data', {}).get('list', [])
        logger.info(f"第{page}页获取成功,数据条数: {len(next_items)}")

        # Persist this page via the callback.
        if on_page_fetched:
            on_page_fetched(page, next_items, current_ctime)

        # Advance the cursor (data.data.query.ctime) for the next request.
        current_ctime = next_data.get('data', {}).get('query', {}).get('ctime', current_ctime)
        if current_ctime:
            logger.info(f"更新分页参数 ctime={current_ctime}")

        # Stop once the last article on this page predates the cut-off.
        if next_items:
            last_item = next_items[-1]
            item_data = last_item.get('itemData', {})
            article_date = get_article_datetime(item_data)
            logger.info(f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}")

            if article_date < target_date:
                need_more = False
                if months < 1:
                    logger.info(
                        f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}, 已超出{days}天范围,停止请求")
                else:
                    logger.info(
                        f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}, 已超出{int(months)}个月范围,停止请求")
        else:
            need_more = False

    # Checkpoint info for resuming a later run.
    result = {
        'last_page': page,
        'last_ctime': current_ctime,
        'completed': not need_more  # whether the whole window was covered
    }

    logger.info(f"抓取完成,最后页码: {page}, ctime: {current_ctime}")
    return result
|
||
|
||
|
||
# Synchronous wrapper so the spider can be used from non-async code.
def get_baidu_data_sync(uk: str = "ntHidnLhrlfclJar2z8wBg", months: int = 6, use_proxy: bool = False,
                        proxy_api_url: str = None, on_page_fetched=None,
                        start_page: int = 1, start_ctime: str = None) -> Optional[Dict]:
    """Blocking wrapper around fetch_baidu_data (runs its own event loop).

    Args:
        uk: author UK identifier.
        months: look-back window in months (default 6).
        use_proxy: enable the rotating proxy pool.
        proxy_api_url: proxy extraction API; None uses the default.
        on_page_fetched: per-page callback invoked after each page.
        start_page: first page number (checkpoint resume).
        start_ctime: pagination cursor to resume from (checkpoint resume).
    """
    crawl = fetch_baidu_data(uk, months, use_proxy, proxy_api_url,
                             on_page_fetched, start_page, start_ctime)
    return asyncio.run(crawl)
|
||
|
||
|
||
# Manual test driver, kept for ad-hoc runs.
async def main():
    """Fetch once with default settings and pretty-print the result."""
    payload = await fetch_baidu_data()
    if payload:
        print(json.dumps(payload, ensure_ascii=False, indent=2))
        from test2 import display_simple_data
        display_simple_data(payload)


if __name__ == "__main__":
    asyncio.run(main())
|