import asyncio
import json
import logging
import random
import re
import time
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
from urllib.parse import quote

import aiohttp
from playwright.async_api import async_playwright
from fake_useragent import UserAgent

from test2 import display_simple_data

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class BaiduBJHSpider:
    """Scraper for Baidu Baijiahao author feeds via the mbd.baidu.com JSONP endpoint.

    Can optionally route every request through a rotating proxy pool obtained
    from a vendor API; when proxies are enabled it refuses to fall back to the
    local IP.
    """

    def __init__(self, use_proxy: bool = False, proxy_api_url: str = None,
                 proxy_username: str = None, proxy_password: str = None):
        """Create a spider.

        Args:
            use_proxy: enable the rotating proxy pool.
            proxy_api_url: proxy vendor endpoint; falls back to a built-in URL.
            proxy_username / proxy_password: optional proxy authentication.
        """
        self.ua = UserAgent()
        self.use_proxy = use_proxy
        # NOTE(review): the default URL embeds a vendor secret; consider
        # moving it to configuration instead of source code.
        self.proxy_api_url = proxy_api_url or 'http://api.tianqiip.com/getip?secret=lu29e593&num=1&type=txt&port=1&mr=1&sign=4b81a62eaed89ba802a8f34053e2c964'
        self.proxy_username = proxy_username
        self.proxy_password = proxy_password
        self.current_proxy = None      # current proxy URL, e.g. "http://ip:port"
        self.session_cookie = None     # "name=value; ..." header set by init_browser()

    def get_proxy(self):
        """Fetch one proxy IP from the pool.

        Returns the proxy URL (and stores it on ``self.current_proxy``) or
        ``None`` on any failure. Synchronous on purpose: it is called from
        except-handlers inside the async retry loop and the vendor call is
        short (5 s timeout).
        """
        if not self.use_proxy:
            return None
        try:
            # Lazy import keeps `requests` optional when proxies are disabled.
            import requests
            logger.info(f"从代理池获取IP: {self.proxy_api_url}")
            response = requests.get(self.proxy_api_url, timeout=5)
            content = response.content.decode("utf-8").strip()
            logger.info(f"提取代理IP: {content}")
            if ':' in content:
                ip, port = content.strip().split(":", 1)
                # Embed credentials in the URL when authentication is configured.
                if self.proxy_username and self.proxy_password:
                    proxy_url = f"http://{self.proxy_username}:{self.proxy_password}@{ip}:{port}"
                    logger.info(f"代理配置成功(带认证): http://{self.proxy_username}:****@{ip}:{port}")
                else:
                    proxy_url = f"http://{ip}:{port}"
                    logger.info(f"代理配置成功: {proxy_url}")
                self.current_proxy = proxy_url
                return proxy_url
            logger.error("代理IP格式错误")
            return None
        except Exception as e:
            logger.error(f"获取代理IP失败: {e}")
            return None

    async def init_browser(self):
        """Open a headless Chromium, visit Baidu pages and capture cookies.

        Side effect: stores a ``"name=value; ..."`` cookie header on
        ``self.session_cookie`` for subsequent plain-HTTP requests.

        Returns:
            The raw cookie list from the browser context.
        """
        playwright = await async_playwright().start()
        try:
            browser_args = [
                '--disable-blink-features=AutomationControlled',
                '--disable-web-security',
                '--disable-features=IsolateOrigins,site-per-process',
                '--no-sandbox',
                '--disable-setuid-sandbox',
            ]
            browser = await playwright.chromium.launch(
                headless=True,  # set False to watch the browser while debugging
                args=browser_args
            )
            try:
                context = await browser.new_context(
                    viewport={'width': 1920, 'height': 1080},
                    user_agent=self.ua.random,
                    locale='zh-CN',
                    timezone_id='Asia/Shanghai'
                )
                await context.set_extra_http_headers({
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                    'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                    'Accept-Encoding': 'gzip, deflate, br',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1',
                })
                page = await context.new_page()
                # Hit the Baidu home page first for baseline cookies, with
                # human-like pauses between navigations.
                await page.goto('https://www.baidu.com', wait_until='networkidle')
                await asyncio.sleep(random.uniform(2, 4))
                # Then the Baijiahao landing page for site-specific cookies.
                await page.goto('https://baijiahao.baidu.com/', wait_until='networkidle')
                await asyncio.sleep(random.uniform(3, 5))
                cookies = await context.cookies()
                self.session_cookie = '; '.join(f"{c['name']}={c['value']}" for c in cookies)
                logger.info(f"获取到Cookie: {self.session_cookie[:50]}...")
            finally:
                # try/finally so a failed navigation cannot leak the browser.
                await browser.close()
        finally:
            await playwright.stop()
        return cookies

    def build_headers(self, referer: str = "https://baijiahao.baidu.com/") -> Dict:
        """Build browser-like request headers, attaching the captured cookie if any."""
        headers = {
            'User-Agent': self.ua.random,
            'Accept': '*/*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
            'Accept-Encoding': 'gzip, deflate, br',
            'Referer': referer,
            'Connection': 'keep-alive',
            'Sec-Fetch-Dest': 'script',
            'Sec-Fetch-Mode': 'no-cors',
            'Sec-Fetch-Site': 'same-site',
            'Pragma': 'no-cache',
            'Cache-Control': 'no-cache',
        }
        if self.session_cookie:
            headers['Cookie'] = self.session_cookie
        return headers

    def generate_callback_name(self) -> str:
        """Generate a millisecond-timestamped JSONP callback name."""
        timestamp = int(time.time() * 1000)
        return f"__jsonp{timestamp}"

    async def fetch_data_directly(self, uk: str = "ntHidnLhrlfclJar2z8wBg",
                                  use_browser: bool = False, num: int = 10,
                                  ctime: str = None) -> Optional[Dict]:
        """Call the dynamic-feed JSONP endpoint directly, with up to 10 retries.

        Args:
            uk: author UK identifier.
            use_browser: when True, warm up cookies via a real browser first.
            num: kept for interface compatibility; the API always returns 10.
            ctime: pagination cursor (``query.ctime`` from the prior response).

        Returns:
            The parsed JSONP payload, or ``None`` after all attempts fail.

        Raises:
            Exception: when ``use_proxy`` is on but no proxy IP can be
                obtained — never silently falls back to the local IP.
        """
        if use_browser:
            await self.init_browser()

        # With proxies enabled we must have one before the first request.
        if self.use_proxy and not self.current_proxy:
            if not self.get_proxy():
                raise Exception("启用了代理但无法获取代理IP,拒绝使用本机IP")

        async with aiohttp.ClientSession() as session:
            for attempt in range(10):  # 10 tries to ride out IP-pool throttling
                try:
                    callback_name = self.generate_callback_name()
                    timestamp = int(time.time() * 1000)
                    params = {
                        'tab': 'main',
                        'num': '10',  # the API fixes the page size at 10
                        'uk': uk,
                        'source': 'pc',
                        'type': 'newhome',
                        'action': 'dynamic',
                        'format': 'jsonp',
                        'callback': callback_name,
                        'otherext': f'h5_{time.strftime("%Y%m%d%H%M%S")}',
                        'Tenger-Mhor': str(timestamp),
                        '_': str(timestamp)  # cache-busting timestamp
                    }
                    if ctime:
                        # Pagination cursor from the previous page.
                        params['ctime'] = ctime

                    url = "https://mbd.baidu.com/webpage"
                    headers = self.build_headers()
                    logger.info(f"尝试第{attempt + 1}次请求,URL: {url}")

                    request_kwargs = {
                        'params': params,
                        'headers': headers,
                        'timeout': aiohttp.ClientTimeout(total=30)
                    }
                    if self.use_proxy:
                        if not self.current_proxy:
                            raise Exception("启用了代理但当前无代理IP,拒绝使用本机IP")
                        logger.info(f"使用代理: {self.current_proxy}")
                        request_kwargs['proxy'] = self.current_proxy

                    async with session.get(url, **request_kwargs) as response:
                        text = await response.text()
                        # Unwrap JSONP: "<callback>({...})".
                        if text.startswith(callback_name + '(') and text.endswith(')'):
                            data = json.loads(text[len(callback_name) + 1:-1])
                            # Server-side anti-bot flag.
                            if data.get('data', {}).get('foe', {}).get('is_need_foe'):
                                logger.warning(f"检测到反爬标识(is_need_foe=True),尝试第{attempt + 1}次")
                                if self.use_proxy:
                                    logger.info("检测到反爬,立即切换代理IP(无需等待)")
                                    self.get_proxy()
                                if attempt < 9:  # retries remain (10 total)
                                    continue
                            return data
                        # Body was not the expected JSONP wrapper: loop retries.

                except aiohttp.ClientProxyConnectionError as e:
                    # Must come BEFORE ClientConnectorError (its base class),
                    # otherwise this branch is unreachable.
                    logger.error(f"❌ 代理连接失败 (尝试{attempt + 1}/10): {e}")
                    if self.use_proxy:
                        logger.info("🔄 代理失败,立即切换代理IP(无需等待)")
                        self.get_proxy()
                    # Proxy errors retry immediately, no back-off.
                except aiohttp.ClientConnectorError as e:
                    logger.error(f"❌ 网络连接失败 (尝试{attempt + 1}/10): {type(e).__name__} - {str(e)[:100]}")
                    if self.use_proxy:
                        logger.info("🔄 网络错误,立即切换代理IP")
                        self.get_proxy()
                except asyncio.TimeoutError:
                    logger.error(f"❌ 请求超时 (尝试{attempt + 1}/10): 代理响应超过30秒")
                    if self.use_proxy:
                        logger.info("🔄 超时,立即切换代理IP")
                        self.get_proxy()
                except aiohttp.ClientResponseError as e:
                    if e.status == 407:
                        # 407 = proxy auth / pool throttling: back off 10 s
                        # before asking the pool for a fresh IP.
                        logger.warning(f"检测到407错误(代理IP池限流),等待10秒后重新获取IP...")
                        await asyncio.sleep(10)
                        if self.use_proxy:
                            logger.info("重新获取代理IP...")
                            self.get_proxy()
                    else:
                        logger.error(f"❌ HTTP错误 (尝试{attempt + 1}/10): {e.status}, {e.message}")
                        await asyncio.sleep(random.uniform(1, 2))
                except Exception as e:
                    logger.error(f"❌ 未知错误 (尝试{attempt + 1}/10): {type(e).__name__} - {str(e)[:100]}")
                    await asyncio.sleep(random.uniform(1, 2))

        logger.error("请求失败:已经重试10次仍然失败,可能是IP池限流或网络问题")
        return None

    async def fetch_via_browser(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
        """Drive a real browser and sniff the feed's JSONP response off the wire.

        Returns the first captured payload, or ``None``.
        """
        playwright = await async_playwright().start()
        try:
            browser = await playwright.chromium.launch(
                headless=False,  # keep visible for debugging
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--no-sandbox'
                ]
            )
            context = await browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent=self.ua.random,
                locale='zh-CN'
            )
            page = await context.new_page()

            results = []

            async def handle_response(response):
                # Async handler: response.text() is a coroutine in the async
                # Playwright API. (The original sync handler never awaited it,
                # so nothing was ever captured.)
                if "mbd.baidu.com/webpage" in response.url and "format=jsonp" in response.url:
                    try:
                        text = await response.text()
                        match = re.search(r'callback=([^&]+)', response.url)
                        if match:
                            callback = match.group(1)
                            if text.startswith(callback + '(') and text.endswith(')'):
                                results.append(json.loads(text[len(callback) + 1:-1]))
                    except Exception:
                        # Best effort: a malformed body simply isn't captured.
                        pass

            page.on("response", handle_response)

            await page.goto(f"https://baijiahao.baidu.com/u?app_id={uk}",
                            wait_until='networkidle')
            # Scroll like a human to trigger lazy loading of the feed.
            for _ in range(3):
                await page.evaluate("window.scrollBy(0, window.innerHeight)")
                await asyncio.sleep(random.uniform(1, 2))
            # Give pending responses time to land.
            await asyncio.sleep(5)
            await browser.close()
            if results:
                return results[0]
        except Exception as e:
            logger.error(f"浏览器方式获取失败: {e}")
        finally:
            await playwright.stop()
        return None

    async def fetch_with_signature(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
        """Experimental: probe the config endpoint, then request plain JSON.

        The real endpoint may require a signature computed in page JavaScript;
        this path only tries ``format=json`` without a callback.
        """
        async with aiohttp.ClientSession() as session:
            token_url = "https://mbd.baidu.com/staticx/search/dynamic/config"
            headers = {
                'User-Agent': self.ua.random,
                'Referer': 'https://baijiahao.baidu.com/',
            }
            try:
                # Fetch the config blob first (may set tokens server-side).
                async with session.get(token_url, headers=headers) as resp:
                    config_text = await resp.text()
                    logger.info(f"配置响应: {config_text[:200]}")

                timestamp = int(time.time() * 1000)
                # format=json (not jsonp), so no callback parameter is sent.
                params = {
                    'tab': 'main',
                    'num': '10',
                    'uk': uk,
                    'source': 'pc',
                    'type': 'newhome',
                    'action': 'dynamic',
                    'format': 'json',
                    't': str(timestamp),
                }
                url = "https://mbd.baidu.com/webpage"
                async with session.get(url, params=params, headers=headers) as response:
                    text = await response.text()
                    logger.info(f"JSON响应: {text[:500]}")
                    try:
                        return json.loads(text)
                    except json.JSONDecodeError:
                        return None
            except Exception as e:
                logger.error(f"签名方式失败: {e}")
                return None


async def fetch_baidu_data(uk: str = "ntHidnLhrlfclJar2z8wBg", months: int = 6,
                           use_proxy: bool = False, proxy_api_url: str = None,
                           on_page_fetched=None, start_page: int = 1,
                           start_ctime: str = None) -> Optional[Dict]:
    """Main driver: page through an author's feed back to a target date.

    Pages are not accumulated; each page is handed to ``on_page_fetched`` as
    soon as it arrives, and only resume bookkeeping is returned.

    Args:
        uk: author UK identifier.
        months: how far back to fetch (fractions allowed, e.g. 0.33 ≈ 10 days).
        use_proxy: enable the rotating proxy pool.
        proxy_api_url: proxy vendor endpoint; empty uses the built-in default.
        on_page_fetched: callback ``(page, items, ctime) -> None`` per page.
        start_page: starting page number (checkpoint resume).
        start_ctime: starting pagination cursor (checkpoint resume).

    Returns:
        ``{'last_page', 'last_ctime', 'completed'}`` for checkpointing, or
        ``None`` when even the browser-assisted first request fails.
    """
    spider = BaiduBJHSpider(use_proxy=use_proxy, proxy_api_url=proxy_api_url)

    # Target cut-off date (fractional months supported via a 30-day month).
    days = int(months * 30)
    target_date = datetime.now() - timedelta(days=days)
    if months < 1:
        logger.info(f"开始获取百家号数据(近{days}天, 目标日期: {target_date.strftime('%Y-%m-%d')})")
    else:
        logger.info(f"开始获取百家号数据(近{int(months)}个月, 目标日期: {target_date.strftime('%Y-%m-%d')})")

    page = start_page          # resume support: start at the given page
    current_ctime = start_ctime  # resume support: reuse the saved cursor

    if start_page > 1 and start_ctime:
        # Resuming: skip the first page entirely, use the saved cursor.
        logger.info(f"断点续传:从第{start_page}页开始,ctime={start_ctime}")
        data = None
    else:
        # Optimization: try the bare API first; only launch a browser on failure.
        logger.info("尝试直接请求API(不启动浏览器)...")
        data = await spider.fetch_data_directly(uk, use_browser=False, ctime=current_ctime)

        if not data or not data.get('data', {}).get('list'):
            if start_page == 1:  # only the first page warrants a browser retry
                logger.warning("直接请求失败,启动浏览器获取Cookie...")
                if data:
                    logger.warning(f"第一次请求返回数据: {json.dumps(data, ensure_ascii=False, indent=2)}")
                else:
                    logger.warning("第一次请求返回数据: None")
                data = await spider.fetch_data_directly(uk, use_browser=True)
                if not data or not data.get('data', {}).get('list'):
                    logger.error("启动浏览器后仍然失败,放弃")
                    if data:
                        logger.error(f"最终返回数据: {json.dumps(data, ensure_ascii=False, indent=2)}")
                    else:
                        logger.error("最终返回数据: None")
                    return None

    # First page succeeded (non-resume path): hand it to the callback and
    # pull the pagination cursor. NOTE: the cursor lives at data.data.query.ctime.
    if data and data.get('data', {}).get('list'):
        items = data.get('data', {}).get('list', [])
        logger.info(f"第{page}页获取成功,数据条数: {len(items)}")
        if on_page_fetched:
            on_page_fetched(page, items, current_ctime)
        current_ctime = data.get('data', {}).get('query', {}).get('ctime', current_ctime)
        if current_ctime:
            logger.info(f"获取到分页参数 ctime={current_ctime}")
        else:
            logger.warning("未获取到ctime分页参数")

    def get_article_datetime(item_data: dict) -> datetime:
        """Extract an article's datetime from its itemData.

        Prefers ``ctime`` (a Unix timestamp, most accurate); falls back to
        parsing the human-readable ``time`` field (relative or absolute).
        """
        if 'ctime' in item_data and item_data['ctime']:
            try:
                return datetime.fromtimestamp(int(item_data['ctime']))
            except (TypeError, ValueError, OverflowError, OSError):
                pass  # fall through to the textual field
        time_str = item_data.get('time', '')
        if not time_str:
            return datetime.now()
        now = datetime.now()
        if '分钟前' in time_str:  # "N minutes ago"
            minutes = int(re.search(r'(\d+)', time_str).group(1))
            return now - timedelta(minutes=minutes)
        elif '小时前' in time_str:  # "N hours ago"
            hours = int(re.search(r'(\d+)', time_str).group(1))
            return now - timedelta(hours=hours)
        elif '天前' in time_str or '昨天' in time_str:  # "N days ago" / "yesterday"
            if '昨天' in time_str:
                days_ago = 1
            else:
                days_ago = int(re.search(r'(\d+)', time_str).group(1))
            return now - timedelta(days=days_ago)
        elif '-' in time_str:
            # Absolute formats, with and without a time component.
            try:
                return datetime.strptime(time_str, '%Y-%m-%d %H:%M')
            except ValueError:
                try:
                    return datetime.strptime(time_str, '%Y-%m-%d')
                except ValueError:
                    return now
        return now

    # Decide from the first page's oldest article whether to keep paging.
    need_more = True
    if data and data.get('data', {}).get('list'):
        items = data.get('data', {}).get('list', [])
        if items:
            last_item = items[-1]
            item_data = last_item.get('itemData', {})
            article_date = get_article_datetime(item_data)
            logger.info(f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}")
            if article_date < target_date:
                need_more = False
                if months < 1:
                    logger.info(
                        f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}, 已超出{days}天范围,停止请求")
                else:
                    logger.info(
                        f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}, 已超出{int(months)}个月范围,停止请求")
        else:
            need_more = False
    elif start_page > 1:
        # Resuming: assume there is more until proven otherwise.
        need_more = True
    else:
        need_more = False

    # Page until the target date is reached or the feed runs dry (no page cap).
    while need_more:
        page += 1
        logger.info(f"需要更多数据,请求第{page}页...")
        # Randomized 8-12 s delay so the cadence doesn't look automated.
        delay = random.uniform(8, 12)
        logger.info(f"等待 {delay:.1f} 秒后请求...")
        await asyncio.sleep(delay)

        # Pass the cursor returned by the previous page.
        next_data = await spider.fetch_data_directly(uk, use_browser=False, ctime=current_ctime)

        # fetch_data_directly already handled anti-bot retries; only check outcome.
        if not next_data or not next_data.get('data', {}).get('list'):
            if next_data and next_data.get('data', {}).get('foe', {}).get('is_need_foe'):
                logger.error(f"第{page}页多次重试后仍然触发反爬,停止请求")
                logger.error(f"返回数据: {json.dumps(next_data, ensure_ascii=False, indent=2)}")
            else:
                logger.warning(f"第{page}页无数据,停止请求")
                if next_data:
                    logger.warning(f"返回数据: {json.dumps(next_data, ensure_ascii=False, indent=2)}")
                else:
                    logger.warning("返回数据: None")
            break

        next_items = next_data.get('data', {}).get('list', [])
        logger.info(f"第{page}页获取成功,数据条数: {len(next_items)}")
        if on_page_fetched:
            on_page_fetched(page, next_items, current_ctime)

        # Advance the cursor for the next request (path: data.data.query.ctime).
        current_ctime = next_data.get('data', {}).get('query', {}).get('ctime', current_ctime)
        if current_ctime:
            logger.info(f"更新分页参数 ctime={current_ctime}")

        # Stop once the oldest article on this page predates the target.
        if next_items:
            last_item = next_items[-1]
            item_data = last_item.get('itemData', {})
            article_date = get_article_datetime(item_data)
            logger.info(f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}")
            if article_date < target_date:
                need_more = False
                if months < 1:
                    logger.info(
                        f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}, 已超出{days}天范围,停止请求")
                else:
                    logger.info(
                        f"最后一篇文章时间: {article_date.strftime('%Y-%m-%d %H:%M:%S')}, 已超出{int(months)}个月范围,停止请求")
        else:
            need_more = False

    # Resume bookkeeping for the caller.
    result = {
        'last_page': page,
        'last_ctime': current_ctime,
        'completed': not need_more  # False when we broke out on a failure
    }
    logger.info(f"抓取完成,最后页码: {page}, ctime: {current_ctime}")
    return result


def get_baidu_data_sync(uk: str = "ntHidnLhrlfclJar2z8wBg", months: int = 6,
                        use_proxy: bool = False, proxy_api_url: str = None,
                        on_page_fetched=None, start_page: int = 1,
                        start_ctime: str = None) -> Optional[Dict]:
    """Synchronous wrapper around fetch_baidu_data (runs its own event loop).

    Args mirror fetch_baidu_data; see its docstring for details.
    """
    return asyncio.run(fetch_baidu_data(uk, months, use_proxy, proxy_api_url,
                                        on_page_fetched, start_page, start_ctime))


# Keep the original main() below for manual testing.
async def main():
    """Manual smoke test: fetch one author's feed and pretty-print it."""
    result = await fetch_baidu_data()
    if not result:
        return
    print(json.dumps(result, ensure_ascii=False, indent=2))
    from test2 import display_simple_data
    display_simple_data(result)


if __name__ == "__main__":
    asyncio.run(main())