import json
import logging
import random
import re
import time
import urllib.parse
from typing import Dict, Any, Optional

import requests
from fake_useragent import UserAgent
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class BaiduBJHSpider:
    def __init__(self, use_proxy: bool = False):
        self.ua = UserAgent()
        self.use_proxy = use_proxy
        self.proxies = []  # Fill in your proxy list here if you use one
        self.session_cookie = None
        self.session = requests.Session()
        # Mount adapters with retry support for both schemes
        self.session.mount('http://', requests.adapters.HTTPAdapter(max_retries=3))
        self.session.mount('https://', requests.adapters.HTTPAdapter(max_retries=3))

    def init_browser(self, timeout: int = 15000):
        """Launch a browser session to harvest cookies."""
        playwright = sync_playwright().start()
        try:
            # Browser launch arguments
            browser_args = [
                '--disable-blink-features=AutomationControlled',
                '--disable-web-security',
                '--disable-features=IsolateOrigins,site-per-process',
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-gpu',
            ]

            # Launch the browser (headless mode is faster)
            browser = playwright.chromium.launch(
                headless=True,
                args=browser_args,
                timeout=timeout
            )

            # Create a browser context; new_context() takes no navigation_timeout
            # kwarg, so the default navigation timeout is set on the context below
            context = browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent=self.ua.random,
                locale='zh-CN',
                timezone_id='Asia/Shanghai',
                java_script_enabled=True,
                bypass_csp=True
            )
            context.set_default_navigation_timeout(timeout)

            # Extra HTTP headers
            context.set_extra_http_headers({
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
            })

            page = context.new_page()

            # 1. Visit the Baidu homepage first to pick up base cookies
            logger.info("Visiting the Baidu homepage...")
            try:
                page.goto('https://www.baidu.com',
                          wait_until='domcontentloaded',
                          timeout=10000)
                time.sleep(random.uniform(1, 2))
            except PlaywrightTimeoutError:
                logger.warning("Baidu homepage load timed out, continuing...")
            # 2. Visit the Baijiahao page
            logger.info("Visiting the Baijiahao page...")
            try:
                # Use a looser wait condition (domcontentloaded is faster)
                page.goto('https://baijiahao.baidu.com/',
                          wait_until='domcontentloaded',
                          timeout=10000)
                time.sleep(random.uniform(2, 3))
            except PlaywrightTimeoutError:
                # Even on timeout, still try to collect cookies
                logger.warning("Baijiahao page load timed out, trying to continue...")

            # Collect cookies
            cookies = context.cookies()
            self.session_cookie = '; '.join(f"{c['name']}={c['value']}" for c in cookies)

            # Copy the cookies into the requests session
            for cookie in cookies:
                self.session.cookies.set(cookie['name'], cookie['value'])

            if cookies:
                logger.info(f"Got {len(cookies)} cookies")
            else:
                logger.warning("No cookies obtained")

            browser.close()
            return cookies

        except Exception as e:
            logger.error(f"Failed to initialize browser: {e}")
            return None
        finally:
            playwright.stop()

    def build_headers(self, referer: str = "https://baijiahao.baidu.com/") -> Dict:
        """Build request headers."""
        headers = {
            'User-Agent': self.ua.random,
            'Accept': '*/*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
            'Accept-Encoding': 'gzip, deflate',
            'Referer': referer,
            'Connection': 'keep-alive',
            'Pragma': 'no-cache',
            'Cache-Control': 'no-cache',
        }
        if self.session_cookie:
            headers['Cookie'] = self.session_cookie
        return headers

    def generate_callback_name(self) -> str:
        """Generate a random JSONP callback name."""
        timestamp = int(time.time() * 1000)
        return f"__jsonp{timestamp}"

    def fetch_data_directly(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
        """Request the endpoint directly (may take several attempts)."""
        # Initialize the browser first to obtain cookies
        logger.info("Initializing browser to obtain cookies...")
        cookies = self.init_browser()
        if not cookies:
            logger.warning("No cookies obtained, attempting the request anyway...")

        for attempt in range(3):  # Up to 3 attempts
            try:
                callback_name = self.generate_callback_name()
                timestamp = int(time.time() * 1000)

                # Build the URL parameters - keep them simple
                params = {
                    'tab': 'main',
                    'num': '10',
                    'uk': uk,
                    'source': 'pc',
                    'type': 'newhome',
                    'action': 'dynamic',
                    'format': 'jsonp',
                    'callback': callback_name,
                    '_': str(timestamp)  # Cache-busting timestamp
                }

                url = "https://mbd.baidu.com/webpage"
                headers = self.build_headers()

                logger.info(f"Request attempt {attempt + 1}...")

                # Random delay
                time.sleep(random.uniform(1, 2))

                # Configure a proxy if requested
                proxies = None
                if self.use_proxy and self.proxies:
                    proxy = random.choice(self.proxies)
                    proxies = {
                        'http': proxy,
                        'https': proxy
                    }

                response = self.session.get(
                    url,
                    params=params,
                    headers=headers,
                    timeout=15,  # Shorter timeout
                    proxies=proxies
                )

                # Unwrap the JSONP payload
                text = response.text
                if text.startswith(callback_name + '(') and text.endswith(')'):
                    json_str = text[len(callback_name) + 1:-1]
                    data = json.loads(json_str)
                    logger.info("Got JSON data")
                    return data
                else:
                    # The response may already be plain JSON
                    try:
                        data = json.loads(text)
                        logger.info("Parsed response as plain JSON")
                        return data
                    except json.JSONDecodeError:
                        pass

            except requests.exceptions.Timeout:
                logger.error(f"Request timed out (attempt {attempt + 1})")
            except Exception as e:
                logger.error(f"Request failed (attempt {attempt + 1}): {e}")

            # Wait before retrying, unless this was the last attempt
            if attempt < 2:
                time.sleep(random.uniform(2, 3))

        return None

    def fetch_via_browser(self, uk: str = "ntHidnLhrlfclJar2z8wBg",
                          timeout: int = 15000) -> Optional[Dict]:
        """Fetch the data by driving a browser and sniffing network traffic."""
        playwright = sync_playwright().start()
        try:
            browser = playwright.chromium.launch(
                headless=True,
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--no-sandbox',
                    '--disable-dev-shm-usage'
                ],
                timeout=timeout
            )
            context = browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent=self.ua.random,
                locale='zh-CN'
            )
            context.set_default_navigation_timeout(timeout)
            page = context.new_page()

            # Listen for network responses
            results = []

            def handle_response(response):
                url = response.url
                if "mbd.baidu.com/webpage" in url and "format=jsonp" in url:
                    try:
                        # Read the response body
                        text = response.text()
                        logger.info(f"Captured request: {url}")
                        # Extract the callback name from the URL
                        parsed_url = urllib.parse.urlparse(url)
                        query_params = urllib.parse.parse_qs(parsed_url.query)
                        if 'callback' in query_params:
                            callback = query_params['callback'][0]
                            if text.startswith(callback + '(') and text.endswith(')'):
                                json_str = text[len(callback) + 1:-1]
                                data = json.loads(json_str)
                                results.append(data)
                                logger.info("Parsed JSONP data")
                    except Exception as e:
                        logger.debug(f"Failed to handle response: {e}")

            page.on("response", handle_response)

            # Visit the Baijiahao page
            target_url = f"https://baijiahao.baidu.com/u?app_id={uk}"
            logger.info(f"Visiting page: {target_url}")

            try:
                page.goto(target_url, wait_until='domcontentloaded', timeout=10000)
                time.sleep(random.uniform(2, 3))

                # Simple scrolling to trigger lazy-loaded requests
                page.evaluate("window.scrollBy(0, 500)")
                time.sleep(1)
                page.evaluate("window.scrollBy(0, 500)")
                time.sleep(1)

                # Give the data time to load
                time.sleep(2)

            except PlaywrightTimeoutError:
                logger.warning("Page load timed out, processing the captured data...")

            browser.close()

            if results:
                logger.info(f"Captured {len(results)} result(s) via the browser")
                return results[0]

        except Exception as e:
            logger.error(f"Browser-based fetch failed: {e}")
        finally:
            playwright.stop()

        return None

    def fetch_with_ajax(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
        """Request directly with simplified parameters."""
        try:
            timestamp = int(time.time() * 1000)

            # A simpler parameter set
            params = {
                'action': 'dynamic',
                'uk': uk,
                'type': 'newhome',
                'num': '10',
                'format': 'json',
                '_': str(timestamp)
            }

            url = "https://mbd.baidu.com/webpage"
            headers = {
                'User-Agent': self.ua.random,
                'Referer': 'https://baijiahao.baidu.com/',
                'Accept': 'application/json, text/javascript, */*; q=0.01',
                'X-Requested-With': 'XMLHttpRequest'
            }

            logger.info("Trying an AJAX-style request...")
            response = self.session.get(
                url,
                params=params,
                headers=headers,
                timeout=10
            )

            logger.info(f"AJAX response status: {response.status_code}")

            try:
                data = json.loads(response.text)
                logger.info("AJAX request returned data")
                return data
            except json.JSONDecodeError as e:
                logger.error(f"JSON parsing failed: {e}")
                logger.info(f"Response content: {response.text[:200]}")
                return None

        except Exception as e:
            logger.error(f"AJAX request failed: {e}")
            return None

    def fetch_all_methods(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
        """Try every method in turn until one returns data."""
        logger.info("=" * 50)
        logger.info(f"Fetching Baijiahao data, UK: {uk}")
        logger.info("=" * 50)

        # Method 1: direct endpoint request. Note: errno is checked against
        # both int and str, since the endpoint's type for it is not guaranteed.
        logger.info("\nMethod 1: direct endpoint request...")
        data = self.fetch_data_directly(uk)
        if data and data.get("errno") in (0, "0") and data.get("data", {}).get("list") is not None:
            logger.info(f"✓ Method 1 succeeded, got {len(data['data']['list'])} items")
            return data
        else:
            logger.info("✗ Method 1 failed or returned no data")

        # Method 2: browser-based capture
        logger.info("\nMethod 2: browser-based capture...")
        data = self.fetch_via_browser(uk)
        if data and data.get("errno") in (0, "0") and data.get("data", {}).get("list") is not None:
            logger.info(f"✓ Method 2 succeeded, got {len(data['data']['list'])} items")
            return data
        else:
            logger.info("✗ Method 2 failed or returned no data")

        # Method 3: AJAX request
        logger.info("\nMethod 3: AJAX request...")
        data = self.fetch_with_ajax(uk)
        if data and data.get("errno") in (0, "0") and data.get("data", {}).get("list") is not None:
            logger.info(f"✓ Method 3 succeeded, got {len(data['data']['list'])} items")
            return data
        else:
            logger.info("✗ Method 3 failed or returned no data")

        # Method 4: fallback request
        logger.info("\nMethod 4: trying fallback request URLs...")
        data = self.try_backup_method(uk)
        if data:
            logger.info("✓ Method 4 succeeded")
            return data

        logger.error("All methods failed")
        return None

    def try_backup_method(self, uk: str) -> Optional[Dict]:
        """Fallback: try alternative URLs and parameters."""
        backup_urls = [
            "https://author.baidu.com/rest/2.0/ugc/dynamic",
            "https://mbd.baidu.com/dynamic/api",
            "https://baijiahao.baidu.com/builder/api"
        ]

        for url in backup_urls:
            try:
                params = {
                    'action': 'list',
                    'uk': uk,
                    'page': '1',
                    'page_size': '10',
                    '_': str(int(time.time() * 1000))
                }
                headers = {
                    'User-Agent': self.ua.random,
                    'Referer': 'https://baijiahao.baidu.com/'
                }
                response = requests.get(url, params=params, headers=headers, timeout=10)
                if response.status_code == 200:
                    try:
                        data = response.json()
                        if data:
                            logger.info(f"Backup URL {url} succeeded")
                            return data
                    except ValueError:
                        pass
            except Exception as e:
                logger.debug(f"Backup URL {url} failed: {e}")

        return None


def display_simple_data(data):
    """Print a simple summary of the fetched data."""
    if not data or "data" not in data or "list" not in data["data"]:
        print("No valid data")
        return

    articles = data["data"]["list"]
    print(f"\nGot {len(articles)} articles:")

    for idx, article in enumerate(articles[:10]):  # Show the first 10
        print(f"\n{'=' * 60}")
        print(f"Article {idx + 1}:")

        item_data = article.get("itemData", {})

        # Title (strip embedded newlines)
        title = item_data.get("title", "Untitled").replace('\n', ' ').strip()
        if not title or title == "Untitled":
            # Fall back to origin_title
            title = item_data.get("origin_title", "Untitled").replace('\n', ' ').strip()
        print(f"Title: {title[:100]}{'...' if len(title) > 100 else ''}")

        # Author
        display_info = item_data.get("displaytype_exinfo", "")
        author = "Unknown author"
        if display_info:
            try:
                info = json.loads(display_info)
                author = info.get("name", info.get("display_name", "Unknown author"))
            except (json.JSONDecodeError, TypeError):
                # Fall back to a regex match
                name_match = re.search(r'"name":"([^"]+)"', display_info)
                if name_match:
                    author = name_match.group(1)
        print(f"Author: {author}")

        # Publish time
        time_str = item_data.get("time", item_data.get("cst_time", "unknown"))
        print(f"Published: {time_str}")

        # Article ID
        thread_id = item_data.get("thread_id", article.get("thread_id", "unknown"))
        print(f"Article ID: {thread_id}")

        # Images
        img_src = item_data.get("imgSrc", [])
        if img_src:
            print(f"Images: {len(img_src)}")

        # Tags / topics
        targets = item_data.get("target", [])
        if targets:
            tags = [t.get("key", "") for t in targets if t.get("key")]
            if tags:
                print(f"Tags: {', '.join(tags)}")


def main():
    """Entry point."""
    spider = BaiduBJHSpider()

    # Fetch the data
    data = spider.fetch_all_methods()

    if data:
        # Save the full payload to a file
        filename = f'baijiahao_data_{int(time.time())}.json'
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        logger.info(f"Full data saved to {filename}")

        # Print a simple summary
        display_simple_data(data)
    else:
        print("Failed to fetch data. Suggestions:")
        print("1. Check your network connection")
        print("2. Try using a proxy")
        print("3. Wait a while and retry")
        print("4. Check whether the target page is reachable")


if __name__ == "__main__":
    # Quiet down noisy third-party loggers
    logging.getLogger("playwright").setLevel(logging.WARNING)
    logging.getLogger("urllib3").setLevel(logging.WARNING)

    main()
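
# Proxy usage sketch: __init__ leaves self.proxies empty ("fill in your proxy
# list here"), so a proxy must be supplied manually when use_proxy=True.
# A minimal example; the proxy URL below is a placeholder assumption, not a
# real endpoint:
#
#   spider = BaiduBJHSpider(use_proxy=True)
#   spider.proxies = ["http://127.0.0.1:8080"]  # hypothetical local proxy
#   data = spider.fetch_all_methods(uk="ntHidnLhrlfclJar2z8wBg")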