"""
测试百度搜索爬虫
"""
from loguru import logger
from baidu_crawler import BaiduSearchCrawler
from db_manager import QueryTaskManager
from datetime import datetime
import sys
# Drop the handlers loguru has registered so far, then log to stdout only,
# using a compact "time | level | message" line layout.
logger.remove()
logger.add(
    sys.stdout,
    format="{time:HH:mm:ss} | {level: <8} | {message}",
)
def test_single_query():
    """Crawl one hard-coded query word and print a summary of the result.

    Calls ``BaiduSearchCrawler.crawl_query`` once with ``threshold_max=50``
    and prints selected fields of the mapping it returns. The error message
    is printed only when the ``error`` field is truthy.
    """
    banner = "=" * 70
    print(banner)
    print(" 测试爬取单个查询词")
    print(banner)

    # headless=False keeps the browser window visible. Per the original
    # author's note, the crawler scrolls by itself until no new content shows.
    crawler = BaiduSearchCrawler(headless=False)

    # Sample query, run with a threshold of 50.
    query_word = "糖尿病治疗"
    result = crawler.crawl_query(query_word, category="医疗", threshold_max=50)

    print("\n爬取结果:")
    # (printed label, key in the result mapping), in output order.
    summary_fields = (
        ("查询词", "query_word"),
        ("是否成功", "success"),
        ("爬取数量", "crawled_count"),
        ("有效数量", "valid_count"),
        ("新增数量", "new_count"),
    )
    for label, key in summary_fields:
        print(f" {label}: {result[key]}")

    error = result['error']
    if error:
        print(f" 错误信息: {error}")
def test_batch_crawl():
    """Create three sample tasks dated today, crawl them as a batch, print stats.

    Tasks are registered through ``QueryTaskManager.create_task``, each with
    its own threshold. ``BaiduSearchCrawler.crawl_tasks(limit=3)`` is then
    run and the fields of the statistics mapping it returns are printed.
    """
    separator = "=" * 70
    print(separator)
    print(" 测试批量爬取任务")
    print(separator)

    # Seed a few test tasks first.
    task_mgr = QueryTaskManager()
    task_date = datetime.now().strftime('%Y%m%d')

    # Each row: (query_word, query_type, category, priority, threshold_max)
    test_queries = [
        ("高血压怎么治疗", "keyword", "医疗", 3, 30),
        ("在线教育平台哪个好", "phrase", "教育", 5, 20),
        ("免费法律咨询", "keyword", "法律", 4, 25),
    ]

    logger.info("创建测试任务...")
    for spec in test_queries:
        query, qtype, category, priority, threshold = spec
        task_mgr.create_task(
            query_word=query,
            query_type=qtype,
            task_date=task_date,
            threshold_max=threshold,  # every task keeps its own threshold
            priority=priority,
            category=category,
            remark="测试任务",
        )

    print()

    # Run the batch crawl. Per the original author's note, the crawler keeps
    # scrolling until each task's threshold is reached.
    crawler = BaiduSearchCrawler(headless=False)
    stats = crawler.crawl_tasks(limit=3)

    print("\n批量爬取统计:")
    # (printed label, key in the stats mapping), in output order.
    stat_fields = (
        ("总任务数", "total_tasks"),
        ("成功", "success_count"),
        ("失败", "failed_count"),
        ("总爬取", "total_crawled"),
        ("新增保存", "total_saved"),
    )
    for label, key in stat_fields:
        print(f" {label}: {stats[key]}")
if __name__ == "__main__":
    # Script entry point: pick the test to run from the --mode option
    # ("single" by default, or "batch").
    import argparse
    import traceback

    parser = argparse.ArgumentParser(description='测试百度搜索爬虫')
    parser.add_argument('--mode', choices=['single', 'batch'], default='single',
                        help='测试模式:single=单个查询, batch=批量任务')
    args = parser.parse_args()

    try:
        if args.mode == 'single':
            test_single_query()
        else:
            test_batch_crawl()
    except Exception as e:
        # Top-level boundary: report the failure with its traceback ...
        logger.error(f"测试失败: {str(e)}")
        traceback.print_exc()
        # ... and exit non-zero so shells and CI see the failure. Previously
        # the exception was swallowed and the script ended with status 0.
        sys.exit(1)