#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Baidu search result crawler.

Workflow:
1. Read query words from the ai_mip_query_task table
2. Visit the Baidu search results page
3. Extract rl-link-data-url from <article> tags
4. Deduplicate and insert the URLs into the ai_mip_site table
5. Update task counters and status
"""
from loguru import logger
from playwright.sync_api import sync_playwright, Page, TimeoutError as PlaywrightTimeoutError
from db_manager import QueryTaskManager, SiteManager
from typing import Dict, List
from urllib.parse import quote
import time


class BaiduSearchCrawler:
    """Baidu search result crawler"""

    # Baidu search URL template
    SEARCH_URL_TEMPLATE = "https://www.baidu.com/s?pd=note&rpf=pc&dyTabStr=MTIsMCwzLDEsMiwxMyw3LDYsNSw5&wd={query}&bs={query}"

    def __init__(self, headless: bool = True, timeout: int = 30000):
        """
        Initialize the crawler.

        Args:
            headless: whether to run the browser in headless mode
            timeout: timeout in milliseconds
        """
        self.headless = headless
        self.timeout = timeout
        self.task_mgr = QueryTaskManager()
        self.site_mgr = SiteManager()
        logger.info(f"BaiduSearchCrawler initialized: headless={headless}, timeout={timeout}ms")

    def _scroll_and_load_more(self, page: Page, target_count: int = None) -> None:
        """
        Scroll the page to load more content until the target count is reached
        or no new content appears.

        Args:
            page: Playwright page object
            target_count: target number of results (threshold); if None, scroll until no new content loads
        """
        if target_count:
            logger.info(f"Start scrolling, target count: {target_count}")
        else:
            logger.info("Start scrolling until no new content loads")

        scroll_count = 0
        no_new_content_count = 0  # consecutive scrolls without new content

        while True:
            scroll_count += 1
            try:
                # Number of article elements before scrolling
                before_count = page.locator('article._aladdin_eb42s_1').count()

                # Stop if the target count has already been reached
                if target_count and before_count >= target_count:
                    logger.info(f"✅ Target count reached: {before_count}/{target_count}, stop scrolling")
                    break

                # Scroll to the bottom of the page
                page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                logger.info(f"[scroll {scroll_count}] scrolled to the bottom")

                # Wait for new content to load
                time.sleep(2)

                # Check whether new content was loaded
                after_count = page.locator('article._aladdin_eb42s_1').count()
                new_items = after_count - before_count

                if new_items > 0:
                    logger.info(f"[scroll {scroll_count}] loaded {new_items} new items (total: {after_count})")
                    no_new_content_count = 0  # reset the counter

                    # If a target count is set, check whether it has been reached
                    if target_count and after_count >= target_count:
                        logger.info(f"✅ Target count reached: {after_count}/{target_count}, stop scrolling")
                        break
                else:
                    no_new_content_count += 1
                    logger.info(f"[scroll {scroll_count}] no new content ({no_new_content_count}/2)")

                    # Stop after 2 consecutive scrolls without new content
                    if no_new_content_count >= 2:
                        logger.info(f"⚠️ No new content on consecutive scrolls, stop (total: {after_count})")
                        break

                # Safety limit: at most 50 scrolls to avoid an infinite loop
                if scroll_count >= 50:
                    logger.warning("⚠️ Maximum scroll limit (50) reached, stop scrolling")
                    break

            except Exception as e:
                logger.warning(f"[scroll {scroll_count}] scroll error: {str(e)}")
                break

        final_count = page.locator('article._aladdin_eb42s_1').count()
        logger.info(f"Scrolling finished: {scroll_count} scrolls, {final_count} elements in total")

    def _extract_urls_from_page(self, page: Page, target_count: int = None) -> List[str]:
        """
        Extract rl-link-data-url values from the page.

        Args:
            page: Playwright page object
            target_count: target number of results (threshold)

        Returns:
            List of URLs
        """
        try:
            # Wait for the article elements to appear
            page.wait_for_selector('article._aladdin_eb42s_1', timeout=self.timeout)
            time.sleep(2)  # wait for dynamic content

            # Scroll to load more results, passing the target count
            self._scroll_and_load_more(page, target_count=target_count)

            # Collect all article elements
            articles = page.locator('article._aladdin_eb42s_1').all()
            logger.info(f"Found {len(articles)} article elements")

            urls = []
            for idx, article in enumerate(articles, 1):
                try:
                    # Read the rl-link-data-url attribute
                    data_url = article.get_attribute('rl-link-data-url')
                    if data_url:
                        urls.append(data_url)
                        logger.debug(f"[{idx}] extracted URL: {data_url}")
                    else:
                        logger.warning(f"[{idx}] article has no rl-link-data-url attribute")
                except Exception as e:
                    logger.warning(f"[{idx}] failed to extract URL: {str(e)}")
                    continue

            logger.info(f"✅ Extracted {len(urls)} URLs")
            return urls

        except PlaywrightTimeoutError:
            logger.error("❌ Page load timed out, no article elements found")
            return []
        except Exception as e:
            logger.error(f"❌ Error while extracting URLs: {str(e)}")
            return []

    def _filter_health_baidu_urls(self, urls: List[str]) -> List[str]:
        """
        Keep only URLs that start with health.baidu.com.

        Args:
            urls: list of URLs

        Returns:
            Filtered list of URLs
        """
        health_urls = []
        for url in urls:
            if url.startswith('https://health.baidu.com'):
                health_urls.append(url)
            else:
                logger.debug(f"Filtered out non health.baidu.com URL: {url}")

        filtered_count = len(urls) - len(health_urls)
        if filtered_count > 0:
            logger.info(f"health.baidu.com filter: {len(urls)} -> {len(health_urls)} ({filtered_count} removed)")

        return health_urls

    def _deduplicate_urls(self, urls: List[str]) -> List[str]:
        """
        Deduplicate URLs.

        Args:
            urls: list of URLs

        Returns:
            Deduplicated list of URLs
        """
        # Use a set for deduplication while preserving the original order
        seen = set()
        unique_urls = []
        for url in urls:
            if url not in seen:
                seen.add(url)
                unique_urls.append(url)

        removed_count = len(urls) - len(unique_urls)
        if removed_count > 0:
            logger.info(f"Deduplication: {len(urls)} -> {len(unique_urls)} ({removed_count} duplicates removed)")

        return unique_urls

    def _filter_existing_urls(self, urls: List[str]) -> List[str]:
        """
        Filter out URLs that already exist in the database.

        Args:
            urls: list of URLs

        Returns:
            List of new URLs
        """
        new_urls = []
        for url in urls:
            existing = self.site_mgr.get_site_by_url(url)
            if not existing:
                new_urls.append(url)
            else:
                logger.debug(f"URL already exists, skipping: {url}")

        filtered_count = len(urls) - len(new_urls)
        if filtered_count > 0:
            logger.info(f"Existing-URL filter: {len(urls)} -> {len(new_urls)} ({filtered_count} removed)")

        return new_urls

    def _save_urls_to_database(self, urls: List[str], query_word: str = None, category: str = None) -> int:
        """
        Save URLs to the database.

        Args:
            urls: list of URLs
            query_word: source query word
            category: category label

        Returns:
            Number of URLs saved successfully
        """
        success_count = 0
        for url in urls:
            try:
                site_id = self.site_mgr.add_site(
                    site_url=url,
                    site_name=None,          # fall back to the URL as the name
                    site_dimension=category,
                    query_word=query_word,   # source query word
                    frequency=1,
                    time_start='09:00:00',
                    time_end='21:00:00',
                    interval_minutes=30
                )
                if site_id:
                    success_count += 1
                    logger.debug(f"✅ Saved URL: {url} (ID: {site_id})")
            except Exception as e:
                logger.error(f"❌ Failed to save URL: {url}, {str(e)}")
                continue

        logger.info(f"Saved to database: {success_count}/{len(urls)}")
        return success_count

    def crawl_query(self, query_word: str, task_id: int = None,
                    category: str = None, threshold_max: int = None) -> Dict:
        """
        Crawl the search results for a single query word.

        Args:
            query_word: query word
            task_id: task ID (used to update counters)
            category: category label
            threshold_max: maximum number of results to crawl (threshold)

        Returns:
            Crawl result statistics
        """
        result = {
            'query_word': query_word,
            'task_id': task_id,
            'success': False,
            'crawled_count': 0,
            'valid_count': 0,
            'new_count': 0,
            'error': None
        }

        try:
            # Build the search URL; percent-encode the query word so spaces and
            # special characters do not break the URL
            search_url = self.SEARCH_URL_TEMPLATE.format(query=quote(query_word))
            logger.info(f"Start crawling: {query_word}")
            if threshold_max:
                logger.info(f"Target threshold: {threshold_max}")
            logger.info(f"Search URL: {search_url}")

            # Launch the browser
            with sync_playwright() as p:
                browser = p.chromium.launch(headless=self.headless)
                page = browser.new_page()

                # Set the User-Agent
                page.set_extra_http_headers({
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
                })

                # Open the search results page
                logger.info("Visiting the search results page...")
                page.goto(search_url, timeout=self.timeout, wait_until='domcontentloaded')
                time.sleep(3)  # wait for the page to render

                # Extract URLs, passing the threshold
                urls = self._extract_urls_from_page(page, target_count=threshold_max)
                result['crawled_count'] = len(urls)

                # Close the browser
                browser.close()

            if not urls:
                result['error'] = "No URLs extracted"
                return result

            # Keep only health.baidu.com URLs
            health_urls = self._filter_health_baidu_urls(urls)
            if not health_urls:
                result['error'] = "No health.baidu.com URLs found"
                result['crawled_count'] = len(urls)
                return result

            # Deduplicate
            unique_urls = self._deduplicate_urls(health_urls)
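            # Note: the post-filter count below replaces the raw 'crawled_count',
            # while increment_crawl_count further down still reports the raw
            # extracted count (len(urls)) to the task table.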
            result['crawled_count'] = len(health_urls)  # record the post-filter count
            result['valid_count'] = len(unique_urls)

            # Filter out URLs that already exist in the database
            new_urls = self._filter_existing_urls(unique_urls)

            # Save to the database, passing query_word
            saved_count = self._save_urls_to_database(new_urls, query_word=query_word, category=category)
            result['new_count'] = saved_count

            # Update the task counters
            if task_id:
                self.task_mgr.increment_crawl_count(
                    task_id,
                    crawl_count=len(urls),
                    valid_count=saved_count
                )
                # Check whether the threshold has been reached
                self.task_mgr.check_threshold(task_id)

            result['success'] = True
            logger.info(f"✅ Crawl finished: crawled={result['crawled_count']}, valid={result['valid_count']}, new={result['new_count']}")

        except PlaywrightTimeoutError:
            result['error'] = "Page load timed out"
            logger.error(f"❌ Page load timed out: {query_word}")
        except Exception as e:
            result['error'] = str(e)
            logger.error(f"❌ Crawl failed: {query_word}, {str(e)}")
            import traceback
            traceback.print_exc()

        return result

    def crawl_tasks(self, limit: int = None) -> Dict:
        """
        Crawl tasks in batch.

        Args:
            limit: maximum number of tasks to process

        Returns:
            Batch crawl statistics
        """
        logger.info("=" * 70)
        logger.info(" Baidu search result crawler")
        logger.info("=" * 70)

        # Fetch tasks in the 'ready' state
        ready_tasks = self.task_mgr.get_ready_tasks(limit=limit)

        if not ready_tasks:
            logger.warning("No pending tasks")
            return {
                'total_tasks': 0,
                'success_count': 0,
                'failed_count': 0,
                'total_crawled': 0,
                'total_saved': 0
            }

        logger.info(f"Fetched {len(ready_tasks)} pending tasks\n")

        # Statistics
        stats = {
            'total_tasks': len(ready_tasks),
            'success_count': 0,
            'failed_count': 0,
            'total_crawled': 0,
            'total_saved': 0
        }

        # Process the tasks one by one
        for idx, task in enumerate(ready_tasks, 1):
            task_id = task['id']
            query_word = task['query_word']
            category = task['category']
            threshold_max = task['threshold_max']  # crawl threshold

            logger.info(f"[{idx}/{len(ready_tasks)}] Processing task: {query_word} (threshold: {threshold_max})")

            # Mark the task as 'doing'
            self.task_mgr.update_task_status(task_id, 'doing')

            # Crawl, passing the threshold
            result = self.crawl_query(query_word, task_id, category, threshold_max=threshold_max)

            # Update the statistics
            if result['success']:
                stats['success_count'] += 1
                stats['total_crawled'] += result['crawled_count']
                stats['total_saved'] += result['new_count']

                # Mark the task as 'finished'
                self.task_mgr.update_task_status(task_id, 'finished')
            else:
                stats['failed_count'] += 1

                # Mark the task as 'failed'
                self.task_mgr.update_task_status(
                    task_id,
                    'failed',
                    error_message=result['error']
                )

            logger.info("")
            time.sleep(2)  # short delay to avoid sending requests too quickly

        # Print the summary
        logger.info("=" * 70)
        logger.info(" Crawl finished")
        logger.info("=" * 70)
        logger.info(f"Total tasks: {stats['total_tasks']}")
        logger.info(f"Succeeded: {stats['success_count']}")
        logger.info(f"Failed: {stats['failed_count']}")
        logger.info(f"Total crawled: {stats['total_crawled']} URLs")
        logger.info(f"Newly saved: {stats['total_saved']} URLs")
        logger.info("=" * 70)

        return stats


if __name__ == "__main__":
    import sys

    # Configure logging
    logger.remove()
    logger.add(
        sys.stdout,
        format="{time:HH:mm:ss} | {level: <8} | {message}",
        level="INFO"
    )

    # Create the crawler (headless=False shows the browser window)
    crawler = BaiduSearchCrawler(headless=False)

    # Crawl tasks in batch, limited to 5 tasks
    crawler.crawl_tasks(limit=5)
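
# A minimal single-query sketch (illustrative only; the query word, category, and
# threshold values below are hypothetical). crawl_query() bypasses the
# ai_mip_query_task table but still checks for and writes rows in ai_mip_site
# through SiteManager:
#
#     crawler = BaiduSearchCrawler(headless=True)
#     result = crawler.crawl_query("example query", category="example category", threshold_max=20)
#     print(result)  # {'query_word': ..., 'crawled_count': ..., 'valid_count': ..., 'new_count': ..., ...}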