# Source: ai_baijiahao/test2.py (526 lines, 18 KiB, Python)
# NOTE: the hosting UI flagged this file as containing ambiguous Unicode
# characters (characters that might be confused with other characters);
# review before assuming all identifiers/strings are ASCII.
import json
import random
import time
from typing import Dict, Any, Optional
import logging
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
from fake_useragent import UserAgent
import requests
import re
# Configure root logging once at import time; all module output goes
# through this file-level logger.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class BaiduBJHSpider:
    """Scraper for Baidu Baijiahao (百家号) author dynamic feeds.

    Strategies, tried in order by :meth:`fetch_all_methods`:
      1. direct JSONP request backed by browser-harvested cookies,
      2. headless browser that sniffs the page's own XHR traffic,
      3. plain AJAX request,
      4. a list of backup endpoints.
    """

    def __init__(self, use_proxy: bool = False):
        self.ua = UserAgent()
        self.use_proxy = use_proxy
        self.proxies = []  # Fill in your proxy URLs here if use_proxy is True.
        self.session_cookie = None  # "name=value; ..." string built by init_browser().
        self.session = requests.Session()
        # Retry transient connection failures up to 3 times per scheme.
        self.session.mount('http://', requests.adapters.HTTPAdapter(max_retries=3))
        self.session.mount('https://', requests.adapters.HTTPAdapter(max_retries=3))

    def init_browser(self, timeout: int = 15000):
        """Launch headless Chromium to warm up Baidu and harvest cookies.

        The captured cookies are copied onto ``self.session`` and joined
        into ``self.session_cookie`` for later request headers.

        Args:
            timeout: launch/navigation timeout in milliseconds.

        Returns:
            The list of captured cookie dicts, or None on failure.
        """
        playwright = sync_playwright().start()
        browser = None  # Tracked so the finally block can always close it.
        try:
            # Flags that reduce automation fingerprinting and avoid
            # sandbox / shared-memory issues in containers.
            browser_args = [
                '--disable-blink-features=AutomationControlled',
                '--disable-web-security',
                '--disable-features=IsolateOrigins,site-per-process',
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-gpu',
            ]
            browser = playwright.chromium.launch(
                headless=True,  # Headless mode is faster.
                args=browser_args,
                timeout=timeout
            )
            context = browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent=self.ua.random,
                locale='zh-CN',
                timezone_id='Asia/Shanghai',
                navigation_timeout=timeout,
                java_script_enabled=True,
                bypass_csp=True
            )
            context.set_extra_http_headers({
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
            })
            page = context.new_page()
            # 1. Visit the Baidu homepage first to pick up baseline cookies.
            logger.info("访问百度首页...")
            try:
                page.goto('https://www.baidu.com', wait_until='domcontentloaded', timeout=10000)
                time.sleep(random.uniform(1, 2))
            except PlaywrightTimeoutError:
                logger.warning("百度首页加载超时,继续执行...")
            # 2. Visit the Baijiahao page itself.
            logger.info("访问百家号页面...")
            try:
                # 'domcontentloaded' is a looser (and faster) wait condition
                # than a full load.
                page.goto('https://baijiahao.baidu.com/',
                          wait_until='domcontentloaded',
                          timeout=10000)
                time.sleep(random.uniform(2, 3))
            except PlaywrightTimeoutError:
                # Even on timeout the context may already hold usable cookies.
                logger.warning("百家号页面加载超时,尝试继续...")
            cookies = context.cookies()
            self.session_cookie = '; '.join(f"{c['name']}={c['value']}" for c in cookies)
            # Mirror the cookies onto the requests session for later calls.
            for cookie in cookies:
                self.session.cookies.set(cookie['name'], cookie['value'])
            if cookies:
                logger.info(f"成功获取到 {len(cookies)} 个Cookie")
            else:
                logger.warning("未获取到Cookie")
            return cookies
        except Exception as e:
            logger.error(f"初始化浏览器失败: {e}")
            return None
        finally:
            # Fix: close the browser on every path (the original only
            # closed it on success, leaking it when an exception fired).
            if browser is not None:
                try:
                    browser.close()
                except Exception:
                    pass
            playwright.stop()

    def build_headers(self, referer: str = "https://baijiahao.baidu.com/") -> Dict:
        """Build request headers with a random UA and the harvested cookie.

        Args:
            referer: value for the Referer header.

        Returns:
            A header dict; includes 'Cookie' only if one was captured.
        """
        headers = {
            'User-Agent': self.ua.random,
            'Accept': '*/*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
            'Accept-Encoding': 'gzip, deflate',
            'Referer': referer,
            'Connection': 'keep-alive',
            'Pragma': 'no-cache',
            'Cache-Control': 'no-cache',
        }
        if self.session_cookie:
            headers['Cookie'] = self.session_cookie
        return headers

    def generate_callback_name(self) -> str:
        """Generate a JSONP callback name from the millisecond timestamp."""
        timestamp = int(time.time() * 1000)
        return f"__jsonp{timestamp}"

    def fetch_data_directly(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
        """Request the JSONP feed endpoint directly (up to 3 attempts).

        Initializes the browser first to obtain cookies, then requests
        https://mbd.baidu.com/webpage with a JSONP callback and unwraps
        the envelope.

        Returns:
            The parsed JSON payload, or None if every attempt fails.
        """
        logger.info("初始化浏览器获取Cookie...")
        cookies = self.init_browser()
        if not cookies:
            logger.warning("未能获取到Cookie尝试继续请求...")
        for attempt in range(3):
            try:
                callback_name = self.generate_callback_name()
                timestamp = int(time.time() * 1000)
                # Deliberately minimal parameter set.
                params = {
                    'tab': 'main',
                    'num': '10',
                    'uk': uk,
                    'source': 'pc',
                    'type': 'newhome',
                    'action': 'dynamic',
                    'format': 'jsonp',
                    'callback': callback_name,
                    '_': str(timestamp)  # Cache-busting timestamp.
                }
                url = "https://mbd.baidu.com/webpage"
                headers = self.build_headers()
                logger.info(f"尝试第{attempt + 1}次请求...")
                # Random delay to look less like a bot.
                time.sleep(random.uniform(1, 2))
                proxies = None
                if self.use_proxy and self.proxies:
                    proxy = random.choice(self.proxies)
                    proxies = {
                        'http': proxy,
                        'https': proxy
                    }
                response = self.session.get(
                    url,
                    params=params,
                    headers=headers,
                    timeout=15,
                    proxies=proxies
                )
                # Unwrap the JSONP envelope: callback(<json>).
                text = response.text
                if text.startswith(callback_name + '(') and text.endswith(')'):
                    json_str = text[len(callback_name) + 1:-1]
                    data = json.loads(json_str)
                    logger.info(f"成功获取JSON数据")
                    return data
                else:
                    # The endpoint sometimes answers with bare JSON instead.
                    try:
                        data = json.loads(text)
                        logger.info("直接解析JSON成功")
                        return data
                    except json.JSONDecodeError:
                        # Fix: narrowed from a bare except — only a parse
                        # failure should fall through to the retry logic.
                        pass
            except requests.exceptions.Timeout:
                logger.error(f"请求超时 (尝试{attempt + 1})")
            except Exception as e:
                logger.error(f"请求失败 (尝试{attempt + 1}): {e}")
            # Back off before every retry except after the last attempt.
            if attempt < 2:
                time.sleep(random.uniform(2, 3))
        return None

    def fetch_via_browser(self, uk: str = "ntHidnLhrlfclJar2z8wBg", timeout: int = 15000) -> Optional[Dict]:
        """Load the author page in a headless browser and sniff its JSONP XHRs.

        Returns:
            The first decoded JSONP payload captured, or None.
        """
        # Hoisted out of the per-response handler (it ran on every response).
        import urllib.parse
        playwright = sync_playwright().start()
        browser = None
        try:
            browser = playwright.chromium.launch(
                headless=True,
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--no-sandbox',
                    '--disable-dev-shm-usage'
                ],
                timeout=timeout
            )
            context = browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent=self.ua.random,
                locale='zh-CN',
                navigation_timeout=timeout
            )
            page = context.new_page()
            results = []  # Decoded JSONP payloads collected by the sniffer.

            def handle_response(response):
                # Only the webpage JSONP feed endpoint is of interest.
                url = response.url
                if "mbd.baidu.com/webpage" in url and "format=jsonp" in url:
                    try:
                        text = response.text()
                        logger.info(f"捕获到请求: {url}")
                        # Recover the callback name from the request URL so
                        # the JSONP envelope can be stripped.
                        parsed_url = urllib.parse.urlparse(url)
                        query_params = urllib.parse.parse_qs(parsed_url.query)
                        if 'callback' in query_params:
                            callback = query_params['callback'][0]
                            if text.startswith(callback + '(') and text.endswith(')'):
                                json_str = text[len(callback) + 1:-1]
                                data = json.loads(json_str)
                                results.append(data)
                                logger.info("成功解析JSONP数据")
                    except Exception as e:
                        # Best-effort sniffing: a broken response must not
                        # kill the page event loop.
                        logger.debug(f"处理响应失败: {e}")

            page.on("response", handle_response)
            target_url = f"https://baijiahao.baidu.com/u?app_id={uk}"
            logger.info(f"访问页面: {target_url}")
            try:
                page.goto(target_url, wait_until='domcontentloaded', timeout=10000)
                time.sleep(random.uniform(2, 3))
                # Scroll a little to trigger lazy-loaded feed requests.
                page.evaluate("window.scrollBy(0, 500)")
                time.sleep(1)
                page.evaluate("window.scrollBy(0, 500)")
                time.sleep(1)
                # Give in-flight requests a moment to land.
                time.sleep(2)
            except PlaywrightTimeoutError:
                logger.warning("页面加载超时,继续处理已捕获的数据...")
            if results:
                logger.info(f"通过浏览器捕获到 {len(results)} 个结果")
                return results[0]
        except Exception as e:
            logger.error(f"浏览器方式获取失败: {e}")
        finally:
            # Fix: close the browser on every path, not only on success.
            if browser is not None:
                try:
                    browser.close()
                except Exception:
                    pass
            playwright.stop()
        return None

    def fetch_with_ajax(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
        """Request the feed as plain JSON with an XHR-style header set.

        Returns:
            The parsed JSON payload, or None on any failure.
        """
        try:
            timestamp = int(time.time() * 1000)
            # Simpler parameter set than the JSONP variant.
            params = {
                'action': 'dynamic',
                'uk': uk,
                'type': 'newhome',
                'num': '10',
                'format': 'json',
                '_': str(timestamp)
            }
            url = "https://mbd.baidu.com/webpage"
            headers = {
                'User-Agent': self.ua.random,
                'Referer': 'https://baijiahao.baidu.com/',
                'Accept': 'application/json, text/javascript, */*; q=0.01',
                'X-Requested-With': 'XMLHttpRequest'
            }
            logger.info("尝试AJAX方式请求...")
            response = self.session.get(
                url,
                params=params,
                headers=headers,
                timeout=10
            )
            logger.info(f"AJAX响应状态: {response.status_code}")
            try:
                data = json.loads(response.text)
                logger.info("AJAX方式成功获取数据")
                return data
            except json.JSONDecodeError as e:
                logger.error(f"JSON解析失败: {e}")
                logger.info(f"响应内容: {response.text[:200]}")
                return None
        except Exception as e:
            logger.error(f"AJAX方式失败: {e}")
            return None

    @staticmethod
    def _has_article_list(data: Optional[Dict]) -> bool:
        """Return True when *data* is a successful payload with a feed list.

        NOTE(review): errno is compared against the *string* "0", exactly as
        in the original code — confirm the API returns it as a string.
        """
        return bool(data) and data.get("errno") == "0" and data.get("data", {}).get("list") is not None

    def fetch_all_methods(self, uk: str = "ntHidnLhrlfclJar2z8wBg") -> Optional[Dict]:
        """Try every fetch strategy in order; return the first usable payload.

        Returns:
            The first payload accepted by :meth:`_has_article_list` (or the
            first non-empty backup result), else None.
        """
        logger.info("=" * 50)
        logger.info(f"开始获取百家号数据UK: {uk}")
        logger.info("=" * 50)
        # Strategy 1: direct JSONP request.
        logger.info("\n方法1直接请求接口...")
        data = self.fetch_data_directly(uk)
        if self._has_article_list(data):
            logger.info(f"✓ 方法1成功获取到 {len(data['data']['list'])} 条数据")
            return data
        else:
            logger.info("✗ 方法1失败或数据为空")
        # Strategy 2: headless-browser sniffing.
        logger.info("\n方法2浏览器模拟获取...")
        data = self.fetch_via_browser(uk)
        if self._has_article_list(data):
            logger.info(f"✓ 方法2成功获取到 {len(data['data']['list'])} 条数据")
            return data
        else:
            logger.info("✗ 方法2失败或数据为空")
        # Strategy 3: plain AJAX request.
        logger.info("\n方法3AJAX请求...")
        data = self.fetch_with_ajax(uk)
        if self._has_article_list(data):
            logger.info(f"✓ 方法3成功获取到 {len(data['data']['list'])} 条数据")
            return data
        else:
            logger.info("✗ 方法3失败或数据为空")
        # Strategy 4: backup endpoints (accepts any non-empty JSON).
        logger.info("\n方法4尝试备用请求方式...")
        data = self.try_backup_method(uk)
        if data:
            logger.info("✓ 方法4成功获取数据")
            return data
        else:
            logger.error("所有方法都失败了")
            return None

    def try_backup_method(self, uk: str) -> Optional[Dict]:
        """Probe a list of alternate endpoints; return the first JSON payload.

        Returns:
            The first non-empty JSON body from a 200 response, or None.
        """
        backup_urls = [
            "https://author.baidu.com/rest/2.0/ugc/dynamic",
            "https://mbd.baidu.com/dynamic/api",
            "https://baijiahao.baidu.com/builder/api"
        ]
        for url in backup_urls:
            try:
                params = {
                    'action': 'list',
                    'uk': uk,
                    'page': '1',
                    'page_size': '10',
                    '_': str(int(time.time() * 1000))
                }
                headers = {
                    'User-Agent': self.ua.random,
                    'Referer': 'https://baijiahao.baidu.com/'
                }
                response = requests.get(url, params=params, headers=headers, timeout=10)
                if response.status_code == 200:
                    try:
                        data = response.json()
                        if data:
                            logger.info(f"备用URL {url} 成功")
                            return data
                    except ValueError:
                        # Fix: narrowed from a bare except — response.json()
                        # raises a ValueError subclass on non-JSON bodies.
                        pass
            except Exception as e:
                logger.debug(f"备用URL {url} 失败: {e}")
        return None
def display_simple_data(data):
    """Pretty-print up to 10 articles from a fetched feed payload.

    Args:
        data: the payload returned by the spider; expected shape is
            ``{"data": {"list": [{"itemData": {...}}, ...]}}``.

    Prints a summary per article (title, author, time, id, image count,
    tags) and returns None.
    """
    if not data or "data" not in data or "list" not in data["data"]:
        print("没有有效的数据")
        return
    articles = data["data"]["list"]
    print(f"\n获取到 {len(articles)} 篇文章:")
    for idx, article in enumerate(articles[:10]):  # Show at most 10 entries.
        print(f"\n{'=' * 60}")
        print(f"文章 {idx + 1}:")
        item_data = article.get("itemData", {})
        # Title: strip embedded newlines; fall back to origin_title.
        title = item_data.get("title", "无标题")
        title = title.replace('\n', ' ').strip()
        if not title or title == "无标题":
            title = item_data.get("origin_title", "无标题").replace('\n', ' ').strip()
        print(f"标题: {title[:100]}{'...' if len(title) > 100 else ''}")
        # Author: displaytype_exinfo is a JSON string; fall back to a regex
        # scan when it does not decode to a dict.
        display_info = item_data.get("displaytype_exinfo", "")
        author = "未知作者"
        if display_info:
            try:
                info = json.loads(display_info)
                author = info.get("name", info.get("display_name", "未知作者"))
            except (ValueError, AttributeError):
                # Fix: narrowed from a bare except. ValueError covers
                # malformed JSON; AttributeError covers payloads that
                # decode to something without .get() (e.g. a list).
                name_match = re.search(r'"name":"([^"]+)"', display_info)
                if name_match:
                    author = name_match.group(1)
        print(f"作者: {author}")
        # Publication time (two possible keys).
        time_str = item_data.get("time", item_data.get("cst_time", "未知时间"))
        print(f"发布时间: {time_str}")
        # Article id may live on the item or on the wrapper.
        thread_id = item_data.get("thread_id", article.get("thread_id", "未知"))
        print(f"文章ID: {thread_id}")
        # Image count, if any.
        img_src = item_data.get("imgSrc", [])
        if img_src:
            print(f"包含图片: {len(img_src)}")
        # Tags/topics: collect non-empty "key" fields.
        targets = item_data.get("target", [])
        if targets:
            tags = [t.get("key", "") for t in targets if t.get("key")]
            if tags:
                print(f"标签: {', '.join(tags)}")
def main():
    """Entry point: fetch the feed, save it to disk, and print a summary."""
    spider = BaiduBJHSpider()
    data = spider.fetch_all_methods()
    if data:
        # Persist the full payload under a timestamped filename.
        filename = f'baijiahao_data_{int(time.time())}.json'
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        # Fix: the original log line never interpolated the filename.
        logger.info(f"完整数据已保存到 {filename}")
        # Human-readable summary of the fetched articles.
        display_simple_data(data)
    else:
        print("未能获取到数据,建议:")
        print("1. 检查网络连接")
        print("2. 尝试使用代理")
        print("3. 等待一段时间后重试")
        print("4. 检查目标页面是否可正常访问")
if __name__ == "__main__":
    # Quiet noisy third-party loggers before running the scraper.
    logging.getLogger("playwright").setLevel(logging.WARNING)
    logging.getLogger("urllib3").setLevel(logging.WARNING)
    main()