Files
ai_baijiahao/test2.py

526 lines
18 KiB
Python
Raw Normal View History

import json
import random
import time
from typing import Dict, Any, Optional
import logging
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
from fake_useragent import UserAgent
import requests
import re
# Configure root logging once at import time; all classes/functions below
# log through this module-level logger.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class BaiduBJHSpider:
    """Scraper for Baidu Baijiahao (百家号) author article feeds.

    Tries several strategies in order of reliability: a direct JSONP
    request (after harvesting cookies with a headless browser), passive
    capture of the site's own XHR traffic via Playwright, a plain AJAX
    request, and finally a list of backup endpoints.
    """

    def __init__(self, use_proxy: bool = False):
        self.ua = UserAgent()
        self.use_proxy = use_proxy
        self.proxies = []  # fill in proxy URLs here if proxies are needed
        self.session_cookie = None  # "name=value; ..." string built from browser cookies
        self.session = requests.Session()
        # Retry transient connection failures up to 3 times on both schemes.
        self.session.mount('http://', requests.adapters.HTTPAdapter(max_retries=3))
        self.session.mount('https://', requests.adapters.HTTPAdapter(max_retries=3))

    def init_browser(self, timeout: int = 15000):
        """Launch headless Chromium to collect Baidu cookies.

        Visits baidu.com and then baijiahao.baidu.com so anti-bot cookies
        get set, copies them into ``self.session`` (and the
        ``self.session_cookie`` header string), and returns the raw cookie
        list, or ``None`` if the browser could not be started.

        :param timeout: browser launch / navigation timeout in milliseconds.
        """
        playwright = sync_playwright().start()
        try:
            # Flags that reduce automation fingerprinting and avoid
            # sandbox/shared-memory issues in containerized environments.
            browser_args = [
                '--disable-blink-features=AutomationControlled',
                '--disable-web-security',
                '--disable-features=IsolateOrigins,site-per-process',
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-gpu',
            ]
            browser = playwright.chromium.launch(
                headless=True,  # headless is faster than a visible window
                args=browser_args,
                timeout=timeout
            )
            context = browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent=self.ua.random,
                locale='zh-CN',
                timezone_id='Asia/Shanghai',
                navigation_timeout=timeout,
                java_script_enabled=True,
                bypass_csp=True
            )
            # Make the browser traffic look like an ordinary interactive session.
            context.set_extra_http_headers({
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
            })
            page = context.new_page()
            # Step 1: hit the Baidu homepage first to pick up base cookies.
            logger.info("访问百度首页...")
            try:
                page.goto('https://www.baidu.com', wait_until='domcontentloaded', timeout=10000)
                time.sleep(random.uniform(1, 2))
            except PlaywrightTimeoutError:
                logger.warning("百度首页加载超时,继续执行...")
            # Step 2: visit the Baijiahao page itself.
            logger.info("访问百家号页面...")
            try:
                # 'domcontentloaded' is a looser (and faster) wait condition
                # than the default full 'load'.
                page.goto('https://baijiahao.baidu.com/',
                          wait_until='domcontentloaded',
                          timeout=10000)
                time.sleep(random.uniform(2, 3))
            except PlaywrightTimeoutError:
                logger.warning("百家号页面加载超时,尝试继续...")
                # Even on timeout, whatever cookies were already set are usable.
            cookies = context.cookies()
            self.session_cookie = '; '.join(f"{c['name']}={c['value']}" for c in cookies)
            # Mirror the cookies into the requests session for later calls.
            for cookie in cookies:
                self.session.cookies.set(cookie['name'], cookie['value'])
            if cookies:
                logger.info(f"成功获取到 {len(cookies)} 个Cookie")
            else:
                logger.warning("未获取到Cookie")
            browser.close()
            return cookies
        except Exception as e:
            logger.error(f"初始化浏览器失败: {e}")
            return None
        finally:
            playwright.stop()

    def build_headers(self, referer: str = "https://baijiahao.baidu.com/") -> Dict:
        """Build browser-like request headers, attaching the harvested
        cookie string when one is available."""
        headers = {
            'User-Agent': self.ua.random,
            'Accept': '*/*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
            'Accept-Encoding': 'gzip, deflate',
            'Referer': referer,
            'Connection': 'keep-alive',
            'Pragma': 'no-cache',
            'Cache-Control': 'no-cache',
        }
        if self.session_cookie:
            headers['Cookie'] = self.session_cookie
        return headers

    def generate_callback_name(self) -> str:
        """Generate a pseudo-random JSONP callback name from the current
        millisecond timestamp (mimics the site's own naming)."""
        timestamp = int(time.time() * 1000)
        return f"__jsonp{timestamp}"

    def fetch_data_directly(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
        """Request the feed endpoint directly (up to 3 attempts).

        Harvests cookies first, then issues JSONP-style GETs against
        mbd.baidu.com and unwraps the ``callback(...)`` envelope.
        Returns the parsed payload dict, or ``None`` on failure.

        :param uk: the author's public "uk" identifier.
        """
        # Prime the session with browser cookies before hitting the API.
        logger.info("初始化浏览器获取Cookie...")
        cookies = self.init_browser()
        if not cookies:
            logger.warning("未能获取到Cookie尝试继续请求...")
        for attempt in range(3):
            try:
                callback_name = self.generate_callback_name()
                timestamp = int(time.time() * 1000)
                # Keep the parameter set minimal — extra params appear to
                # trigger stricter validation server-side.
                params = {
                    'tab': 'main',
                    'num': '10',
                    'uk': uk,
                    'source': 'pc',
                    'type': 'newhome',
                    'action': 'dynamic',
                    'format': 'jsonp',
                    'callback': callback_name,
                    '_': str(timestamp)  # cache-busting timestamp
                }
                url = "https://mbd.baidu.com/webpage"
                headers = self.build_headers()
                logger.info(f"尝试第{attempt + 1}次请求...")
                # Random delay to avoid an obvious request cadence.
                time.sleep(random.uniform(1, 2))
                proxies = None
                if self.use_proxy and self.proxies:
                    proxy = random.choice(self.proxies)
                    proxies = {
                        'http': proxy,
                        'https': proxy
                    }
                response = self.session.get(
                    url,
                    params=params,
                    headers=headers,
                    timeout=15,
                    proxies=proxies
                )
                # Unwrap the JSONP envelope: callback_name(<json>)
                text = response.text
                if text.startswith(callback_name + '(') and text.endswith(')'):
                    json_str = text[len(callback_name) + 1:-1]
                    data = json.loads(json_str)
                    logger.info("成功获取JSON数据")
                    return data
                else:
                    # The endpoint sometimes replies with bare JSON instead.
                    try:
                        data = json.loads(text)
                        logger.info("直接解析JSON成功")
                        return data
                    except json.JSONDecodeError:
                        pass  # not JSON either — fall through to retry
            except requests.exceptions.Timeout:
                logger.error(f"请求超时 (尝试{attempt + 1})")
            except Exception as e:
                logger.error(f"请求失败 (尝试{attempt + 1}): {e}")
            # Back off before the next attempt (skip after the last one).
            if attempt < 2:
                time.sleep(random.uniform(2, 3))
        return None

    def fetch_via_browser(self, uk: str = "ntHidnLhrlfclJar2z8wBg", timeout: int = 15000) -> Optional[Dict]:
        """Load the author page in a headless browser and passively capture
        the site's own JSONP responses from the network traffic.

        Returns the first successfully parsed payload, or ``None``.
        """
        import urllib.parse  # hoisted here so the response callback below stays cheap
        playwright = sync_playwright().start()
        try:
            browser = playwright.chromium.launch(
                headless=True,
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--no-sandbox',
                    '--disable-dev-shm-usage'
                ],
                timeout=timeout
            )
            context = browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent=self.ua.random,
                locale='zh-CN',
                navigation_timeout=timeout
            )
            page = context.new_page()
            results = []

            def handle_response(response):
                # Only the feed endpoint's JSONP responses are of interest.
                url = response.url
                if "mbd.baidu.com/webpage" in url and "format=jsonp" in url:
                    try:
                        text = response.text()
                        logger.info(f"捕获到请求: {url}")
                        # Recover the callback name from the request URL so
                        # the JSONP envelope can be stripped.
                        parsed_url = urllib.parse.urlparse(url)
                        query_params = urllib.parse.parse_qs(parsed_url.query)
                        if 'callback' in query_params:
                            callback = query_params['callback'][0]
                            if text.startswith(callback + '(') and text.endswith(')'):
                                json_str = text[len(callback) + 1:-1]
                                data = json.loads(json_str)
                                results.append(data)
                                logger.info("成功解析JSONP数据")
                    except Exception as e:
                        logger.debug(f"处理响应失败: {e}")

            page.on("response", handle_response)
            target_url = f"https://baijiahao.baidu.com/u?app_id={uk}"
            logger.info(f"访问页面: {target_url}")
            try:
                page.goto(target_url, wait_until='domcontentloaded', timeout=10000)
                time.sleep(random.uniform(2, 3))
                # Scroll a little to trigger lazy-loaded feed requests.
                page.evaluate("window.scrollBy(0, 500)")
                time.sleep(1)
                page.evaluate("window.scrollBy(0, 500)")
                time.sleep(1)
                # Give in-flight requests time to complete.
                time.sleep(2)
            except PlaywrightTimeoutError:
                logger.warning("页面加载超时,继续处理已捕获的数据...")
            browser.close()
            if results:
                logger.info(f"通过浏览器捕获到 {len(results)} 个结果")
                return results[0]
        except Exception as e:
            logger.error(f"浏览器方式获取失败: {e}")
        finally:
            playwright.stop()
        return None

    def fetch_with_ajax(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
        """Issue a single plain-JSON AJAX-style request (no JSONP wrapper).

        Returns the parsed payload dict, or ``None`` on any failure.
        """
        try:
            timestamp = int(time.time() * 1000)
            # Minimal parameter set, requesting plain JSON.
            params = {
                'action': 'dynamic',
                'uk': uk,
                'type': 'newhome',
                'num': '10',
                'format': 'json',
                '_': str(timestamp)
            }
            url = "https://mbd.baidu.com/webpage"
            headers = {
                'User-Agent': self.ua.random,
                'Referer': 'https://baijiahao.baidu.com/',
                'Accept': 'application/json, text/javascript, */*; q=0.01',
                'X-Requested-With': 'XMLHttpRequest'
            }
            logger.info("尝试AJAX方式请求...")
            response = self.session.get(
                url,
                params=params,
                headers=headers,
                timeout=10
            )
            logger.info(f"AJAX响应状态: {response.status_code}")
            try:
                data = json.loads(response.text)
                logger.info("AJAX方式成功获取数据")
                return data
            except json.JSONDecodeError as e:
                logger.error(f"JSON解析失败: {e}")
                logger.info(f"响应内容: {response.text[:200]}")
                return None
        except Exception as e:
            logger.error(f"AJAX方式失败: {e}")
            return None

    def fetch_all_methods(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
        """Try every fetch strategy in order and return the first payload
        that contains an article list, or ``None`` if all fail."""
        logger.info("=" * 50)
        logger.info(f"开始获取百家号数据UK: {uk}")
        logger.info("=" * 50)
        # Method 1: direct JSONP request.
        logger.info("\n方法1直接请求接口...")
        data = self.fetch_data_directly(uk)
        # NOTE(review): errno is compared as the string "0" here — confirm the
        # API never returns it as the integer 0.
        if data and data.get("errno") == "0" and data.get("data", {}).get("list") is not None:
            logger.info(f"✓ 方法1成功获取到 {len(data['data']['list'])} 条数据")
            return data
        else:
            logger.info("✗ 方法1失败或数据为空")
        # Method 2: browser-captured traffic.
        logger.info("\n方法2浏览器模拟获取...")
        data = self.fetch_via_browser(uk)
        if data and data.get("errno") == "0" and data.get("data", {}).get("list") is not None:
            logger.info(f"✓ 方法2成功获取到 {len(data['data']['list'])} 条数据")
            return data
        else:
            logger.info("✗ 方法2失败或数据为空")
        # Method 3: plain AJAX request.
        logger.info("\n方法3AJAX请求...")
        data = self.fetch_with_ajax(uk)
        if data and data.get("errno") == "0" and data.get("data", {}).get("list") is not None:
            logger.info(f"✓ 方法3成功获取到 {len(data['data']['list'])} 条数据")
            return data
        else:
            logger.info("✗ 方法3失败或数据为空")
        # Method 4: backup endpoints (accepted without errno validation).
        logger.info("\n方法4尝试备用请求方式...")
        data = self.try_backup_method(uk)
        if data:
            logger.info("✓ 方法4成功获取数据")
            return data
        else:
            logger.error("所有方法都失败了")
            return None

    def try_backup_method(self, uk: str) -> Optional[Dict]:
        """Last-resort fetch: probe a list of alternative endpoints with a
        generic parameter set; return the first JSON payload, else ``None``."""
        backup_urls = [
            "https://author.baidu.com/rest/2.0/ugc/dynamic",
            "https://mbd.baidu.com/dynamic/api",
            "https://baijiahao.baidu.com/builder/api"
        ]
        for url in backup_urls:
            try:
                params = {
                    'action': 'list',
                    'uk': uk,
                    'page': '1',
                    'page_size': '10',
                    '_': str(int(time.time() * 1000))
                }
                headers = {
                    'User-Agent': self.ua.random,
                    'Referer': 'https://baijiahao.baidu.com/'
                }
                # NOTE(review): uses a fresh requests.get (no session cookies)
                # — presumably intentional as a cookie-free probe; confirm.
                response = requests.get(url, params=params, headers=headers, timeout=10)
                if response.status_code == 200:
                    try:
                        data = response.json()
                        if data:
                            logger.info(f"备用URL {url} 成功")
                            return data
                    except ValueError:
                        pass  # body was not JSON — try the next URL
            except Exception as e:
                logger.debug(f"备用URL {url} 失败: {e}")
        return None
def display_simple_data(data):
    """Pretty-print a short summary (title, author, time, id, images,
    tags) for up to the first 10 articles in a fetched payload.

    :param data: payload dict as returned by ``BaiduBJHSpider`` methods;
        expected shape is ``{"data": {"list": [...]}}``.
    """
    if not data or "data" not in data or "list" not in data["data"]:
        print("没有有效的数据")
        return
    articles = data["data"]["list"]
    print(f"\n获取到 {len(articles)} 篇文章:")
    for idx, article in enumerate(articles[:10]):  # show at most 10 entries
        print(f"\n{'=' * 60}")
        print(f"文章 {idx + 1}:")
        item_data = article.get("itemData", {})
        # Title: strip embedded newlines; fall back to origin_title.
        title = item_data.get("title", "无标题")
        title = title.replace('\n', ' ').strip()
        if not title or title == "无标题":
            title = item_data.get("origin_title", "无标题").replace('\n', ' ').strip()
        print(f"标题: {title[:100]}{'...' if len(title) > 100 else ''}")
        # Author: displaytype_exinfo is a JSON-encoded string; if it fails
        # to parse as a dict, fall back to a regex scan for "name".
        display_info = item_data.get("displaytype_exinfo", "")
        author = "未知作者"
        if display_info:
            try:
                info = json.loads(display_info)
                author = info.get("name", info.get("display_name", "未知作者"))
            except (json.JSONDecodeError, AttributeError):
                name_match = re.search(r'"name":"([^"]+)"', display_info)
                if name_match:
                    author = name_match.group(1)
        print(f"作者: {author}")
        # Publish time (cst_time as fallback key).
        time_str = item_data.get("time", item_data.get("cst_time", "未知时间"))
        print(f"发布时间: {time_str}")
        # Article id may live on the item or the wrapper.
        thread_id = item_data.get("thread_id", article.get("thread_id", "未知"))
        print(f"文章ID: {thread_id}")
        # Image count, if any.
        img_src = item_data.get("imgSrc", [])
        if img_src:
            print(f"包含图片: {len(img_src)}")
        # Tags/topics from the "target" entries.
        targets = item_data.get("target", [])
        if targets:
            tags = [t.get("key", "") for t in targets if t.get("key")]
            if tags:
                print(f"标签: {', '.join(tags)}")
def main():
    """Entry point: fetch the feed, save the raw JSON to a timestamped
    file, and print a human-readable summary."""
    spider = BaiduBJHSpider()
    data = spider.fetch_all_methods()
    if data:
        # Persist the full payload under a timestamped filename.
        filename = f'baijiahao_data_{int(time.time())}.json'
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        # Fixed: the log message had lost its placeholder and never
        # reported which file the data was written to.
        logger.info(f"完整数据已保存到 {filename}")
        display_simple_data(data)
    else:
        print("未能获取到数据,建议:")
        print("1. 检查网络连接")
        print("2. 尝试使用代理")
        print("3. 等待一段时间后重试")
        print("4. 检查目标页面是否可正常访问")
if __name__ == "__main__":
    # Quiet the noisy third-party loggers before running the scraper.
    logging.getLogger("playwright").setLevel(logging.WARNING)
    logging.getLogger("urllib3").setLevel(logging.WARNING)
    main()