#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Baidu search-result crawler.

Workflow:
1. Read query words from the ai_mip_query_task table.
2. Open the Baidu search-result page for each query.
3. Extract the rl-link-data-url attribute from article tags.
4. Deduplicate the URLs and insert them into the ai_mip_site table.
5. Update the task's crawl counters and status.
"""
from loguru import logger
from playwright.sync_api import sync_playwright, Page, TimeoutError as PlaywrightTimeoutError
from db_manager import QueryTaskManager, SiteManager
from typing import List, Dict
from urllib.parse import quote
import time


class BaiduSearchCrawler:
    """Baidu search-result crawler."""

    # Baidu search URL template (pd=note appears to target the notes vertical;
    # dyTabStr is an opaque tab-state token captured from the live page).
    SEARCH_URL_TEMPLATE = "https://www.baidu.com/s?pd=note&rpf=pc&dyTabStr=MTIsMCwzLDEsMiwxMyw3LDYsNSw5&wd={query}&bs={query}"

    # Result cards carry a build-generated class hash; if Baidu redeploys the
    # page, this selector will likely need to be re-inspected and updated.
    ARTICLE_SELECTOR = 'article._aladdin_eb42s_1'

    def __init__(self, headless: bool = True, timeout: int = 30000):
        """
        Initialize the crawler.

        Args:
            headless: whether to run the browser headless
            timeout: timeout in milliseconds
        """
        self.headless = headless
        self.timeout = timeout
        self.task_mgr = QueryTaskManager()
        self.site_mgr = SiteManager()
        logger.info(f"BaiduSearchCrawler initialized: headless={headless}, timeout={timeout}ms")

    def _scroll_and_load_more(self, page: Page, target_count: int = None) -> None:
        """
        Scroll the page to load more content until the target count is reached
        or no new content appears.

        Args:
            page: Playwright page object
            target_count: target item count; if None, scroll until no new content appears
        """
        if target_count:
            logger.info(f"Scrolling to load more, target count: {target_count}")
        else:
            logger.info("Scrolling to load more until no new content appears")
        scroll_count = 0
        no_new_content_count = 0  # consecutive scrolls with no new content
        while True:
            scroll_count += 1
            try:
                # Record the article count before scrolling
                before_count = page.locator(self.ARTICLE_SELECTOR).count()
                # Stop if the target count has already been reached
                if target_count and before_count >= target_count:
                    logger.info(f"✅ Target count reached: {before_count}/{target_count}, stopping")
                    break
                # Scroll to the bottom of the page
                page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                logger.info(f"[scroll {scroll_count}] scrolled to bottom")
                # Wait for new content to load
                time.sleep(2)
                # Check whether new content was loaded
                after_count = page.locator(self.ARTICLE_SELECTOR).count()
                new_items = after_count - before_count
                if new_items > 0:
                    logger.info(f"[scroll {scroll_count}] loaded {new_items} new items (total: {after_count})")
                    no_new_content_count = 0  # reset the counter
                    # If a target count is set, check whether it has been reached
                    if target_count and after_count >= target_count:
                        logger.info(f"✅ Target count reached: {after_count}/{target_count}, stopping")
                        break
                else:
                    no_new_content_count += 1
                    logger.info(f"[scroll {scroll_count}] no new content ({no_new_content_count}/2)")
                    # Stop after 2 consecutive scrolls with no new content
                    if no_new_content_count >= 2:
                        logger.info(f"⚠️ No new content in consecutive scrolls, stopping (total: {after_count})")
                        break
                # Safety limit: at most 50 scrolls, to avoid an infinite loop
                if scroll_count >= 50:
                    logger.warning("⚠️ Maximum scroll limit (50) reached, stopping")
                    break
            except Exception as e:
                logger.warning(f"[scroll {scroll_count}] scroll error: {str(e)}")
                break
        final_count = page.locator(self.ARTICLE_SELECTOR).count()
        logger.info(f"Scrolling finished: {scroll_count} scrolls, {final_count} elements found")

    def _extract_urls_from_page(self, page: Page, target_count: int = None) -> List[str]:
        """
        Extract rl-link-data-url values from the page.

        Args:
            page: Playwright page object
            target_count: target item count
        Returns:
            List of URLs
        """
        try:
            # Wait for article elements to load
            page.wait_for_selector(self.ARTICLE_SELECTOR, timeout=self.timeout)
            time.sleep(2)  # wait for dynamic content to render
            # Scroll to load more, passing the target count
            self._scroll_and_load_more(page, target_count=target_count)
            # Collect all article elements
            articles = page.locator(self.ARTICLE_SELECTOR).all()
            logger.info(f"Found {len(articles)} article elements")
            urls = []
            for idx, article in enumerate(articles, 1):
                try:
                    # Read the rl-link-data-url attribute
                    data_url = article.get_attribute('rl-link-data-url')
                    if data_url:
                        urls.append(data_url)
                        logger.debug(f"[{idx}] extracted URL: {data_url}")
                    else:
                        logger.warning(f"[{idx}] article has no rl-link-data-url attribute")
                except Exception as e:
                    logger.warning(f"[{idx}] failed to extract URL: {str(e)}")
                    continue
            logger.info(f"✅ Extracted {len(urls)} URLs")
            return urls
        except PlaywrightTimeoutError:
            logger.error("❌ Page load timed out: no article elements found")
            return []
        except Exception as e:
            logger.error(f"❌ URL extraction error: {str(e)}")
            return []

    def _filter_health_baidu_urls(self, urls: List[str]) -> List[str]:
        """
        Keep only URLs that start with https://health.baidu.com.

        Args:
            urls: list of URLs
        Returns:
            Filtered list of URLs
        """
        health_urls = []
        for url in urls:
            if url.startswith('https://health.baidu.com'):
                health_urls.append(url)
            else:
                logger.debug(f"Filtered non-health.baidu.com URL: {url}")
        filtered_count = len(urls) - len(health_urls)
        if filtered_count > 0:
            logger.info(f"Filtered non-health.baidu.com: {len(urls)} -> {len(health_urls)} ({filtered_count} removed)")
        return health_urls

    def _deduplicate_urls(self, urls: List[str]) -> List[str]:
        """
        Deduplicate URLs.

        Args:
            urls: list of URLs
        Returns:
            Deduplicated list of URLs
        """
        # Deduplicate with a set while preserving order
        seen = set()
        unique_urls = []
        for url in urls:
            if url not in seen:
                seen.add(url)
                unique_urls.append(url)
        removed_count = len(urls) - len(unique_urls)
        if removed_count > 0:
            logger.info(f"Deduplicated: {len(urls)} -> {len(unique_urls)} ({removed_count} duplicates removed)")
        return unique_urls

    def _filter_existing_urls(self, urls: List[str]) -> List[str]:
        """
        Filter out URLs that already exist in the database.

        Args:
            urls: list of URLs
        Returns:
            List of new URLs
        """
        new_urls = []
        for url in urls:
            existing = self.site_mgr.get_site_by_url(url)
            if not existing:
                new_urls.append(url)
            else:
                logger.debug(f"URL already exists, skipping: {url}")
        filtered_count = len(urls) - len(new_urls)
        if filtered_count > 0:
            logger.info(f"Filtered existing: {len(urls)} -> {len(new_urls)} ({filtered_count} removed)")
        return new_urls

    def _save_urls_to_database(self, urls: List[str], query_word: str = None, category: str = None) -> int:
        """
        Save URLs to the database.

        Args:
            urls: list of URLs
            query_word: source query word
            category: category label
        Returns:
            Number of URLs saved successfully
        """
        success_count = 0
        for url in urls:
            try:
                site_id = self.site_mgr.add_site(
                    site_url=url,
                    site_name=None,  # fall back to the URL as the name
                    site_dimension=category,
                    query_word=query_word,  # new: source query word
                    frequency=1,
                    time_start='09:00:00',
                    time_end='21:00:00',
                    interval_minutes=30
                )
                if site_id:
                    success_count += 1
                    logger.debug(f"✅ Saved URL: {url} (ID: {site_id})")
            except Exception as e:
                logger.error(f"❌ Failed to save URL: {url}, {str(e)}")
                continue
        logger.info(f"Saved to database: {success_count}/{len(urls)}")
        return success_count

    def crawl_query(self, query_word: str, task_id: int = None,
                    category: str = None, threshold_max: int = None) -> Dict:
        """
        Crawl the search results for a single query word.

        Args:
            query_word: query word
            task_id: task ID, used to update counters
            category: category label
            threshold_max: maximum number of items to crawl
        Returns:
            Crawl result statistics
        """
        result = {
            'query_word': query_word,
            'task_id': task_id,
            'success': False,
            'crawled_count': 0,
            'valid_count': 0,
            'new_count': 0,
            'error': None
        }
        try:
            # Build the search URL (percent-encode the query so spaces and
            # special characters cannot break the query string)
            search_url = self.SEARCH_URL_TEMPLATE.format(query=quote(query_word))
            logger.info(f"Start crawling: {query_word}")
            if threshold_max:
                logger.info(f"Target threshold: {threshold_max}")
            logger.info(f"Search URL: {search_url}")
            # Launch the browser
            with sync_playwright() as p:
                browser = p.chromium.launch(headless=self.headless)
                page = browser.new_page()
                # Set the User-Agent
                page.set_extra_http_headers({
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
                })
                # Open the search page
                logger.info("Opening search page...")
                page.goto(search_url, timeout=self.timeout, wait_until='domcontentloaded')
                time.sleep(3)  # wait for the page to render
                # Extract URLs, passing the threshold
                urls = self._extract_urls_from_page(page, target_count=threshold_max)
                result['crawled_count'] = len(urls)
                # Close the browser
                browser.close()
            if not urls:
                result['error'] = "No URLs extracted"
                return result
            # Keep only health.baidu.com URLs
            health_urls = self._filter_health_baidu_urls(urls)
            if not health_urls:
                result['error'] = "No health.baidu.com URLs found"
                result['crawled_count'] = len(urls)
                return result
            # Deduplicate
            unique_urls = self._deduplicate_urls(health_urls)
            result['crawled_count'] = len(health_urls)  # record the filtered count
            result['valid_count'] = len(unique_urls)
            # Filter out URLs that already exist
            new_urls = self._filter_existing_urls(unique_urls)
            # Save to the database, passing query_word
            saved_count = self._save_urls_to_database(new_urls, query_word=query_word, category=category)
            result['new_count'] = saved_count
            # Update the task counters
            if task_id:
                self.task_mgr.increment_crawl_count(
                    task_id,
                    crawl_count=len(urls),
                    valid_count=saved_count
                )
                # Check whether the threshold has been reached
                self.task_mgr.check_threshold(task_id)
            result['success'] = True
            logger.info(f"✅ Crawl finished: crawled={result['crawled_count']}, valid={result['valid_count']}, new={result['new_count']}")
        except PlaywrightTimeoutError:
            result['error'] = "Page load timed out"
            logger.error(f"❌ Page load timed out: {query_word}")
        except Exception as e:
            result['error'] = str(e)
            logger.error(f"❌ Crawl failed: {query_word}, {str(e)}")
            import traceback
            traceback.print_exc()
        return result

    def crawl_tasks(self, limit: int = None) -> Dict:
        """
        Crawl tasks in batch.

        Args:
            limit: maximum number of tasks to process
        Returns:
            Batch crawl statistics
        """
        logger.info("=" * 70)
        logger.info(" Baidu search-result crawler")
        logger.info("=" * 70)
        # Fetch ready tasks
        ready_tasks = self.task_mgr.get_ready_tasks(limit=limit)
        if not ready_tasks:
            logger.warning("No pending tasks")
            return {
                'total_tasks': 0,
                'success_count': 0,
                'failed_count': 0,
                'total_crawled': 0,
                'total_saved': 0
            }
        logger.info(f"Fetched {len(ready_tasks)} pending tasks\n")
        # Statistics
        stats = {
            'total_tasks': len(ready_tasks),
            'success_count': 0,
            'failed_count': 0,
            'total_crawled': 0,
            'total_saved': 0
        }
        # Process tasks one by one
        for idx, task in enumerate(ready_tasks, 1):
            task_id = task['id']
            query_word = task['query_word']
            category = task['category']
            threshold_max = task['threshold_max']  # fetch the threshold
            logger.info(f"[{idx}/{len(ready_tasks)}] Processing task: {query_word} (threshold: {threshold_max})")
            # Mark the task as doing
            self.task_mgr.update_task_status(task_id, 'doing')
            # Crawl, passing the threshold
            result = self.crawl_query(query_word, task_id, category, threshold_max=threshold_max)
            # Update statistics
            if result['success']:
                stats['success_count'] += 1
                stats['total_crawled'] += result['crawled_count']
                stats['total_saved'] += result['new_count']
                # Mark the task as finished
                self.task_mgr.update_task_status(task_id, 'finished')
            else:
                stats['failed_count'] += 1
                # Mark the task as failed
                self.task_mgr.update_task_status(
                    task_id,
                    'failed',
                    error_message=result['error']
                )
            logger.info("")
            time.sleep(2)  # delay to avoid sending requests too fast
        # Print the summary
        logger.info("=" * 70)
        logger.info(" Crawl finished")
        logger.info("=" * 70)
        logger.info(f"Total tasks: {stats['total_tasks']}")
        logger.info(f"Succeeded: {stats['success_count']}")
        logger.info(f"Failed: {stats['failed_count']}")
        logger.info(f"Total crawled: {stats['total_crawled']} URLs")
        logger.info(f"Newly saved: {stats['total_saved']} URLs")
        logger.info("=" * 70)
        return stats


if __name__ == "__main__":
    import sys
    # Configure logging
    logger.remove()
    logger.add(
        sys.stdout,
        format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <level>{message}</level>",
        level="INFO"
    )
    # Create the crawler
    crawler = BaiduSearchCrawler(headless=False)  # headless=False shows the browser window
    # Crawl tasks in batch
    crawler.crawl_tasks(limit=5)  # process at most 5 tasks
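
    # A minimal single-query sketch, bypassing the task table (commented out;
    # the query word, category, and threshold below are only illustrative
    # assumptions, not values from ai_mip_query_task):
    #
    # single = crawler.crawl_query("感冒 症状", category="健康", threshold_max=20)
    # logger.info(f"single-query result: {single}")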