diff --git a/.gitignore b/.gitignore index 8bfabc4..a8f489f 100644 --- a/.gitignore +++ b/.gitignore @@ -45,10 +45,17 @@ env/ *.log logs/ log/ +logs_dev/ -# 测试截图 +# 测试截图和数据 test_screenshot*.png screenshot*.png +test/ +test_concurrent/ +screenshots/ +*.png +*.jpg +*.jpeg # 数据文件 urls_data.json diff --git a/ai_mip.service b/ai_mip.service new file mode 100644 index 0000000..0eb5142 --- /dev/null +++ b/ai_mip.service @@ -0,0 +1,27 @@ +[Unit] +Description=AI MIP Advertisement Click Service +After=network.target mysql.service + +[Service] +Type=simple +User=www-data +Group=www-data +WorkingDirectory=/opt/ai_mip +Environment="PATH=/opt/ai_mip/venv/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" +ExecStart=/opt/ai_mip/venv/bin/python /opt/ai_mip/main.py --workers 3 --health-port 8899 +Restart=always +RestartSec=10 +StandardOutput=append:/var/log/ai_mip/service.log +StandardError=append:/var/log/ai_mip/error.log + +# 资源限制 +LimitNOFILE=65535 +LimitNPROC=4096 + +# 优雅关闭 +TimeoutStopSec=30 +KillMode=mixed +KillSignal=SIGTERM + +[Install] +WantedBy=multi-user.target diff --git a/ai_mip.zip b/ai_mip.zip index f672ac8..0564d73 100644 Binary files a/ai_mip.zip and b/ai_mip.zip differ diff --git a/baidu_crawler.py b/baidu_crawler.py new file mode 100644 index 0000000..f2f8ecd --- /dev/null +++ b/baidu_crawler.py @@ -0,0 +1,480 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +百度搜索结果爬虫 + +功能: +1. 从ai_mip_query_task表读取查询词 +2. 访问百度搜索结果页 +3. 提取article标签中的rl-link-data-url +4. 去重后插入到ai_mip_site表 +5. 更新任务计数和状态 +""" + +from loguru import logger +from playwright.sync_api import sync_playwright, Browser, Page, TimeoutError as PlaywrightTimeoutError +from db_manager import QueryTaskManager, SiteManager +from typing import List, Dict, Set +import time +import re +from urllib.parse import urljoin, urlparse, parse_qs +from datetime import datetime + + +class BaiduSearchCrawler: + """百度搜索结果爬虫""" + + # 百度搜索URL模板 + SEARCH_URL_TEMPLATE = "https://www.baidu.com/s?pd=note&rpf=pc&dyTabStr=MTIsMCwzLDEsMiwxMyw3LDYsNSw5&wd={query}&bs={query}" + + def __init__(self, headless: bool = True, timeout: int = 30000): + """ + 初始化爬虫 + + Args: + headless: 是否无头模式 + timeout: 超时时间(毫秒) + """ + self.headless = headless + self.timeout = timeout + self.task_mgr = QueryTaskManager() + self.site_mgr = SiteManager() + + logger.info(f"BaiduSearchCrawler初始化: headless={headless}, timeout={timeout}ms") + + def _scroll_and_load_more(self, page: Page, target_count: int = None) -> None: + """ + 滚动页面加载更多内容,直到达到目标数量或无新内容 + + Args: + page: Playwright页面对象 + target_count: 目标数量(阈值),为None则滚动直到无新内容 + """ + if target_count: + logger.info(f"开始滚动加载,目标数量: {target_count}") + else: + logger.info("开始滚动加载,直到无新内容") + + scroll_count = 0 + no_new_content_count = 0 # 连续无新内容次数 + + while True: + scroll_count += 1 + + try: + # 记录滚动前的article数量 + before_count = page.locator('article._aladdin_eb42s_1').count() + + # 检查是否已达到目标数量 + if target_count and before_count >= target_count: + logger.info(f"✅ 已达到目标数量: {before_count}/{target_count},停止滚动") + break + + # 滚动到页面底部 + page.evaluate("window.scrollTo(0, document.body.scrollHeight)") + logger.info(f"[滚动 {scroll_count}] 已滚动到底部") + + # 等待新内容加载 + time.sleep(2) + + # 检查是否有新内容加载 + after_count = page.locator('article._aladdin_eb42s_1').count() + new_items = after_count - before_count + + if new_items > 0: + logger.info(f"[滚动 {scroll_count}] 加载了 {new_items} 个新内容 (总计: {after_count})") + no_new_content_count = 0 # 重置计数器 + + # 如果设置了目标数量,检查是否已达到 + if target_count and after_count >= target_count: + logger.info(f"✅ 已达到目标数量: 
{after_count}/{target_count},停止滚动") + break + else: + no_new_content_count += 1 + logger.info(f"[滚动 {scroll_count}] 没有新内容加载 ({no_new_content_count}/2)") + + # 连续2次无新内容,停止滚动 + if no_new_content_count >= 2: + logger.info(f"⚠️ 连续无新内容,停止滚动 (总计: {after_count})") + break + + # 安全限制:最多滚动50次防止无限循环 + if scroll_count >= 50: + logger.warning(f"⚠️ 已达到最大滚动次数限制 (50次),停止滚动") + break + + except Exception as e: + logger.warning(f"[滚动 {scroll_count}] 滚动异常: {str(e)}") + break + + final_count = page.locator('article._aladdin_eb42s_1').count() + logger.info(f"滚动加载完成,总计滚动 {scroll_count} 次,获取 {final_count} 个元素") + + def _extract_urls_from_page(self, page: Page, target_count: int = None) -> List[str]: + """ + 从页面中提取rl-link-data-url + + Args: + page: Playwright页面对象 + target_count: 目标数量(阈值) + + Returns: + URL列表 + """ + try: + # 等待article元素加载 + page.wait_for_selector('article._aladdin_eb42s_1', timeout=self.timeout) + time.sleep(2) # 等待动态内容加载 + + # 执行滚动加载,传入目标数量 + self._scroll_and_load_more(page, target_count=target_count) + + # 获取所有article元素 + articles = page.locator('article._aladdin_eb42s_1').all() + logger.info(f"找到 {len(articles)} 个article元素") + + urls = [] + for idx, article in enumerate(articles, 1): + try: + # 获取rl-link-data-url属性 + data_url = article.get_attribute('rl-link-data-url') + + if data_url: + urls.append(data_url) + logger.debug(f"[{idx}] 提取URL: {data_url}") + else: + logger.warning(f"[{idx}] article没有rl-link-data-url属性") + + except Exception as e: + logger.warning(f"[{idx}] 提取URL失败: {str(e)}") + continue + + logger.info(f"✅ 成功提取 {len(urls)} 个URL") + return urls + + except PlaywrightTimeoutError: + logger.error("❌ 页面加载超时,未找到article元素") + return [] + except Exception as e: + logger.error(f"❌ 提取URL异常: {str(e)}") + return [] + + def _filter_health_baidu_urls(self, urls: List[str]) -> List[str]: + """ + 过滤出 health.baidu.com 开头的URL + + Args: + urls: URL列表 + + Returns: + 过滤后的URL列表 + """ + health_urls = [] + + for url in urls: + if url.startswith('https://health.baidu.com'): + health_urls.append(url) + else: + logger.debug(f"过滤非 health.baidu.com URL: {url}") + + filtered_count = len(urls) - len(health_urls) + if filtered_count > 0: + logger.info(f"过滤非 health.baidu.com: {len(urls)} -> {len(health_urls)} (过滤 {filtered_count} 个)") + + return health_urls + + def _deduplicate_urls(self, urls: List[str]) -> List[str]: + """ + URL去重 + + Args: + urls: URL列表 + + Returns: + 去重后的URL列表 + """ + # 使用set去重,保持顺序 + seen = set() + unique_urls = [] + + for url in urls: + if url not in seen: + seen.add(url) + unique_urls.append(url) + + removed_count = len(urls) - len(unique_urls) + if removed_count > 0: + logger.info(f"去重: {len(urls)} -> {len(unique_urls)} (移除 {removed_count} 个重复)") + + return unique_urls + + def _filter_existing_urls(self, urls: List[str]) -> List[str]: + """ + 过滤数据库中已存在的URL + + Args: + urls: URL列表 + + Returns: + 新URL列表 + """ + new_urls = [] + + for url in urls: + existing = self.site_mgr.get_site_by_url(url) + if not existing: + new_urls.append(url) + else: + logger.debug(f"URL已存在,跳过: {url}") + + filtered_count = len(urls) - len(new_urls) + if filtered_count > 0: + logger.info(f"过滤已存在: {len(urls)} -> {len(new_urls)} (过滤 {filtered_count} 个)") + + return new_urls + + def _save_urls_to_database(self, urls: List[str], query_word: str = None, category: str = None) -> int: + """ + 保存URL到数据库 + + Args: + urls: URL列表 + query_word: 来源查询词 + category: 分类标签 + + Returns: + 成功保存的数量 + """ + success_count = 0 + + for url in urls: + try: + site_id = self.site_mgr.add_site( + site_url=url, + site_name=None, # 自动使用URL作为名称 + 
site_dimension=category, + query_word=query_word, # 新增:来源查询词 + frequency=1, + time_start='09:00:00', + time_end='21:00:00', + interval_minutes=30 + ) + + if site_id: + success_count += 1 + logger.debug(f"✅ 保存URL: {url} (ID: {site_id})") + + except Exception as e: + logger.error(f"❌ 保存URL失败: {url}, {str(e)}") + continue + + logger.info(f"保存到数据库: {success_count}/{len(urls)}") + return success_count + + def crawl_query(self, query_word: str, task_id: int = None, + category: str = None, threshold_max: int = None) -> Dict: + """ + 爬取单个查询词的搜索结果 + + Args: + query_word: 查询词 + task_id: 任务ID(用于更新计数) + category: 分类标签 + threshold_max: 最大抓取数量阈值 + + Returns: + 爬取结果统计 + """ + result = { + 'query_word': query_word, + 'task_id': task_id, + 'success': False, + 'crawled_count': 0, + 'valid_count': 0, + 'new_count': 0, + 'error': None + } + + try: + # 构建搜索URL + search_url = self.SEARCH_URL_TEMPLATE.format(query=query_word) + logger.info(f"开始爬取: {query_word}") + if threshold_max: + logger.info(f"目标阈值: {threshold_max}") + logger.info(f"搜索URL: {search_url}") + + # 启动浏览器 + with sync_playwright() as p: + browser = p.chromium.launch(headless=self.headless) + page = browser.new_page() + + # 设置User-Agent + page.set_extra_http_headers({ + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' + }) + + # 访问搜索页面 + logger.info("访问搜索页面...") + page.goto(search_url, timeout=self.timeout, wait_until='domcontentloaded') + time.sleep(3) # 等待页面渲染 + + # 提取URL,传入阈值 + urls = self._extract_urls_from_page(page, target_count=threshold_max) + result['crawled_count'] = len(urls) + + # 关闭浏览器 + browser.close() + + if not urls: + result['error'] = "未提取到任何URL" + return result + + # 过滤出 health.baidu.com 的URL + health_urls = self._filter_health_baidu_urls(urls) + + if not health_urls: + result['error'] = "未找到 health.baidu.com 的URL" + result['crawled_count'] = len(urls) + return result + + # 去重 + unique_urls = self._deduplicate_urls(health_urls) + result['crawled_count'] = len(health_urls) # 记录过滤后的数量 + result['valid_count'] = len(unique_urls) + + # 过滤已存在的URL + new_urls = self._filter_existing_urls(unique_urls) + + # 保存到数据库,传入query_word + saved_count = self._save_urls_to_database(new_urls, query_word=query_word, category=category) + result['new_count'] = saved_count + + # 更新任务计数 + if task_id: + self.task_mgr.increment_crawl_count( + task_id, + crawl_count=len(urls), + valid_count=saved_count + ) + + # 检查是否达到阈值 + self.task_mgr.check_threshold(task_id) + + result['success'] = True + logger.info(f"✅ 爬取完成: 爬取={result['crawled_count']}, 有效={result['valid_count']}, 新增={result['new_count']}") + + except PlaywrightTimeoutError: + result['error'] = "页面加载超时" + logger.error(f"❌ 页面加载超时: {query_word}") + except Exception as e: + result['error'] = str(e) + logger.error(f"❌ 爬取失败: {query_word}, {str(e)}") + import traceback + traceback.print_exc() + + return result + + def crawl_tasks(self, limit: int = None) -> Dict: + """ + 批量爬取任务 + + Args: + limit: 限制处理的任务数量 + + Returns: + 批量爬取统计 + """ + logger.info("="*70) + logger.info(" 百度搜索结果爬虫") + logger.info("="*70) + + # 获取ready任务 + ready_tasks = self.task_mgr.get_ready_tasks(limit=limit) + + if not ready_tasks: + logger.warning("没有待执行的任务") + return { + 'total_tasks': 0, + 'success_count': 0, + 'failed_count': 0, + 'total_crawled': 0, + 'total_saved': 0 + } + + logger.info(f"获取到 {len(ready_tasks)} 个待执行任务\n") + + # 统计信息 + stats = { + 'total_tasks': len(ready_tasks), + 'success_count': 0, + 'failed_count': 0, + 'total_crawled': 0, + 
'total_saved': 0
+        }
+
+        # 逐个处理任务
+        for idx, task in enumerate(ready_tasks, 1):
+            task_id = task['id']
+            query_word = task['query_word']
+            category = task['category']
+            threshold_max = task['threshold_max']  # 获取阈值
+
+            logger.info(f"[{idx}/{len(ready_tasks)}] 处理任务: {query_word} (阈值: {threshold_max})")
+
+            # 更新任务状态为doing
+            self.task_mgr.update_task_status(task_id, 'doing')
+
+            # 爬取,传入阈值
+            result = self.crawl_query(query_word, task_id, category, threshold_max=threshold_max)
+
+            # 更新统计
+            if result['success']:
+                stats['success_count'] += 1
+                stats['total_crawled'] += result['crawled_count']
+                stats['total_saved'] += result['new_count']
+
+                # 更新任务状态为finished
+                self.task_mgr.update_task_status(task_id, 'finished')
+            else:
+                stats['failed_count'] += 1
+
+                # 更新任务状态为failed
+                self.task_mgr.update_task_status(
+                    task_id,
+                    'failed',
+                    error_message=result['error']
+                )
+
+            logger.info("")
+            time.sleep(2)  # 延迟,避免请求过快
+
+        # 输出总结
+        logger.info("="*70)
+        logger.info(" 爬取完成")
+        logger.info("="*70)
+        logger.info(f"总任务数: {stats['total_tasks']}")
+        logger.info(f"成功: {stats['success_count']}")
+        logger.info(f"失败: {stats['failed_count']}")
+        logger.info(f"总爬取: {stats['total_crawled']} 个URL")
+        logger.info(f"新增保存: {stats['total_saved']} 个URL")
+        logger.info("="*70)
+
+        return stats
+
+
+if __name__ == "__main__":
+    import sys
+
+    # 配置日志
+    logger.remove()
+    logger.add(
+        sys.stdout,
+        format="{time:HH:mm:ss} | {level: <8} | {message}",
+        level="INFO"
+    )
+
+    # 创建爬虫
+    crawler = BaiduSearchCrawler(headless=False)  # headless=False 可以看到浏览器
+
+    # 批量爬取任务
+    crawler.crawl_tasks(limit=5)  # 限制爬取5个任务
diff --git a/config.py b/config.py
index 5d48f1e..378bc71 100644
--- a/config.py
+++ b/config.py
@@ -43,12 +43,19 @@ class BaseConfig:
     SERVER_PORT = int(os.getenv('SERVER_PORT', 5000))
 
     # 点击策略配置
-    MIN_CLICK_COUNT = int(os.getenv('MIN_CLICK_COUNT', 1))
-    MAX_CLICK_COUNT = int(os.getenv('MAX_CLICK_COUNT', 10))
-    CLICK_INTERVAL_MINUTES = int(os.getenv('CLICK_INTERVAL_MINUTES', 30))
-    WORK_START_HOUR = int(os.getenv('WORK_START_HOUR', 9))
-    WORK_END_HOUR = int(os.getenv('WORK_END_HOUR', 21))
-    REPLY_WAIT_TIMEOUT = int(os.getenv('REPLY_WAIT_TIMEOUT', 30))
+    MIN_CLICK_COUNT = int(os.getenv('MIN_CLICK_COUNT', 1))  # 每日最少点击次数
+    MAX_CLICK_COUNT = int(os.getenv('MAX_CLICK_COUNT', 3))  # 每日最多点击次数
+    CLICK_INTERVAL_MINUTES = int(os.getenv('CLICK_INTERVAL_MINUTES', 30))  # 点击间隔(分钟)
+    MIN_TASK_INTERVAL_MINUTES = int(os.getenv('MIN_TASK_INTERVAL_MINUTES', 3))  # 任务间最小间隔(分钟)
+    MAX_TASK_INTERVAL_MINUTES = int(os.getenv('MAX_TASK_INTERVAL_MINUTES', 5))  # 任务间最大间隔(分钟)
+    WORK_START_HOUR = int(os.getenv('WORK_START_HOUR', 9))  # 工作开始时间
+    WORK_END_HOUR = int(os.getenv('WORK_END_HOUR', 21))  # 工作结束时间
+    REPLY_WAIT_TIMEOUT = int(os.getenv('REPLY_WAIT_TIMEOUT', 30))  # 回复等待超时(秒)
+
+    # 爬虫调度配置
+    CRAWLER_ENABLED = os.getenv('CRAWLER_ENABLED', 'True').lower() == 'true'  # 是否启用爬虫
+    CRAWLER_SCHEDULE_TIME = os.getenv('CRAWLER_SCHEDULE_TIME', '02:00')  # 爬虫执行时间(HH:MM)
+    CRAWLER_BATCH_SIZE = int(os.getenv('CRAWLER_BATCH_SIZE', 10))  # 每次爬取任务数量
 
     # 数据存储路径
     DATA_DIR = os.getenv('DATA_DIR', './data')
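The three new crawler settings are consumed by the scheduler in `main.py` further down in this diff. A minimal sketch of that wiring, assuming the `schedule` library added to `requirements.txt` in this change; `register_crawler_job` and its `run_crawler` argument are illustrative names, not part of the patch:

```python
# Sketch only: how the crawler settings above are expected to be used.
import schedule
from config import Config

def register_crawler_job(run_crawler):
    """Register the daily crawl if CRAWLER_ENABLED is set."""
    if Config.CRAWLER_ENABLED:
        # CRAWLER_SCHEDULE_TIME is an "HH:MM" string, e.g. "02:00"
        schedule.every().day.at(Config.CRAWLER_SCHEDULE_TIME).do(run_crawler)
```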
diff --git a/db/QUERY_TASK_README.md b/db/QUERY_TASK_README.md
new file mode 100644
index 0000000..27c0953
--- /dev/null
+++ b/db/QUERY_TASK_README.md
@@ -0,0 +1,107 @@
+# AI MIP Query Task Table Setup
+
+## 1. Create the Table
+
+Run the following file against the MySQL database:
+
+```bash
+mysql -u your_user -p your_database < db/ai_mip_query_task.sql
+```
+
+Alternatively, execute the contents of `db/ai_mip_query_task.sql` directly in a MySQL client.
+
+## 2. Table Structure
+
+### Columns
+
+| Column | Type | Description |
+|--------|------|-------------|
+| id | int | Primary key |
+| query_word | varchar(512) | Query word / keyword |
+| query_type | enum | Query type: keyword/phrase/long_tail |
+| task_date | char(8) | Task date, YYYYMMDD |
+| threshold_max | int | Maximum crawl count threshold |
+| current_count | int | Count crawled so far |
+| status | enum | Task status: ready/doing/failed/finished/closed |
+| priority | tinyint | Priority, 1-10 |
+| category | varchar(64) | Category tag |
+| source_platform | varchar(64) | Source platform |
+| crawl_url_count | int | Number of URLs crawled |
+| valid_url_count | int | Number of valid URLs (with ads) |
+| error_message | text | Error message |
+| started_at | timestamp | Execution start time |
+| finished_at | timestamp | Completion time |
+| closed_at | timestamp | Time closed on reaching the threshold |
+| created_at | timestamp | Creation time |
+| updated_at | timestamp | Last update time |
+| created_by | varchar(64) | Creator |
+| remark | varchar(512) | Remarks |
+
+### Indexes
+
+- `uniq_query_date`: one task per query word per day
+- `idx_date_status`: lookups by date and status
+- `idx_status_priority`: lookups by status and priority
+- `idx_category`: lookups by category
+- `idx_threshold`: threshold monitoring
+- `idx_closed`: index on close time
+
+## 3. Usage Example
+
+### Python
+
+```python
+from db_manager import QueryTaskManager
+
+# Initialize the manager
+task_mgr = QueryTaskManager()
+
+# Create a task
+task_id = task_mgr.create_task(
+    query_word="糖尿病治疗",
+    query_type="keyword",
+    threshold_max=50,
+    priority=3,
+    category="医疗"
+)
+
+# Fetch ready tasks
+ready_tasks = task_mgr.get_ready_tasks(limit=10)
+
+# Update task status
+task_mgr.update_task_status(task_id, 'doing')
+
+# Increment crawl counters
+task_mgr.increment_crawl_count(task_id, crawl_count=5, valid_count=3)
+
+# Check the threshold
+task_mgr.check_threshold(task_id)
+
+# Fetch statistics
+stats = task_mgr.get_task_statistics('20260119')
+```
+
+## 4. Testing
+
+Run the test script:
+
+```bash
+python test_query_task.py
+```
+
+## 5. Task Status Flow
+
+```
+ready (pending)
+  ↓
+doing (running)
+  ↓
+finished (done) / failed / closed (threshold reached)
+```
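The managers do not enforce this flow themselves; `update_task_status` writes whatever status it is given. A hedged sketch of a guard a caller could apply, with a transition table assumed from the diagram above:

```python
# Hypothetical guard for the state machine above; not part of db_manager.py.
ALLOWED_TRANSITIONS = {
    'ready': {'doing', 'closed'},
    'doing': {'finished', 'failed', 'closed'},
}

def can_transition(current: str, target: str) -> bool:
    """Return True if the status change follows the documented flow."""
    return target in ALLOWED_TRANSITIONS.get(current, set())
```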
+## 6. Notes
+
+1. **Unique constraint**: a query word can have only one task per day
+2. **Threshold check**: a task is closed automatically once `threshold_max` is reached
+3. **Priority**: lower numbers mean higher priority (1-10)
+4. **Timestamps**: status changes automatically update the matching time fields
diff --git a/db/ai_mip_query_task.sql b/db/ai_mip_query_task.sql
new file mode 100644
index 0000000..3af693b
--- /dev/null
+++ b/db/ai_mip_query_task.sql
@@ -0,0 +1,60 @@
+/*
+  MIP Query Task Table
+  用于存储查询词任务,抓取需要自动点击的网址
+
+  Date: 2026-01-19
+*/
+
+SET NAMES utf8mb4;
+SET FOREIGN_KEY_CHECKS = 0;
+
+-- ----------------------------
+-- Table structure for ai_mip_query_task
+-- ----------------------------
+DROP TABLE IF EXISTS `ai_mip_query_task`;
+CREATE TABLE `ai_mip_query_task` (
+  `id` int NOT NULL AUTO_INCREMENT COMMENT '主键ID',
+  `query_word` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '查询词/关键词',
+  `query_type` enum('keyword','phrase','long_tail') CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT 'keyword' COMMENT '查询类型:关键词/短语/长尾词',
+  `task_date` char(8) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '任务日期,格式:YYYYMMDD',
+  `threshold_max` int NOT NULL DEFAULT 100 COMMENT '最大抓取数量阈值',
+  `current_count` int NOT NULL DEFAULT 0 COMMENT '当前已抓取数量',
+  `status` enum('ready','doing','failed','finished','closed') CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT 'ready' COMMENT '任务状态:准备中/执行中/失败/完成/已关闭',
+  `priority` tinyint NOT NULL DEFAULT 5 COMMENT '优先级(1-10,数字越小优先级越高)',
+  `category` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '分类标签(如:医疗、教育、法律等)',
+  `source_platform` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT 'baidu' COMMENT '来源平台:baidu/sogou/360等',
+  `crawl_url_count` int NOT NULL DEFAULT 0 COMMENT '已爬取URL数量',
+  `valid_url_count` int NOT NULL DEFAULT 0 COMMENT '有效URL数量(带广告)',
+  `error_message` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL COMMENT '错误信息',
+  `started_at` timestamp NULL DEFAULT NULL COMMENT '开始执行时间',
+  `finished_at` timestamp NULL DEFAULT NULL COMMENT '完成时间',
+  `closed_at` timestamp NULL DEFAULT NULL COMMENT '达到阈值关闭时间',
+  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+  `updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
+  `created_by` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT 'system' COMMENT '创建人',
+  `remark` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '备注信息',
+  PRIMARY KEY (`id`) USING BTREE,
+  UNIQUE INDEX `uniq_query_date`(`query_word`(191) ASC, `task_date` ASC) USING BTREE COMMENT '同一查询词每天只有一个任务',
+  INDEX `idx_date_status`(`task_date` ASC, `status` ASC) USING BTREE COMMENT '按日期和状态查询',
+  INDEX `idx_status_priority`(`status` ASC, `priority` ASC) USING BTREE COMMENT '按状态和优先级查询',
+  INDEX `idx_category`(`category` ASC) USING BTREE COMMENT '按分类查询',
+  INDEX `idx_threshold`(`threshold_max` ASC, `current_count` ASC) USING BTREE COMMENT '阈值监控',
+  INDEX `idx_closed`(`closed_at` ASC) USING BTREE COMMENT '关闭时间索引'
+) ENGINE = InnoDB
+  AUTO_INCREMENT = 1
+  CHARACTER SET = utf8mb4
+  COLLATE = utf8mb4_general_ci
+  COMMENT = 'MIP查询任务表 - 用于存储查询词抓取网址任务'
+  ROW_FORMAT = DYNAMIC;
+
+-- ----------------------------
+-- 示例数据
+-- ----------------------------
+INSERT INTO `ai_mip_query_task`
+    (`query_word`, `query_type`, `task_date`, `threshold_max`, `priority`, `category`, `source_platform`, `remark`)
+VALUES
+    ('糖尿病治疗', 'keyword', '20260119', 50, 3, '医疗', 'baidu', '医疗类关键词测试'),
+    ('在线教育平台', 'phrase', '20260119', 30, 5, '教育', 'baidu', '教育类短语测试'),
+    ('法律咨询免费在线', 'long_tail', '20260119', 20, 7, '法律', 'baidu', '法律类长尾词测试');
+
+SET FOREIGN_KEY_CHECKS = 1;
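Beyond the three sample rows above, tasks can also be seeded from Python via `QueryTaskManager`. A sketch mirroring the README example; the keyword list and thresholds are illustrative:

```python
# Illustrative batch seeding; keywords and categories are made up.
from datetime import datetime
from db_manager import QueryTaskManager

task_mgr = QueryTaskManager()
today = datetime.now().strftime('%Y%m%d')

for word, category, threshold in [
    ("糖尿病治疗", "医疗", 50),
    ("在线教育平台", "教育", 30),
]:
    # create_task returns None if a (query_word, task_date) task already exists
    task_mgr.create_task(query_word=word, task_date=today,
                         threshold_max=threshold, category=category)
```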
diff --git a/db/alter_add_query_word.sql b/db/alter_add_query_word.sql
new file mode 100644
index 0000000..59b757c
--- /dev/null
+++ b/db/alter_add_query_word.sql
@@ -0,0 +1,14 @@
+/*
+  为ai_mip_site表添加query_word字段
+  用于记录该URL是从哪个查询词抓取的
+
+  Date: 2026-01-19
+*/
+
+ALTER TABLE `ai_mip_site`
+ADD COLUMN `query_word` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '来源查询词(从哪个关键词抓取)'
+AFTER `site_dimension`;
+
+-- 添加索引,方便按查询词查询
+ALTER TABLE `ai_mip_site`
+ADD INDEX `idx_query_word`(`query_word`(191) ASC) USING BTREE COMMENT '按查询词查询';
diff --git a/db_manager.py b/db_manager.py
index 06d2c1c..590ada9 100644
--- a/db_manager.py
+++ b/db_manager.py
@@ -41,8 +41,7 @@ class DatabaseManager:
             'database': Config.MYSQL_DATABASE,
             'charset': 'utf8mb4'
         }
-        logger.info(f"MySQL数据库初始化: {Config.MYSQL_HOST}:{Config.MYSQL_PORT}/{Config.MYSQL_DATABASE}")
-
+
     def get_connection(self) -> 'pymysql.Connection':
         """获取MySQL数据库连接"""
         conn = pymysql.connect(**self.db_config)
@@ -74,7 +73,8 @@ class SiteManager(DatabaseManager):
     """站点管理"""
 
     def add_site(self, site_url: str, site_name: str = None,
-                 site_dimension: str = None, frequency: int = None,
+                 site_dimension: str = None, query_word: str = None,
+                 frequency: int = None,
                  time_start: str = None, time_end: str = None,
                  interval_minutes: int = None) -> Optional[int]:
         """
@@ -84,6 +84,7 @@
             site_url: 网站URL
             site_name: 网站名称
             site_dimension: 网站维度标签
+            query_word: 来源查询词(从哪个关键词抓取)
             frequency: 频次
             time_start: 开始时间
             time_end: 结束时间
@@ -108,8 +109,8 @@
             INSERT INTO ai_mip_site (
                 site_url, site_name, status, frequency,
                 time_start, time_end, interval_minutes,
-                site_dimension, created_by
-            ) VALUES ({ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph})
+                site_dimension, query_word, created_by
+            ) VALUES ({ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph})
             """
 
             cursor.execute(sql, (
@@ -119,8 +120,9 @@
                 frequency or 1,
                 time_start or '09:00:00',
                 time_end or '21:00:00',
-                interval_minutes or 60,
+                interval_minutes or 30,
                 site_dimension,
+                query_word,  # 新增:来源查询词
                 'system'
             ))
@@ -128,7 +130,7 @@
             conn.commit()
             conn.close()
 
-            logger.info(f"成功添加站点: {site_url} (ID: {site_id})")
+            logger.info(f"成功添加站点: {site_url} (ID: {site_id}, 查询词: {query_word})")
             return site_id
 
         except pymysql.IntegrityError:
@@ -545,3 +547,272 @@
         except Exception as e:
             logger.error(f"获取站点统计失败: {str(e)}")
             return {}
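Taken together with `alter_add_query_word.sql`, the extended `add_site()` lets callers tag a URL with the keyword that surfaced it, as `baidu_crawler.py` does elsewhere in this diff. A minimal call sketch; the URL is a placeholder, not a real record:

```python
# Usage sketch for the new query_word parameter (values are examples).
from db_manager import SiteManager

site_mgr = SiteManager()
site_id = site_mgr.add_site(
    site_url="https://health.baidu.com/m/detail/example",  # placeholder URL
    site_dimension="医疗",
    query_word="糖尿病治疗",  # which keyword produced this URL
    interval_minutes=30,
)
```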
+
+
+class QueryTaskManager(DatabaseManager):
+    """查询任务管理器"""
+
+    def create_task(self, query_word: str, task_date: str = None,
+                    query_type: str = 'keyword', threshold_max: int = 100,
+                    priority: int = 5, category: str = None,
+                    source_platform: str = 'baidu',
+                    created_by: str = 'system',
+                    remark: str = None) -> Optional[int]:
+        """
+        创建查询任务
+
+        Args:
+            query_word: 查询词
+            task_date: 任务日期 YYYYMMDD,默认今天
+            query_type: 查询类型
+            threshold_max: 最大抓取数量
+            priority: 优先级 1-10
+            category: 分类标签
+            source_platform: 来源平台
+            created_by: 创建人
+            remark: 备注
+
+        Returns:
+            任务ID,失败返回None
+        """
+        try:
+            if task_date is None:
+                task_date = datetime.now().strftime('%Y%m%d')
+
+            conn = self.get_connection()
+            ph = self._get_placeholder()
+
+            sql = f"""
+            INSERT INTO ai_mip_query_task (
+                query_word, query_type, task_date, threshold_max,
+                priority, category, source_platform, created_by, remark
+            ) VALUES ({ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph})
+            """
+
+            cursor = conn.cursor()
+            cursor.execute(sql, (
+                query_word, query_type, task_date, threshold_max,
+                priority, category, source_platform, created_by, remark
+            ))
+
+            task_id = cursor.lastrowid
+            conn.commit()
+            conn.close()
+
+            logger.info(f"创建查询任务成功: {query_word} (ID: {task_id})")
+            return task_id
+
+        except pymysql.IntegrityError:
+            logger.warning(f"查询任务已存在: {query_word} @ {task_date}")
+            return None
+        except Exception as e:
+            logger.error(f"创建查询任务失败: {str(e)}")
+            return None
+
+    def get_task_by_id(self, task_id: int) -> Optional[Dict]:
+        """根据ID获取任务"""
+        try:
+            conn = self.get_connection()
+            ph = self._get_placeholder()
+            cursor = self._execute_query(
+                conn,
+                f"SELECT * FROM ai_mip_query_task WHERE id = {ph}",
+                (task_id,)
+            )
+            row = cursor.fetchone()
+            conn.close()
+            return self._dict_from_row(row) if row else None
+        except Exception as e:
+            logger.error(f"查询任务失败: {str(e)}")
+            return None
+
+    def get_ready_tasks(self, limit: int = None) -> List[Dict]:
+        """
+        获取准备执行的任务(按优先级排序)
+
+        Args:
+            limit: 限制数量
+
+        Returns:
+            任务列表
+        """
+        try:
+            conn = self.get_connection()
+            sql = "SELECT * FROM ai_mip_query_task WHERE status = 'ready' ORDER BY priority ASC, created_at ASC"
+
+            if limit:
+                sql += f" LIMIT {limit}"
+
+            cursor = self._execute_query(conn, sql)
+            rows = cursor.fetchall()
+            conn.close()
+            return [self._dict_from_row(row) for row in rows]
+        except Exception as e:
+            logger.error(f"查询ready任务失败: {str(e)}")
+            return []
+
+    def get_tasks_by_date(self, task_date: str) -> List[Dict]:
+        """根据日期获取任务"""
+        try:
+            conn = self.get_connection()
+            ph = self._get_placeholder()
+            cursor = self._execute_query(
+                conn,
+                f"SELECT * FROM ai_mip_query_task WHERE task_date = {ph} ORDER BY priority ASC",
+                (task_date,)
+            )
+            rows = cursor.fetchall()
+            conn.close()
+            return [self._dict_from_row(row) for row in rows]
+        except Exception as e:
+            logger.error(f"查询日期任务失败: {str(e)}")
+            return []
+
+    def update_task_status(self, task_id: int, status: str,
+                           error_message: str = None) -> bool:
+        """
+        更新任务状态
+
+        Args:
+            task_id: 任务ID
+            status: 状态 ready/doing/failed/finished/closed
+            error_message: 错误信息(失败时)
+        """
+        try:
+            conn = self.get_connection()
+            ph = self._get_placeholder()
+
+            # 根据状态更新时间字段
+            timestamp_field = None
+            if status == 'doing':
+                timestamp_field = 'started_at'
+            elif status in ['finished', 'failed']:
+                timestamp_field = 'finished_at'
+            elif status == 'closed':
+                timestamp_field = 'closed_at'
+
+            # 动态拼接SET子句;error_message使用参数绑定,避免引号破坏SQL
+            set_clauses = [f"status = {ph}"]
+            params = [status]
+
+            if timestamp_field:
+                set_clauses.append(f"{timestamp_field} = NOW()")
+
+            if error_message:
+                set_clauses.append(f"error_message = {ph}")
+                params.append(error_message)
+
+            sql = f"UPDATE ai_mip_query_task SET {', '.join(set_clauses)} WHERE id = {ph}"
+            params.append(task_id)
+
+            cursor = conn.cursor()
+            cursor.execute(sql, tuple(params))
+            conn.commit()
+            conn.close()
+
+            logger.info(f"更新任务状态: {task_id} -> {status}")
+            return True
+
+        except Exception as e:
+            logger.error(f"更新任务状态失败: {str(e)}")
+            return False
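A typical failure update, mirroring how `baidu_crawler.py` reports errors later in this diff; the message text is illustrative:

```python
# Example failure update; task_mgr/task_id as in the README example above.
task_mgr.update_task_status(task_id, 'failed', error_message="页面加载超时")
```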
+    def increment_crawl_count(self, task_id: int,
+                              crawl_count: int = 1,
+                              valid_count: int = 0) -> bool:
+        """
+        增加抓取计数
+
+        Args:
+            task_id: 任务ID
+            crawl_count: 抓取URL数量
+            valid_count: 有效URL数量(带广告)
+        """
+        try:
+            conn = self.get_connection()
+            ph = self._get_placeholder()
+
+            sql = f"""
+            UPDATE ai_mip_query_task
+            SET crawl_url_count = crawl_url_count + {ph},
+                valid_url_count = valid_url_count + {ph},
+                current_count = current_count + {ph}
+            WHERE id = {ph}
+            """
+
+            cursor = conn.cursor()
+            cursor.execute(sql, (crawl_count, valid_count, valid_count, task_id))
+            conn.commit()
+            conn.close()
+
+            return True
+
+        except Exception as e:
+            logger.error(f"更新抓取计数失败: {str(e)}")
+            return False
+
+    def check_threshold(self, task_id: int) -> bool:
+        """
+        检查是否达到阈值,达到则自动关闭任务
+
+        Returns:
+            True=已达到阈值, False=未达到
+        """
+        try:
+            task = self.get_task_by_id(task_id)
+            if not task:
+                return False
+
+            if task['current_count'] >= task['threshold_max']:
+                self.update_task_status(task_id, 'closed')
+                logger.info(f"任务达到阈值并关闭: {task['query_word']} ({task['current_count']}/{task['threshold_max']})")
+                return True
+
+            return False
+
+        except Exception as e:
+            logger.error(f"检查阈值失败: {str(e)}")
+            return False
+
+    def get_task_statistics(self, task_date: str = None) -> Dict:
+        """
+        获取任务统计信息
+
+        Args:
+            task_date: 日期,为None则统计所有
+        """
+        try:
+            conn = self.get_connection()
+
+            if task_date:
+                ph = self._get_placeholder()
+                where_clause = f"WHERE task_date = {ph}"
+                params = (task_date,)
+            else:
+                where_clause = ""
+                params = None
+
+            sql = f"""
+            SELECT
+                COUNT(*) as total_tasks,
+                SUM(CASE WHEN status = 'ready' THEN 1 ELSE 0 END) as ready_count,
+                SUM(CASE WHEN status = 'doing' THEN 1 ELSE 0 END) as doing_count,
+                SUM(CASE WHEN status = 'finished' THEN 1 ELSE 0 END) as finished_count,
+                SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_count,
+                SUM(CASE WHEN status = 'closed' THEN 1 ELSE 0 END) as closed_count,
+                SUM(crawl_url_count) as total_crawled,
+                SUM(valid_url_count) as total_valid
+            FROM ai_mip_query_task
+            {where_clause}
+            """
+
+            cursor = self._execute_query(conn, sql, params)
+            row = cursor.fetchone()
+            conn.close()
+
+            return self._dict_from_row(row) if row else {}
+
+        except Exception as e:
+            logger.error(f"获取任务统计失败: {str(e)}")
+            return {}
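A small monitoring sketch built on `get_task_statistics`; the field names come from the SELECT aliases above, and the date is just an example:

```python
# Hypothetical daily report; assumes tasks exist for this date.
from db_manager import QueryTaskManager

stats = QueryTaskManager().get_task_statistics('20260119')
if stats:
    print(f"finished={stats['finished_count']} failed={stats['failed_count']} "
          f"crawled={stats['total_crawled']} valid={stats['total_valid']}")
```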
diff --git a/deploy.sh b/deploy.sh
new file mode 100644
index 0000000..521c48d
--- /dev/null
+++ b/deploy.sh
@@ -0,0 +1,85 @@
+#!/bin/bash
+# AI MIP 服务部署脚本
+# 用法: sudo bash deploy.sh
+
+set -e
+
+echo "=========================================="
+echo "  AI MIP 服务部署脚本"
+echo "=========================================="
+
+# 配置变量
+PROJECT_DIR="/opt/ai_mip"
+SERVICE_NAME="ai_mip"
+SERVICE_FILE="${SERVICE_NAME}.service"
+LOG_DIR="/var/log/ai_mip"
+VENV_DIR="${PROJECT_DIR}/venv"
+USER="www-data"
+GROUP="www-data"
+
+# 检查是否root权限
+if [[ $EUID -ne 0 ]]; then
+    echo "❌ 错误: 请使用 sudo 运行此脚本"
+    exit 1
+fi
+
+echo ""
+echo "📦 步骤1: 创建项目目录"
+mkdir -p ${PROJECT_DIR}
+mkdir -p ${LOG_DIR}
+echo "✅ 目录创建完成"
+
+echo ""
+echo "📂 步骤2: 复制项目文件"
+echo "请确保当前目录是项目根目录"
+cp -r ./* ${PROJECT_DIR}/
+echo "✅ 文件复制完成"
+
+echo ""
+echo "🐍 步骤3: 创建Python虚拟环境"
+if [ ! -d "${VENV_DIR}" ]; then
+    python3 -m venv ${VENV_DIR}
+    echo "✅ 虚拟环境创建完成"
+else
+    echo "⚠️ 虚拟环境已存在,跳过创建"
+fi
+
+echo ""
+echo "📦 步骤4: 安装依赖"
+${VENV_DIR}/bin/pip install --upgrade pip
+${VENV_DIR}/bin/pip install -r ${PROJECT_DIR}/requirements.txt
+echo "✅ 依赖安装完成"
+
+echo ""
+echo "🔐 步骤5: 设置权限"
+chown -R ${USER}:${GROUP} ${PROJECT_DIR}
+chown -R ${USER}:${GROUP} ${LOG_DIR}
+chmod +x ${PROJECT_DIR}/main.py
+echo "✅ 权限设置完成"
+
+echo ""
+echo "⚙️ 步骤6: 安装systemd服务"
+cp ${PROJECT_DIR}/${SERVICE_FILE} /etc/systemd/system/
+systemctl daemon-reload
+echo "✅ 服务文件已安装"
+
+echo ""
+echo "🚀 步骤7: 启动服务"
+systemctl enable ${SERVICE_NAME}
+systemctl restart ${SERVICE_NAME}
+echo "✅ 服务已启动"
+
+echo ""
+echo "=========================================="
+echo "  部署完成!"
+echo "=========================================="
+echo ""
+echo "📋 常用命令:"
+echo "  查看状态: sudo systemctl status ${SERVICE_NAME}"
+echo "  查看日志: sudo journalctl -u ${SERVICE_NAME} -f"
+echo "  查看服务日志: tail -f ${LOG_DIR}/service.log"
+echo "  查看错误日志: tail -f ${LOG_DIR}/error.log"
+echo "  重启服务: sudo systemctl restart ${SERVICE_NAME}"
+echo "  停止服务: sudo systemctl stop ${SERVICE_NAME}"
+echo "  健康检查: curl http://localhost:8899/health"
+echo ""
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..b9703fa
--- /dev/null
+++ b/main.py
@@ -0,0 +1,580 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+MIP广告点击自动化服务
+
+这是一个完整的后台服务,提供以下功能:
+1. 自动轮询点击数据库中的广告链接
+2. 智能调度:每个链接每天随机点击1-3次(范围可通过配置调整)
+3. 间隔控制:同一链接点击间隔≥30分钟
+4. 时间窗口:仅在09:00-21:00执行
+5. 进程管理:防重复启动、优雅停止
+6. 健康检查:提供HTTP API监控服务状态
+7. 日志管理:自动分割、持久化存储
+
+使用方法:
+    python main.py              # 前台运行
+    nohup python main.py &      # 后台运行(或通过 systemd,见 ai_mip.service)
+
+健康检查:
+    curl http://localhost:8888/health
+"""
+
+import argparse
+import atexit
+import os
+import random
+import signal
+import sys
+import threading
+import time
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List
+
+import schedule
+from flask import Flask, jsonify
+from loguru import logger
+
+from config import Config
+from data_manager import DataManager
+from task_executor import TaskExecutor
+from baidu_crawler import BaiduSearchCrawler
+
+# 配置日志
+logger.remove()
+
+# 控制台输出
+logger.add(
+    sys.stdout,
+    format="{time:HH:mm:ss} | {level: <8} | {message}",
+    level="INFO"
+)
+
+# 文件输出
+log_dir = Path("./logs")
+log_dir.mkdir(exist_ok=True)
+
+logger.add(
+    log_dir / "scheduler_{time:YYYY-MM-DD}.log",
+    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {message}",
+    level="INFO",
+    rotation="00:00",
+    retention="30 days",
+    encoding="utf-8"
+)
+
+
+class ClickScheduler:
+    """
+    MIP广告点击服务
+
+    负责管理广告点击任务的调度和执行。
+    提供完整的服务生命周期管理,包括启动、运行、监控和停止。
+    """
+
+    # 进程锁文件
+    LOCK_FILE = Path("./scheduler.lock")
+
+    def __init__(self, max_workers: int = 1, use_proxy: bool = True, health_port: int = 8888):
+        """
+        初始化调度器
+
+        Args:
+            max_workers: 最大并发数
+            use_proxy: 是否使用代理
+            health_port: 健康检查API端口
+        """
+        self.max_workers = max_workers
+        self.use_proxy = use_proxy
+        self.health_port = health_port
+        self.dm = DataManager()
+
+        # 爬虫实例(如果启用)
+        self.crawler = BaiduSearchCrawler() if Config.CRAWLER_ENABLED else None
+
+        # 点击记录:{site_id: {'last_click': datetime, 'today_count': int, 'target_count': int}}
+        self.click_records = {}
+
+        # 工作时间配置
+        self.work_start_hour = 9   # 09:00
+        self.work_end_hour = 21    # 21:00
+        self.click_interval_minutes = 30  # 点击间隔30分钟
+
+        # 服务状态
+        self.running = False
+        self.start_time = None
+        self.last_cycle_time = None
+        self.total_clicks_today = 0
+        self.error_count = 0
+
+        # 健康检查API
+        self.health_app = Flask(__name__)
+        self.health_app.logger.disabled = True  # 禁用Flask日志
+        self._setup_health_api()
+
+        logger.info("调度器初始化完成")
+        logger.info(f"工作时间: {self.work_start_hour:02d}:00 - {self.work_end_hour:02d}:00")
+        logger.info(f"点击间隔: {self.click_interval_minutes} 分钟")
+        logger.info(f"并发数: {max_workers}")
+
+    def _setup_health_api(self):
+        """配置健康检查API"""
+        @self.health_app.route('/health', methods=['GET'])
+        def health_check():
+            """健康检查端点"""
+            uptime = None
+            if self.start_time:
+                uptime = str(datetime.now() - self.start_time)
+
+            return jsonify({
+                'status': 'running' if self.running else 'stopped',
+                'uptime': uptime,
+                'start_time': self.start_time.isoformat() if self.start_time else None,
+                'last_cycle': self.last_cycle_time.isoformat() if self.last_cycle_time else None,
'total_sites': len(self.click_records), + 'completed_sites': sum(1 for r in self.click_records.values() if r['today_count'] >= r['target_count']), + 'total_clicks_today': sum(r['today_count'] for r in self.click_records.values()), + 'target_clicks_today': sum(r['target_count'] for r in self.click_records.values()), + 'error_count': self.error_count, + 'work_hours': f"{self.work_start_hour:02d}:00-{self.work_end_hour:02d}:00", + 'is_working_time': self.is_working_time() + }) + + def _acquire_lock(self) -> bool: + """ + 获取进程锁,防止重复启动 + + Returns: + 是否成功获取锁 + """ + if self.LOCK_FILE.exists(): + try: + # 检查锁文件中pid是否还在运行 + with open(self.LOCK_FILE, 'r') as f: + old_pid = int(f.read().strip()) + + # 检查进程是否存在 + try: + os.kill(old_pid, 0) # 不发送信号,只检查进程是否存在 + logger.error(f"调度器已经在运行 (PID: {old_pid})") + return False + except OSError: + # 进程不存在,删除旧锁 + logger.warning(f"检测到失效的锁文件,清理中...") + self.LOCK_FILE.unlink() + except Exception as e: + logger.warning(f"检查锁文件异常: {str(e)},删除旧锁") + self.LOCK_FILE.unlink() + + # 创建新锁 + try: + with open(self.LOCK_FILE, 'w') as f: + f.write(str(os.getpid())) + logger.info(f"获取进程锁成功 (PID: {os.getpid()})") + return True + except Exception as e: + logger.error(f"创建锁文件失败: {str(e)}") + return False + + def _release_lock(self): + """释放进程锁""" + try: + if self.LOCK_FILE.exists(): + self.LOCK_FILE.unlink() + logger.info("已释放进程锁") + except Exception as e: + logger.error(f"释放锁文件失败: {str(e)}") + + def _cleanup(self): + """清理资源""" + logger.info("正在清理资源...") + self.running = False + self._release_lock() + logger.info("资源清理完成") + + def is_working_time(self) -> bool: + """ + 检查当前是否在工作时间 + + Returns: + 是否在工作时间 + """ + now = datetime.now() + current_hour = now.hour + return self.work_start_hour <= current_hour < self.work_end_hour + + def reset_daily_records(self): + """重置每日点击记录""" + logger.info("=" * 60) + logger.info("重置每日点击记录") + logger.info("=" * 60) + + # 获取所有活跃站点 + sites = self.dm.get_active_urls() + + # 为每个站点随机生成今日目标点击次数(使用配置文件中的范围) + self.click_records = {} + for site in sites: + site_id = site.get('id') + target_count = random.randint(Config.MIN_CLICK_COUNT, Config.MAX_CLICK_COUNT) + self.click_records[site_id] = { + 'last_click': None, + 'today_count': 0, + 'target_count': target_count, + 'site_url': site.get('site_url') + } + logger.info(f"站点 {site_id}: {site.get('site_url')} - 今日目标 {target_count} 次") + + logger.info(f"共 {len(sites)} 个站点,总目标点击次数: {sum(r['target_count'] for r in self.click_records.values())}") + + def get_pending_sites(self) -> List[Dict]: + """ + 获取待点击的站点列表 + + Returns: + 待点击的站点列表 + """ + if not self.click_records: + logger.warning("点击记录为空,执行重置") + self.reset_daily_records() + + now = datetime.now() + pending_sites = [] + + for site_id, record in self.click_records.items(): + # 检查是否已完成今日目标 + if record['today_count'] >= record['target_count']: + continue + + # 检查点击间隔(≥30分钟) + if record['last_click']: + elapsed = (now - record['last_click']).total_seconds() / 60 + if elapsed < self.click_interval_minutes: + continue + + pending_sites.append({ + 'id': site_id, + 'site_url': record['site_url'], + 'today_count': record['today_count'], + 'target_count': record['target_count'] + }) + + return pending_sites + + def execute_click_task(self, site: Dict): + """ + 执行单个站点的点击任务 + + Args: + site: 站点信息 + """ + site_id = site['id'] + site_url = site['site_url'] + + logger.info(f"[站点 {site_id}] 开始点击: {site_url} ({site['today_count'] + 1}/{site['target_count']})") + + try: + # 创建任务执行器(每次创建新实例) + executor = TaskExecutor( + max_workers=1, # 单个任务使用单线程 + use_proxy=self.use_proxy + ) 
+
+            # 直接执行单个站点任务
+            # 获取完整站点信息
+            all_sites = self.dm.get_active_urls()
+            target_site = next((s for s in all_sites if s.get('id') == site_id), None)
+
+            if not target_site:
+                logger.error(f"[站点 {site_id}] 未找到站点信息")
+                return
+
+            # 创建浏览器环境
+            profile_info = executor.create_browser_profile(1)
+            if not profile_info:
+                logger.error(f"[站点 {site_id}] 创建浏览器环境失败")
+                return
+
+            time.sleep(2)
+
+            # 执行点击任务
+            result = executor.execute_single_task(target_site, 1, profile_info['profile_id'])
+
+            if result['success']:
+                # 更新点击记录
+                self.click_records[site_id]['last_click'] = datetime.now()
+                self.click_records[site_id]['today_count'] += 1
+                self.total_clicks_today += 1
+
+                logger.info(f"[站点 {site_id}] ✅ 点击完成: {self.click_records[site_id]['today_count']}/{self.click_records[site_id]['target_count']}")
+            else:
+                self.error_count += 1
+                logger.warning(f"[站点 {site_id}] ⚠️ 点击失败: {result.get('error', '未知错误')}")
+
+        except Exception as e:
+            self.error_count += 1
+            logger.error(f"[站点 {site_id}] ❌ 点击异常: {str(e)}")
+            import traceback
+            traceback.print_exc()
+
+    def run_click_cycle(self):
+        """执行一次点击循环"""
+        # 检查工作时间
+        if not self.is_working_time():
+            current_time = datetime.now().strftime('%H:%M')
+            logger.info(f"当前时间 {current_time} 不在工作时间内,跳过")
+            return
+
+        self.last_cycle_time = datetime.now()
+
+        logger.info("-" * 60)
+        logger.info(f"开始点击循环 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+        logger.info("-" * 60)
+
+        # 获取待点击站点
+        pending_sites = self.get_pending_sites()
+
+        if not pending_sites:
+            logger.info("没有待点击的站点")
+            return
+
+        logger.info(f"找到 {len(pending_sites)} 个待点击站点")
+
+        # 随机打乱顺序(模拟真实行为)
+        random.shuffle(pending_sites)
+
+        # 根据并发数执行
+        if self.max_workers == 1:
+            # 串行执行
+            for site in pending_sites:
+                self.execute_click_task(site)
+
+                # 任务间随机间隔(使用配置文件中的范围)
+                if site != pending_sites[-1]:
+                    wait_minutes = random.randint(Config.MIN_TASK_INTERVAL_MINUTES, Config.MAX_TASK_INTERVAL_MINUTES)
+                    logger.info(f"等待 {wait_minutes} 分钟后执行下一个任务...")
+                    time.sleep(wait_minutes * 60)
+        else:
+            # 并发执行(暂不支持,避免资源冲突)
+            logger.warning("当前版本仅支持串行执行")
+            for site in pending_sites:
+                self.execute_click_task(site)
+                time.sleep(random.randint(Config.MIN_TASK_INTERVAL_MINUTES, Config.MAX_TASK_INTERVAL_MINUTES) * 60)
+
+        # 显示今日进度
+        completed = sum(1 for r in self.click_records.values() if r['today_count'] >= r['target_count'])
+        total = len(self.click_records)
+        total_clicks = sum(r['today_count'] for r in self.click_records.values())
+        target_clicks = sum(r['target_count'] for r in self.click_records.values())
+
+        logger.info("-" * 60)
+        logger.info(f"今日进度: {completed}/{total} 个站点完成")
+        logger.info(f"点击次数: {total_clicks}/{target_clicks} 次")
+        logger.info("-" * 60)
+
+    def run_crawler_cycle(self):
+        """执行一次爬虫循环"""
+        if not self.crawler:
+            logger.warning("爬虫未启用,跳过")
+            return
+
+        logger.info("=" * 60)
+        logger.info(f"开始网址爬取 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
+        logger.info("=" * 60)
+
+        try:
+            # 执行爬虫任务
+            result = self.crawler.crawl_tasks(limit=Config.CRAWLER_BATCH_SIZE)
+
+            logger.info("-" * 60)
+            logger.info(f"爬取完成: 总任务={result['total_tasks']}, 成功={result['success_count']}, 失败={result['failed_count']}")
+            # crawl_tasks 返回的键名为 total_saved
+            logger.info(f"新增网址: {result['total_saved']} 个")
+            logger.info("-" * 60)
+
+        except Exception as e:
+            logger.error(f"爬虫执行失败: {str(e)}")
+            import traceback
+            traceback.print_exc()
+
+    def start(self):
+        """启动调度器"""
+        # 获取进程锁
+        if not self._acquire_lock():
+            logger.error("无法启动,请检查是否已有实例在运行")
+            sys.exit(1)
+
+        # 注册清理函数
+        atexit.register(self._cleanup)
+
+        # 注册信号处理(优雅停止)
+        def signal_handler(signum, frame):
logger.info(f"\n收到信号 {signum},正在优雅停止...") + self._cleanup() + sys.exit(0) + + signal.signal(signal.SIGINT, signal_handler) # Ctrl+C + signal.signal(signal.SIGTERM, signal_handler) # kill + + logger.info("=" * 60) + logger.info("MIP广告点击调度器启动") + logger.info("=" * 60) + logger.info(f"当前环境: {Config.ENV}") + logger.info(f"当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + logger.info(f"PID: {os.getpid()}") + logger.info("") + + self.running = True + self.start_time = datetime.now() + + # 启动健康检查API(后台线程) + health_thread = threading.Thread( + target=lambda: self.health_app.run(host='0.0.0.0', port=self.health_port, debug=False, use_reloader=False), + daemon=True + ) + health_thread.start() + logger.info(f"健康检查API已启动: http://0.0.0.0:{self.health_port}/health") + logger.info("") + + # 初始化每日记录 + self.reset_daily_records() + + # 配置定时任务 + # 1. 每天00:01重置点击记录 + schedule.every().day.at("00:01").do(self.reset_daily_records) + + # 2. 每10分钟执行一次点击循环(仅在工作时间内实际执行) + schedule.every(10).minutes.do(self.run_click_cycle) + + logger.info("定时任务已配置:") + logger.info(" - 每天 00:01 重置点击记录") + logger.info(" - 每 10 分钟执行点击循环(09:00-21:00)") + + # 3. 爬虫定时任务(如果启用) + if Config.CRAWLER_ENABLED and self.crawler: + schedule.every().day.at(Config.CRAWLER_SCHEDULE_TIME).do(self.run_crawler_cycle) + logger.info(f" - 每天 {Config.CRAWLER_SCHEDULE_TIME} 执行网址爬取(批量: {Config.CRAWLER_BATCH_SIZE})") + else: + logger.info(" - 网址爬取未启用") + + logger.info("") + + # 立即执行一次(如果在工作时间内) + if self.is_working_time(): + logger.info("立即执行首次点击循环...") + self.run_click_cycle() + else: + logger.info(f"当前不在工作时间,等待下次调度...") + + # 进入调度循环 + logger.info("\n调度器运行中,按 Ctrl+C 优雅停止...\n") + try: + while self.running: + schedule.run_pending() + time.sleep(30) # 每30秒检查一次 + except KeyboardInterrupt: + logger.info("\n收到中断信号") + finally: + self._cleanup() + + +def parse_args(): + """解析命令行参数""" + parser = argparse.ArgumentParser( + description='MIP广告点击自动化服务', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=''' +使用示例: + python main.py # 前台运行 + python main.py --workers 3 # 3个并发 + python main.py --no-proxy # 不使用代理 + python main.py --health-port 9999 # 自定义健康检查端口 + +健康检查: + curl http://localhost:8888/health + ''' + ) + + parser.add_argument( + '--workers', + type=int, + default=1, + help='最大并发数(默认: 1,建议使用以避免资源冲突)' + ) + + parser.add_argument( + '--no-proxy', + action='store_true', + help='禁用代理(默认启用)' + ) + + parser.add_argument( + '--health-port', + type=int, + default=8888, + help='健康检查API端口(默认: 8888)' + ) + + parser.add_argument( + '--work-start', + type=int, + default=9, + help='工作开始时间(小时,默认: 9)' + ) + + parser.add_argument( + '--work-end', + type=int, + default=21, + help='工作结束时间(小时,默认: 21)' + ) + + parser.add_argument( + '--version', + action='version', + version='MIP Ad Click Service v1.0.0' + ) + + return parser.parse_args() + + +def main(): + """主入口函数""" + # 解析命令行参数 + args = parse_args() + + # 显示启动信息 + logger.info("=" * 70) + logger.info(" __ __ ___ ____ _ _ ____ _ _ _ ") + logger.info(" | \\/ |_ _| _ \\ / \\ __| | / ___| (_) ___| | __") + logger.info(" | |\\/| || || |_) | / _ \\ / _` | | | | | |/ __| |/ /") + logger.info(" | | | || || __/ / ___ \\ (_| | | |___| | | (__| < ") + logger.info(" |_| |_|___|_| /_/ \\_\\__,_| \\____|_|_|\\___|_|\\_\\") + logger.info("") + logger.info(" 广告点击自动化服务 v1.0.0") + logger.info("=" * 70) + logger.info("") + + # 创建服务实例 + service = ClickScheduler( + max_workers=args.workers, + use_proxy=not args.no_proxy, + health_port=args.health_port + ) + + # 设置工作时间 + service.work_start_hour = args.work_start + 
service.work_end_hour = args.work_end + + # 启动服务 + try: + service.start() + except Exception as e: + logger.error(f"服务启动失败: {str(e)}") + import traceback + traceback.print_exc() + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt index a7f1580..8f4c044 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,31 +1,32 @@ -# MIP广告自动点击系统 - Python依赖包 +# MIP Ad Click System - Python Dependencies -# Web框架 +# Web Framework Flask==3.0.0 Werkzeug==3.0.1 -# 浏览器自动化 +# Browser Automation playwright==1.40.0 -# HTTP请求 +# HTTP Requests requests==2.31.0 -# 日志处理 +# Logging loguru==0.7.2 -# 任务调度 +# Task Scheduling APScheduler==3.10.4 +schedule==1.2.0 -# 环境变量管理 +# Environment Variables python-dotenv==1.0.0 -# 时区处理 +# Timezone Handling pytz==2023.3 tzlocal==5.2 -# 数据处理 +# Date Processing python-dateutil==2.8.2 -# 数据库 +# Database pymysql==1.1.0 cryptography>=41.0.0 diff --git a/restart.sh b/restart.sh new file mode 100644 index 0000000..1b809cf --- /dev/null +++ b/restart.sh @@ -0,0 +1,12 @@ +#!/bin/bash +# AI MIP 重启脚本 + +PROJECT_DIR="/opt/ai_mip" + +echo "[INFO] 正在停止服务..." +bash ${PROJECT_DIR}/stop.sh + +sleep 2 + +echo "[INFO] 正在启动服务..." +bash ${PROJECT_DIR}/start.sh diff --git a/scheduler.py b/scheduler.py deleted file mode 100644 index 7e1a345..0000000 --- a/scheduler.py +++ /dev/null @@ -1,215 +0,0 @@ -import random -import time -from datetime import datetime, timedelta -from typing import List, Dict -from threading import Thread, Lock -from loguru import logger - -from adspower_client import AdsPowerClient -from ad_automation import MIPAdAutomation -from data_manager import DataManager -from config import Config - - -class ClickScheduler: - """点击任务调度器""" - - def __init__(self): - self.adspower_client = AdsPowerClient() - self.data_manager = DataManager() - self.running = False - self.lock = Lock() - - def add_url(self, url: str) -> bool: - """ - 添加待点击的URL - - Args: - url: MIP页面链接 - - Returns: - 是否添加成功 - """ - return self.data_manager.add_url(url) - - def add_urls(self, urls: List[str]) -> int: - """ - 批量添加URL - - Args: - urls: URL列表 - - Returns: - 成功添加的数量 - """ - count = 0 - for url in urls: - if self.add_url(url): - count += 1 - return count - - def start_scheduler(self): - """启动调度器""" - if self.running: - logger.warning("调度器已在运行中") - return - - self.running = True - logger.info("启动点击调度器") - - # 启动调度线程 - thread = Thread(target=self._schedule_loop, daemon=True) - thread.start() - - def stop_scheduler(self): - """停止调度器""" - self.running = False - logger.info("停止点击调度器") - - def _schedule_loop(self): - """调度循环""" - while self.running: - try: - # 检查当前时间是否在工作时间内 - if not self._is_work_time(): - logger.debug("当前不在工作时间内,等待...") - time.sleep(60) - continue - - # 获取待处理的URL - url = self._get_next_url() - - if url: - logger.info(f"开始处理URL: {url}") - self._process_url(url) - else: - logger.debug("暂无待处理的URL,等待...") - time.sleep(30) - - except Exception as e: - logger.error(f"调度循环异常: {str(e)}") - time.sleep(10) - - def _is_work_time(self) -> bool: - """ - 检查当前是否在工作时间内 - - Returns: - 是否在工作时间 - """ - now = datetime.now() - current_hour = now.hour - - return Config.WORK_START_HOUR <= current_hour < Config.WORK_END_HOUR - - def _get_next_url(self) -> str: - """ - 获取下一个需要处理的URL - - Returns: - URL或None - """ - with self.lock: - # 获取所有活跃的URL - urls = self.data_manager.get_active_urls() - - for url_data in urls: - url = url_data['url'] - - # 检查是否已达到随机点击次数上限 - click_count = url_data.get('click_count', 0) - target_clicks = url_data.get('target_clicks', 0) - - if click_count 
>= target_clicks: - # 标记为已完成 - self.data_manager.mark_url_completed(url) - continue - - # 检查距离上次点击是否超过间隔时间 - last_click_time = url_data.get('last_click_time') - if last_click_time: - last_click = datetime.fromisoformat(last_click_time) - time_diff = datetime.now() - last_click - - if time_diff.total_seconds() < Config.CLICK_INTERVAL_MINUTES * 60: - continue - - return url - - return None - - def _process_url(self, url: str): - """ - 处理单个URL的点击任务 - - Args: - url: 待处理的URL - """ - page = None - - try: - # 启动 AdsPower 浏览器 - browser_info = self.adspower_client.start_browser() - if not browser_info: - logger.error("启动 AdsPower 浏览器失败") - return - - # 通过 CDP 连接到浏览器 - browser = self.adspower_client.connect_browser(browser_info) - if not browser: - logger.error("连接浏览器失败") - return - - # 获取页面 - page = self.adspower_client.get_page(browser) - if not page: - logger.error("获取页面失败") - return - - # 执行广告点击操作 - automation = MIPAdAutomation(page) - click_success, has_reply = automation.check_and_click_ad(url) - - # 更新数据统计 - with self.lock: - if click_success: - self.data_manager.record_click(url, has_reply) - logger.info(f"URL点击成功,获得回复: {has_reply}") - else: - logger.warning(f"URL点击失败: {url}") - - # 随机延迟 - delay = random.randint(10, 30) - time.sleep(delay) - - except Exception as e: - logger.error(f"处理URL异常: {str(e)}") - - finally: - # 停止浏览器(会自动清理 Playwright 资源) - try: - self.adspower_client.stop_browser() - except Exception as e: - logger.error(f"停止浏览器异常: {str(e)}") - - - def get_statistics(self) -> Dict: - """ - 获取统计数据 - - Returns: - 统计数据 - """ - return self.data_manager.get_statistics() - - def get_url_detail(self, url: str) -> Dict: - """ - 获取URL详细信息 - - Args: - url: URL - - Returns: - URL详细信息 - """ - return self.data_manager.get_url_info(url) diff --git a/start.sh b/start.sh new file mode 100644 index 0000000..5d0bf08 --- /dev/null +++ b/start.sh @@ -0,0 +1,45 @@ +#!/bin/bash +# AI MIP 后台启动脚本 + +PROJECT_DIR="/home/work/ai_mip" +cd ${PROJECT_DIR} + +echo "[INFO] 检查是否有运行中的服务..." +# 查找并停止旧进程 +OLD_PID=$(pgrep -f "python main.py") + +if [ ! -z "$OLD_PID" ]; then + echo "[WARN] 发现运行中的服务 (PID: $OLD_PID),正在停止..." + pkill -f "python main.py" + sleep 2 + echo "[INFO] 旧服务已停止" +else + echo "[INFO] 没有运行中的服务" +fi + +echo "[INFO] 正在启动服务..." +# 激活虚拟环境并后台运行 +if [ ! -d "venv" ]; then + echo "[ERROR] 虚拟环境不存在,请先执行: python3 -m venv venv" + exit 1 +fi + +if [ ! -f "venv/bin/activate" ]; then + echo "[ERROR] 虚拟环境激活脚本不存在" + exit 1 +fi + +source venv/bin/activate + +# 检查依赖是否安装 +if ! python -c "import schedule" 2>/dev/null; then + echo "[WARN] 依赖未安装,正在安装..." + pip install -r requirements.txt +fi + +nohup python main.py --workers 3 --health-port 8899 > logs/service.log 2>&1 & + +NEW_PID=$! 
+echo "[INFO] 服务已启动" +echo "[INFO] 进程ID: $NEW_PID" +echo "[INFO] 查看日志: tail -f ${PROJECT_DIR}/logs/service.log" diff --git a/status.sh b/status.sh new file mode 100644 index 0000000..33abfda --- /dev/null +++ b/status.sh @@ -0,0 +1,22 @@ +#!/bin/bash +# AI MIP 服务状态查看 + +echo "==========================================" +echo " AI MIP 服务状态" +echo "==========================================" + +# 查找进程 +PID=$(pgrep -f "python main.py") + +if [ -z "$PID" ]; then + echo "[INFO] 服务未运行" +else + echo "[INFO] 服务运行中" + echo "[INFO] 进程ID: $PID" + echo "" + echo "进程详情:" + ps aux | grep "python main.py" | grep -v grep +fi + +echo "" +echo "==========================================" diff --git a/stop.sh b/stop.sh new file mode 100644 index 0000000..3735c33 --- /dev/null +++ b/stop.sh @@ -0,0 +1,11 @@ +#!/bin/bash +# AI MIP 停止脚本 + +# 查找并杀死进程 +pkill -f "python main.py" + +if [ $? -eq 0 ]; then + echo "[INFO] 服务已停止" +else + echo "[WARN] 未找到运行中的服务" +fi diff --git a/task_executor.py b/task_executor.py new file mode 100644 index 0000000..7930fec --- /dev/null +++ b/task_executor.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +任务执行器模块 + +提供广告点击任务的执行能力,包括: +- 浏览器环境创建 +- 单个任务执行 +- 批量任务调度 +""" + +from loguru import logger +from adspower_client import AdsPowerClient +from ad_automation import MIPAdAutomation +from config import Config +from data_manager import DataManager +import time +import threading +from datetime import datetime +from pathlib import Path +from typing import List, Dict, Optional + + +class TaskExecutor: + """ + 任务执行器 + + 负责执行单个或批量广告点击任务。 + 支持代理配置、浏览器环境管理、任务结果追踪。 + """ + + _browser_start_lock = threading.Lock() + + def __init__(self, max_workers: int = 1, use_proxy: bool = True): + """ + 初始化任务执行器 + + Args: + max_workers: 最大并发数(1=串行,>1=并发) + use_proxy: 是否使用代理 + """ + self.max_workers = max_workers + self.use_proxy = use_proxy + self.client = AdsPowerClient() + self.dm = DataManager() + + # 创建截图目录(按日期组织) + timestamp = datetime.now().strftime('%Y%m%d') + self.screenshot_dir = Path("./test") / f"batch_{timestamp}" + self.screenshot_dir.mkdir(parents=True, exist_ok=True) + + logger.debug(f"TaskExecutor initialized: workers={max_workers}, proxy={use_proxy}") + + def create_browser_profile(self, index: int) -> Optional[Dict]: + """ + 创建浏览器环境 + + Args: + index: 环境编号 + + Returns: + 环境信息字典,失败返回None + """ + try: + # 获取分组ID + group_id = self.client.get_group_by_env() + time.sleep(0.5) + + # 如果使用代理,获取代理配置 + proxy_config = {} + proxy_id = None + proxy_info = None + + if self.use_proxy: + logger.info(f"[环境 {index}] 获取代理IP...") + proxy_info = self.client.get_damai_proxy() + time.sleep(0.5) + + if proxy_info: + logger.info(f"[环境 {index}] 代理IP: {proxy_info['host']}:{proxy_info['port']}") + + proxy_data = { + "type": "http", + "host": proxy_info["host"], + "port": proxy_info["port"], + "user": self.client.DAMAI_USER, + "password": self.client.DAMAI_PASSWORD, + "remark": f"任务代理_{index}" + } + + proxy_id = self.client.create_proxy(proxy_data) + time.sleep(0.5) + + if proxy_id: + logger.info(f"[环境 {index}] 创建代理: {proxy_id}") + proxy_config = {"proxyid": proxy_id} + + # 根据环境变量决定操作系统 + os_type = "Linux" if Config.ENV == "production" else "Windows" + + profile_data = { + "name": f"任务_{index}_{datetime.now().strftime('%H%M%S')}", + "group_id": str(group_id) if group_id else "0", + "platform": "health.baidu.com", + "repeat_config": [], + "ignore_cookie_error": "1", + "country": "cn", + "city": "beijing", + "remark": f"任务环境 #{index}", + "fingerprint_config": { + "automatic_timezone": "1", + 
"flash": "block", + "scan_port_type": "1", + "location": "ask", + "location_switch": "1", + "canvas": "0", + "webgl": "0", + "audio": "0", + "webrtc": "local", + "do_not_track": "true", + "hardware_concurrency": "default", + "device_memory": "default", + "gpu": "2", + "mac_address_config": { + "model": "1", + "address": "" + }, + "browser_kernel_config": { + "version": "latest", + "type": "chrome" + }, + "random_ua": { + "ua_system_version": [os_type] + } + } + } + + logger.debug(f"[环境 {index}] 操作系统: {os_type} (ENV={Config.ENV})") + + if proxy_config: + profile_data.update(proxy_config) + + response = self.client._make_request( + 'POST', + '/api/v2/browser-profile/create', + json=profile_data + ) + + if response and response.get('code') == 0: + profile_id = response.get('data', {}).get('profile_id') + logger.info(f"✅ 创建环境 #{index}: {profile_id}") + return { + 'index': index, + 'profile_id': profile_id, + 'name': profile_data['name'], + 'proxy': proxy_info, + 'proxy_id': proxy_id + } + else: + logger.error(f"❌ 创建环境 #{index} 失败: {response}") + return None + + except Exception as e: + logger.error(f"❌ 创建环境 #{index} 异常: {str(e)}") + return None + + def execute_single_task(self, site_info: Dict, task_index: int, profile_id: str = None) -> Dict: + """ + 执行单个点击任务 + + Args: + site_info: 站点信息 + task_index: 任务编号 + profile_id: 已创建的Profile ID(可选) + + Returns: + 执行结果字典 + """ + # 设置线程名称 + threading.current_thread().name = f"Task-{task_index}" + + site_id = site_info.get('id') + site_url = site_info.get('site_url', site_info.get('url')) + + result = { + 'task_index': task_index, + 'site_id': site_id, + 'site_url': site_url, + 'success': False, + 'click_count': 0, + 'has_ad': False, + 'has_reply': False, + 'error': None + } + + # 创建任务目录 + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + task_folder = self.screenshot_dir / f"task_{task_index}_{timestamp}" + task_folder.mkdir(exist_ok=True) + + # 每个线程创建自己的客户端实例 + client = AdsPowerClient() + + try: + logger.info(f"[任务 {task_index}] 开始执行: {site_url}") + + # 如果没有传入profile_id,则创建新的 + if not profile_id: + profiles_data = client.list_profiles() + if not profiles_data: + result['error'] = "获取Profile列表失败" + return result + + profiles = profiles_data.get('data', {}).get('list', []) + if not profiles: + result['error'] = "没有可用的Profile" + return result + + profile_id = profiles[0].get('profile_id') + logger.info(f"[任务 {task_index}] 使用Profile: {profile_id}") + + # 使用锁控制浏览器启动 + with self._browser_start_lock: + logger.debug(f"[任务 {task_index}] 启动浏览器...") + browser_info = client.start_browser(user_id=profile_id) + if not browser_info: + result['error'] = "启动浏览器失败" + return result + time.sleep(1.5) + + time.sleep(1) + + # 连接浏览器 + browser = client.connect_browser(browser_info) + if not browser: + result['error'] = "CDP连接失败" + return result + + # 获取页面 + context = browser.contexts[0] + all_pages = context.pages + logger.debug(f"[任务 {task_index}] 当前标签页数: {len(all_pages)}") + + # 关闭AdsPower启动页 + for p in all_pages: + try: + if 'start.adspower.net' in p.url: + p.close() + except: + pass + + # 获取或创建页面 + remaining_pages = context.pages + page = remaining_pages[0] if remaining_pages else context.new_page() + + # 执行广告点击和消息发送流程 + logger.info(f"[任务 {task_index}] 开始执行广告点击和咨询流程...") + automation = MIPAdAutomation(page, task_index=task_index) + click_success, has_reply = automation.check_and_click_ad( + url=site_url, + site_id=site_id + ) + + if click_success: + result['success'] = True + result['click_count'] = 1 + result['has_ad'] = True + result['has_reply'] = has_reply + 
logger.info(f"[任务 {task_index}] ✅ 任务完成: 点击成功={click_success}, 收到回复={has_reply}") + else: + result['error'] = "广告点击失败" + logger.warning(f"[任务 {task_index}] ❌ 广告点击失败") + + # 关闭浏览器 + try: + if browser: + browser.close() + time.sleep(0.5) + except: + pass + + # 停止浏览器 + try: + client.stop_browser(user_id=profile_id) + logger.debug(f"[任务 {task_index}] 浏览器已关闭") + time.sleep(1) + except Exception as e: + logger.warning(f"[任务 {task_index}] 停止浏览器失败: {str(e)}") + + # 删除浏览器Profile(释放资源) + try: + logger.debug(f"[任务 {task_index}] 删除浏览器Profile: {profile_id}") + client.delete_profile(profile_id) + except Exception as e: + logger.warning(f"[任务 {task_index}] 删除Profile异常: {str(e)}") + + except Exception as e: + logger.error(f"[任务 {task_index}] 执行异常: {str(e)}") + result['error'] = str(e) + import traceback + traceback.print_exc() + + return result diff --git a/test_baidu_crawler.py b/test_baidu_crawler.py new file mode 100644 index 0000000..92ab5ec --- /dev/null +++ b/test_baidu_crawler.py @@ -0,0 +1,98 @@ +""" +测试百度搜索爬虫 +""" + +from loguru import logger +from baidu_crawler import BaiduSearchCrawler +from db_manager import QueryTaskManager +from datetime import datetime +import sys + +logger.remove() +logger.add(sys.stdout, format="{time:HH:mm:ss} | {level: <8} | {message}") + + +def test_single_query(): + """测试爬取单个查询词""" + print("="*70) + print(" 测试爬取单个查询词") + print("="*70) + + # headless=False可以看到浏览器 + # 会自动滚动直到无新内容 + crawler = BaiduSearchCrawler(headless=False) + + # 测试查询,设置阈值50 + query_word = "糖尿病治疗" + result = crawler.crawl_query(query_word, category="医疗", threshold_max=50) + + print("\n爬取结果:") + print(f" 查询词: {result['query_word']}") + print(f" 是否成功: {result['success']}") + print(f" 爬取数量: {result['crawled_count']}") + print(f" 有效数量: {result['valid_count']}") + print(f" 新增数量: {result['new_count']}") + if result['error']: + print(f" 错误信息: {result['error']}") + + +def test_batch_crawl(): + """测试批量爬取任务""" + print("="*70) + print(" 测试批量爬取任务") + print("="*70) + + # 先创建一些测试任务 + task_mgr = QueryTaskManager() + task_date = datetime.now().strftime('%Y%m%d') + + test_queries = [ + ("高血压怎么治疗", "keyword", "医疗", 3, 30), # 阈值30 + ("在线教育平台哪个好", "phrase", "教育", 5, 20), # 阈值20 + ("免费法律咨询", "keyword", "法律", 4, 25), # 阈值25 + ] + + logger.info("创建测试任务...") + for query, qtype, category, priority, threshold in test_queries: + task_mgr.create_task( + query_word=query, + query_type=qtype, + task_date=task_date, + threshold_max=threshold, # 使用各自的阈值 + priority=priority, + category=category, + remark="测试任务" + ) + + print() + + # 执行批量爬取,会自动滚动直到达到阈值 + crawler = BaiduSearchCrawler(headless=False) + stats = crawler.crawl_tasks(limit=3) + + print("\n批量爬取统计:") + print(f" 总任务数: {stats['total_tasks']}") + print(f" 成功: {stats['success_count']}") + print(f" 失败: {stats['failed_count']}") + print(f" 总爬取: {stats['total_crawled']}") + print(f" 新增保存: {stats['total_saved']}") + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description='测试百度搜索爬虫') + parser.add_argument('--mode', choices=['single', 'batch'], default='single', + help='测试模式:single=单个查询, batch=批量任务') + + args = parser.parse_args() + + try: + if args.mode == 'single': + test_single_query() + else: + test_batch_crawl() + except Exception as e: + logger.error(f"测试失败: {str(e)}") + import traceback + traceback.print_exc() diff --git a/test_db_tasks.py b/test_db_tasks.py index f13ac05..e34f1e3 100644 --- a/test_db_tasks.py +++ b/test_db_tasks.py @@ -1,22 +1,19 @@ """ 从数据库读取URL并执行批量点击任务 支持单线程和并发两种模式 + +注意:此文件仅用于测试,生产环境请使用 main.py """ from loguru import 
logger -from adspower_client import AdsPowerClient -from ad_automation import MIPAdAutomation +from task_executor import TaskExecutor from config import Config -from db_manager import SiteManager, ClickManager, InteractionManager from data_manager import DataManager import sys import time -import random from datetime import datetime from pathlib import Path -from concurrent.futures import ThreadPoolExecutor, as_completed -from typing import List, Dict, Optional -import threading +from concurrent.futures import ThreadPoolExecutor # 配置日志(添加线程标识 + 文件输出) logger.remove() @@ -29,7 +26,6 @@ logger.add( ) # 文件输出 -from pathlib import Path log_dir = Path("./logs") log_dir.mkdir(exist_ok=True) @@ -43,495 +39,87 @@ logger.add( ) -class DatabaseTaskExecutor: - """数据库任务执行器""" - - _browser_start_lock = threading.Lock() - - def __init__(self, max_workers: int = 1, use_proxy: bool = True): - """ - 初始化任务执行器 - - Args: - max_workers: 最大并发数(1=串行,>1=并发) - use_proxy: 是否使用代理 - """ - self.max_workers = max_workers - self.use_proxy = use_proxy - self.client = AdsPowerClient() - self.dm = DataManager() - - # 创建截图目录(按日期组织) - timestamp = datetime.now().strftime('%Y%m%d') - self.screenshot_dir = Path("./test") / f"batch_{timestamp}" - self.screenshot_dir.mkdir(parents=True, exist_ok=True) - - logger.info(f"执行模式: {'并发' if max_workers > 1 else '串行'}") - logger.info(f"最大并发数: {max_workers}") - logger.info(f"使用代理: {use_proxy}") - logger.info(f"截图目录: {self.screenshot_dir}") - - def get_active_tasks(self, limit: Optional[int] = None) -> List[Dict]: - """ - 从数据库获取活跃的站点任务 - - Args: - limit: 限制数量,None表示获取全部 - - Returns: - 站点列表 - """ - active_sites = self.dm.get_active_urls() - - if limit and limit > 0: - active_sites = active_sites[:limit] - - logger.info(f"从数据库获取 {len(active_sites)} 个活跃站点") - return active_sites - - def create_browser_profile(self, index: int) -> Dict: - """ - 创建浏览器环境 - - Args: - index: 环境编号 - - Returns: - 环境信息字典 - """ - try: - # 获取分组ID - group_id = self.client.get_group_by_env() - time.sleep(0.5) - - # 如果使用代理,获取代理配置 - proxy_config = {} - proxy_id = None - proxy_info = None - - if self.use_proxy: - logger.info(f"[环境 {index}] 获取代理IP...") - proxy_info = self.client.get_damai_proxy() - time.sleep(0.5) - - if proxy_info: - logger.info(f"[环境 {index}] 代理IP: {proxy_info['host']}:{proxy_info['port']}") - - proxy_data = { - "type": "http", - "host": proxy_info["host"], - "port": proxy_info["port"], - "user": self.client.DAMAI_USER, - "password": self.client.DAMAI_PASSWORD, - "remark": f"DB任务代理_{index}" - } - - proxy_id = self.client.create_proxy(proxy_data) - time.sleep(0.5) - - if proxy_id: - logger.info(f"[环境 {index}] 创建代理: {proxy_id}") - proxy_config = {"proxyid": proxy_id} - - # 创建 Profile - # 根据环境变量决定操作系统 - from config import Config - os_type = "Linux" if Config.ENV == "production" else "Windows" - - profile_data = { - "name": f"DB任务_{index}_{datetime.now().strftime('%H%M%S')}", - "group_id": str(group_id) if group_id else "0", - "platform": "health.baidu.com", - "repeat_config": [], - "ignore_cookie_error": "1", - "country": "cn", - "city": "beijing", - "remark": f"DB任务环境 #{index}", - "fingerprint_config": { - "automatic_timezone": "1", - "flash": "block", - "scan_port_type": "1", - "location": "ask", - "location_switch": "1", - "canvas": "0", - "webgl": "0", - "audio": "0", - "webrtc": "local", - "do_not_track": "true", - "hardware_concurrency": "default", - "device_memory": "default", - "gpu": "2", - "mac_address_config": { - "model": "1", - "address": "" - }, - "browser_kernel_config": { - "version": "latest", 
- "type": "chrome" - }, - "random_ua": { - "ua_system_version": [os_type] # 根据环境动态设置 - } - } - } - - logger.info(f"[环境 {index}] 操作系统: {os_type} (ENV={Config.ENV})") - - if proxy_config: - profile_data.update(proxy_config) - - response = self.client._make_request( - 'POST', - '/api/v2/browser-profile/create', - json=profile_data - ) - - if response and response.get('code') == 0: - profile_id = response.get('data', {}).get('profile_id') - logger.info(f"✅ 创建环境 #{index}: {profile_id}") - return { - 'index': index, - 'profile_id': profile_id, - 'name': profile_data['name'], - 'proxy': proxy_info, - 'proxy_id': proxy_id - } - else: - logger.error(f"❌ 创建环境 #{index} 失败: {response}") - return None - - except Exception as e: - logger.error(f"❌ 创建环境 #{index} 异常: {str(e)}") - return None - - def execute_single_task(self, site_info: Dict, task_index: int, profile_id: str = None) -> Dict: - """ - 执行单个点击任务 - - Args: - site_info: 站点信息 - task_index: 任务编号 - profile_id: 已创建的Profile ID(可选) - - Returns: - 执行结果 - """ - # 设置线程名称 - threading.current_thread().name = f"Task-{task_index}" - - site_id = site_info.get('id') - site_url = site_info.get('site_url', site_info.get('url')) - - result = { - 'task_index': task_index, - 'site_id': site_id, - 'site_url': site_url, - 'success': False, - 'click_count': 0, - 'has_ad': False, - 'has_reply': False, - 'error': None - } - - # 创建任务目录 - timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') - task_folder = self.screenshot_dir / f"task_{task_index}_{timestamp}" - task_folder.mkdir(exist_ok=True) - - # 每个线程创建自己的客户端实例 - client = AdsPowerClient() - proxy_info = None - - try: - logger.info(f"[任务 {task_index}] 开始执行: {site_url}") - - # 如果没有传入profile_id,则创建新的 - if not profile_id: - # 获取Profile列表 - profiles_data = client.list_profiles() - if not profiles_data: - result['error'] = "获取Profile列表失败" - return result - - profiles = profiles_data.get('data', {}).get('list', []) - if not profiles: - result['error'] = "没有可用的Profile" - return result - - # 使用第一个Profile - profile_id = profiles[0].get('profile_id') - logger.info(f"[任务 {task_index}] 使用Profile: {profile_id}") - - # 如果使用代理,更新Profile代理配置 - if self.use_proxy: - logger.info(f"[任务 {task_index}] 获取代理IP...") - proxy_info = client.get_damai_proxy() - time.sleep(0.5) - - if proxy_info: - logger.info(f"[任务 {task_index}] 代理IP: {proxy_info['host']}:{proxy_info['port']}") - - proxy_config = { - "proxy_type": "http", - "proxy_host": proxy_info["host"], - "proxy_port": proxy_info["port"], - "proxy_user": client.DAMAI_USER, - "proxy_password": client.DAMAI_PASSWORD, - "proxy_soft": "other" - } - - # 更新代理(使用API v1方式) - success = client.update_profile_proxy_v1(profile_id, proxy_config) - if success: - logger.info(f"[任务 {task_index}] 代理配置成功") - else: - logger.warning(f"[任务 {task_index}] 代理配置失败,继续执行") - - time.sleep(1) - - # 使用锁控制浏览器启动 - with self._browser_start_lock: - logger.debug(f"[任务 {task_index}] 启动浏览器...") - browser_info = client.start_browser(user_id=profile_id) - if not browser_info: - result['error'] = "启动浏览器失败" - return result - time.sleep(1.5) - - time.sleep(1) - - # 连接浏览器 - browser = client.connect_browser(browser_info) - if not browser: - result['error'] = "CDP连接失败" - return result - - # 获取页面 - context = browser.contexts[0] - all_pages = context.pages - logger.info(f"[任务 {task_index}] 当前标签页数: {len(all_pages)}") - - # 关闭AdsPower启动页 - closed_count = 0 - for p in all_pages: - try: - if 'start.adspower.net' in p.url: - logger.debug(f"[任务 {task_index}] 关闭启动页: {p.url}") - p.close() - closed_count += 1 - except: - pass - - if closed_count > 
0: - logger.info(f"[任务 {task_index}] 已关闭 {closed_count} 个启动页") - - # 获取或创建页面 - remaining_pages = context.pages - if remaining_pages: - page = remaining_pages[0] - else: - page = context.new_page() - - # 使用 MIPAdAutomation 执行完整的广告点击和消息发送流程 - logger.info(f"[任务 {task_index}] 开始执行广告点击和咨询流程...") - automation = MIPAdAutomation(page, task_index=task_index) # 传入task_index创建日志目录 - click_success, has_reply = automation.check_and_click_ad( - url=site_url, - site_id=site_id - ) - - if click_success: - result['success'] = True - result['click_count'] = 1 - result['has_ad'] = True - result['has_reply'] = has_reply - logger.info(f"[任务 {task_index}] ✅ 任务完成: 点击成功={click_success}, 收到回复={has_reply}") - else: - result['error'] = "广告点击失败" - logger.warning(f"[任务 {task_index}] ❌ 广告点击失败") - - # 关闭浏览器连接 - try: - if browser: - browser.close() - time.sleep(0.5) - except: - pass - - # 停止浏览器 - try: - client.stop_browser(user_id=profile_id) - logger.info(f"[任务 {task_index}] 浏览器已关闭") - time.sleep(1) - except Exception as e: - logger.warning(f"[任务 {task_index}] 停止浏览器失败: {str(e)}") - - # 删除浏览器Profile(释放资源) - try: - logger.info(f"[任务 {task_index}] 删除浏览器Profile: {profile_id}") - delete_result = client.delete_profile(profile_id) - if delete_result: - logger.info(f"[任务 {task_index}] ✅ Profile已删除") - else: - logger.warning(f"[任务 {task_index}] Profile删除失败") - except Exception as e: - logger.warning(f"[任务 {task_index}] 删除Profile异常: {str(e)}") - - except Exception as e: - logger.error(f"[任务 {task_index}] 执行异常: {str(e)}") - result['error'] = str(e) - import traceback - traceback.print_exc() - - return result - - def run_tasks(self, limit: Optional[int] = None): - """ - 执行批量任务(边创建边执行) - - Args: - limit: 限制处理的站点数量,None表示处理全部 - """ - logger.info("=" * 60) - logger.info("从数据库执行批量点击任务") - logger.info("=" * 60) - - # 获取任务列表 - sites = self.get_active_tasks(limit=limit) - - if not sites: - logger.warning("没有可执行的任务") - return - - logger.info(f"\n准备执行 {len(sites)} 个任务\n") - - # 显示任务列表 - for idx, site in enumerate(sites, 1): - site_url = site.get('site_url', site.get('url')) - click_count = site.get('click_count', 0) - logger.info(f" {idx}. 
{site_url} (已点击: {click_count}次)") - - logger.info("\n" + "-" * 60) - logger.info("边创建环境边执行任务...\n") - - start_time = time.time() - results = [] - - if self.max_workers == 1: - # 串行执行:创建一个,执行一个 - for idx, site in enumerate(sites, 1): - # 创建环境 - logger.info(f"[任务 {idx}] 创建浏览器环境...") - profile_info = self.create_browser_profile(idx) - - if not profile_info: - logger.error(f"[任务 {idx}] 创建环境失败,跳过") - continue - - if profile_info.get('proxy'): - logger.info(f"[任务 {idx}] 使用代理: {profile_info['proxy']['host']}:{profile_info['proxy']['port']}") - - time.sleep(2) # 环境创建间隔 - - # 立即执行任务 - result = self.execute_single_task(site, idx, profile_info['profile_id']) - results.append(result) - - # 任务间隔 - if idx < len(sites): - wait_time = random.randint(3, 5) - logger.info(f"等待 {wait_time} 秒后执行下一个任务...\n") - time.sleep(wait_time) - else: - # 并发执行:边创建边提交 - with ThreadPoolExecutor(max_workers=self.max_workers) as executor: - futures = [] - - for idx, site in enumerate(sites, 1): - # 创建环境 - logger.info(f"[任务 {idx}] 创建浏览器环境...") - profile_info = self.create_browser_profile(idx) - - if not profile_info: - logger.error(f"[任务 {idx}] 创建环境失败,跳过") - continue - - if profile_info.get('proxy'): - logger.info(f"[任务 {idx}] 使用代理: {profile_info['proxy']['host']}:{profile_info['proxy']['port']}") - - # 立即提交任务到线程池 - future = executor.submit(self.execute_single_task, site, idx, profile_info['profile_id']) - futures.append((future, idx)) - - time.sleep(2) # 环境创建间隔 - - # 等待所有任务完成 - for future, idx in futures: - try: - result = future.result() - results.append(result) - - status = "成功" if result['success'] else "失败" - logger.info(f"[任务 {result['task_index']}] {status}") - - except Exception as e: - logger.error(f"[任务 {idx}] 执行异常: {str(e)}") - - # 统计结果 - elapsed_time = time.time() - start_time - - logger.info("\n" + "=" * 60) - logger.info("任务执行完成") - logger.info("=" * 60) - - success_count = sum(1 for r in results if r['success']) - failed_count = len(results) - success_count - has_ad_count = sum(1 for r in results if r['has_ad']) - has_reply_count = sum(1 for r in results if r.get('has_reply', False)) - total_clicks = sum(r['click_count'] for r in results) - - logger.info(f"总任务数: {len(results)}") - logger.info(f"成功数: {success_count}") - logger.info(f"失败数: {failed_count}") - logger.info(f"有广告页面: {has_ad_count}") - logger.info(f"总点击次数: {total_clicks}") - logger.info(f"收到回复数: {has_reply_count}") - logger.info(f"成功率: {success_count/len(results)*100:.1f}%") - logger.info(f"回复率: {has_reply_count/total_clicks*100 if total_clicks > 0 else 0:.1f}%") - logger.info(f"耗时: {elapsed_time:.1f} 秒") - - # 显示数据库统计 - logger.info("\n数据库统计:") - stats = self.dm.get_statistics() - for key, value in stats.items(): - logger.info(f" {key}: {value}") - - logger.info("\n" + "=" * 60) - +# ==================== 主程序 ==================== if __name__ == "__main__": - logger.info("数据库任务执行器") - logger.info(f"当前环境: {Config.ENV}") - logger.info(f"AdsPower API: {Config.ADSPOWER_API_URL}") - logger.info("") - - # ==================== 配置区 ==================== - # 执行模式:1=串行,>1=并发 - MAX_WORKERS = 3 + # 最大并发数(建议使用1,避免资源冲突) + MAX_WORKERS = 1 # 是否使用代理 USE_PROXY = True - # 限制执行数量(None=全部,数字=限制数量) - LIMIT = 3 + # 测试数量(None = 所有) + TEST_LIMIT = None - # 是否自动关闭浏览器(测试时可设为False保持浏览器打开) - Config.AUTO_CLOSE_BROWSER = False - # ===================================================== + logger.info("="*70) + logger.info(" 从数据库读取任务并执行点击") + logger.info("="*70) + logger.info(f"执行模式: {'并发' if MAX_WORKERS > 1 else '串行'}") + logger.info(f"最大并发数: {MAX_WORKERS}") + logger.info(f"使用代理: 
{USE_PROXY}") + logger.info(f"测试数量: {TEST_LIMIT or '全部'}") + logger.info("="*70) + logger.info("") - executor = DatabaseTaskExecutor( + # 创建任务执行器 + executor = TaskExecutor( max_workers=MAX_WORKERS, use_proxy=USE_PROXY ) - executor.run_tasks(limit=LIMIT) + # 获取活跃站点任务 + dm = DataManager() + active_sites = dm.get_active_urls() + + if TEST_LIMIT: + active_sites = active_sites[:TEST_LIMIT] + + logger.info(f"从数据库获取 {len(active_sites)} 个活跃站点") + logger.info("") + + if not active_sites: + logger.warning("❗ 没有找到活跃站点,退出") + sys.exit(0) + + # 执行任务 + start_time = time.time() + results = [] + + if MAX_WORKERS == 1: + # 串行模式 + logger.info("📊 串行模式,逐个执行...\n") + for idx, site_info in enumerate(active_sites, 1): + result = executor.execute_single_task(site_info, idx) + results.append(result) + logger.info("") + else: + # 并发模式 + logger.info(f"🚀 并发模式,最大 {MAX_WORKERS} 个线程...\n") + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool: + future_to_site = { + pool.submit(executor.execute_single_task, site_info, idx): site_info + for idx, site_info in enumerate(active_sites, 1) + } + + for future in future_to_site: + try: + result = future.result(timeout=300) + results.append(result) + except Exception as e: + logger.error(f"任务执行异常: {str(e)}") + + # 统计结果 + end_time = time.time() + elapsed_time = end_time - start_time + + success_count = sum(1 for r in results if r['success']) + fail_count = len(results) - success_count + + logger.info("="*70) + logger.info(" 执行结果") + logger.info("="*70) + logger.info(f"总任务数: {len(results)}") + logger.info(f"成功: {success_count}") + logger.info(f"失败: {fail_count}") + logger.info(f"总耗时: {elapsed_time:.2f}秒") + logger.info("="*70) diff --git a/test_query_task.py b/test_query_task.py new file mode 100644 index 0000000..3670548 --- /dev/null +++ b/test_query_task.py @@ -0,0 +1,128 @@ +""" +测试查询任务管理功能 +""" + +from loguru import logger +from db_manager import QueryTaskManager +from datetime import datetime +import sys + +logger.remove() +logger.add(sys.stdout, format="{time:HH:mm:ss} | {level: <8} | {message}") + + +def test_query_task_manager(): + """测试查询任务管理器""" + + print("=" * 70) + print(" 测试 QueryTaskManager") + print("=" * 70) + print() + + # 初始化管理器 + task_mgr = QueryTaskManager() + + # 1. 创建任务 + logger.info("【测试1】创建查询任务") + task_date = datetime.now().strftime('%Y%m%d') + + task_id1 = task_mgr.create_task( + query_word="高血压治疗方法", + query_type="keyword", + threshold_max=50, + priority=3, + category="医疗", + remark="测试任务1" + ) + + task_id2 = task_mgr.create_task( + query_word="在线教育平台推荐", + query_type="phrase", + threshold_max=30, + priority=5, + category="教育", + remark="测试任务2" + ) + + task_id3 = task_mgr.create_task( + query_word="法律咨询免费在线24小时", + query_type="long_tail", + threshold_max=20, + priority=7, + category="法律", + remark="测试任务3" + ) + + print() + + # 2. 获取ready任务 + logger.info("【测试2】获取ready任务") + ready_tasks = task_mgr.get_ready_tasks(limit=5) + logger.info(f"获取到 {len(ready_tasks)} 个ready任务") + for task in ready_tasks: + logger.info(f" - [{task['priority']}] {task['query_word']} ({task['category']}) - {task['status']}") + print() + + # 3. 更新任务状态 + if task_id1: + logger.info("【测试3】更新任务状态") + task_mgr.update_task_status(task_id1, 'doing') + task = task_mgr.get_task_by_id(task_id1) + logger.info(f"任务状态: {task['status']}, 开始时间: {task['started_at']}") + print() + + # 4. 
增加抓取计数 + if task_id1: + logger.info("【测试4】增加抓取计数") + task_mgr.increment_crawl_count(task_id1, crawl_count=10, valid_count=7) + task = task_mgr.get_task_by_id(task_id1) + logger.info(f"已抓取: {task['crawl_url_count']}, 有效: {task['valid_url_count']}, 当前计数: {task['current_count']}") + print() + + # 5. 检查阈值 + if task_id1: + logger.info("【测试5】检查阈值") + reached = task_mgr.check_threshold(task_id1) + logger.info(f"是否达到阈值: {reached}") + print() + + # 6. 按日期获取任务 + logger.info("【测试6】按日期获取任务") + date_tasks = task_mgr.get_tasks_by_date(task_date) + logger.info(f"今天的任务数: {len(date_tasks)}") + print() + + # 7. 获取统计信息 + logger.info("【测试7】获取统计信息") + stats = task_mgr.get_task_statistics(task_date) + logger.info("任务统计:") + logger.info(f" 总任务数: {stats.get('total_tasks', 0)}") + logger.info(f" 准备中: {stats.get('ready_count', 0)}") + logger.info(f" 执行中: {stats.get('doing_count', 0)}") + logger.info(f" 已完成: {stats.get('finished_count', 0)}") + logger.info(f" 失败: {stats.get('failed_count', 0)}") + logger.info(f" 已关闭: {stats.get('closed_count', 0)}") + logger.info(f" 总抓取: {stats.get('total_crawled', 0)}") + logger.info(f" 总有效: {stats.get('total_valid', 0)}") + print() + + # 8. 完成任务 + if task_id1: + logger.info("【测试8】完成任务") + task_mgr.update_task_status(task_id1, 'finished') + task = task_mgr.get_task_by_id(task_id1) + logger.info(f"任务状态: {task['status']}, 完成时间: {task['finished_at']}") + print() + + print("=" * 70) + print(" 测试完成") + print("=" * 70) + + +if __name__ == "__main__": + try: + test_query_task_manager() + except Exception as e: + logger.error(f"测试失败: {str(e)}") + import traceback + traceback.print_exc()
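
Editor's note: the three test scripts above exercise the crawler, the executor, and the query-task manager in isolation. Below is a minimal end-to-end sketch, assuming only the call signatures those tests already use (BaiduSearchCrawler.crawl_tasks, DataManager.get_active_urls, TaskExecutor.execute_single_task); run_pipeline and its limit parameters are hypothetical names, and the sketch is an illustration, not part of the patch.

# pipeline_sketch.py -- hedged illustration of the crawl-then-click flow
from baidu_crawler import BaiduSearchCrawler
from data_manager import DataManager
from task_executor import TaskExecutor


def run_pipeline(crawl_limit: int = 3, click_limit: int = 3) -> None:
    # Step 1: crawl ready query tasks and persist new site URLs.
    crawler = BaiduSearchCrawler(headless=True)
    stats = crawler.crawl_tasks(limit=crawl_limit)
    print(f"crawled={stats['total_crawled']} saved={stats['total_saved']}")

    # Step 2: click through the active sites serially (max_workers=1 matches
    # the conservative default suggested in test_db_tasks.py).
    executor = TaskExecutor(max_workers=1, use_proxy=True)
    sites = DataManager().get_active_urls()[:click_limit]
    for idx, site in enumerate(sites, 1):
        result = executor.execute_single_task(site, idx)
        print(f"task {idx}: success={result['success']} error={result['error']}")


if __name__ == "__main__":
    run_pipeline()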