#!/usr/bin/env python # -*- coding: utf-8 -*- """ 任务执行器模块 提供广告点击任务的执行能力,包括: - 浏览器环境创建 - 单个任务执行 - 批量任务调度 """ from loguru import logger from adspower_client import AdsPowerClient from ad_automation import MIPAdAutomation from config import Config from data_manager import DataManager import time import threading from datetime import datetime from pathlib import Path from typing import List, Dict, Optional class TaskExecutor: """ 任务执行器 负责执行单个或批量广告点击任务。 支持代理配置、浏览器环境管理、任务结果追踪。 """ _browser_start_lock = threading.Lock() def __init__(self, max_workers: int = 1, use_proxy: bool = True): """ 初始化任务执行器 Args: max_workers: 最大并发数(1=串行,>1=并发) use_proxy: 是否使用代理 """ self.max_workers = max_workers self.use_proxy = use_proxy self.client = AdsPowerClient() self.dm = DataManager() # 创建截图目录(按日期组织) timestamp = datetime.now().strftime('%Y%m%d') self.screenshot_dir = Path("./test") / f"batch_{timestamp}" self.screenshot_dir.mkdir(parents=True, exist_ok=True) logger.debug(f"TaskExecutor initialized: workers={max_workers}, proxy={use_proxy}") def create_browser_profile(self, index: int) -> Optional[Dict]: """ 创建浏览器环境 Args: index: 环境编号 Returns: 环境信息字典,失败返回None """ try: # 获取分组ID group_id = self.client.get_group_by_env() time.sleep(0.5) # 如果使用代理,获取代理配置 proxy_config = {} proxy_id = None proxy_info = None if self.use_proxy: logger.info(f"[环境 {index}] 获取代理IP...") proxy_info = self.client.get_damai_proxy() time.sleep(0.5) if proxy_info: logger.info(f"[环境 {index}] 代理IP: {proxy_info['host']}:{proxy_info['port']}") proxy_data = { "type": "http", "host": proxy_info["host"], "port": proxy_info["port"], "user": self.client.DAMAI_USER, "password": self.client.DAMAI_PASSWORD, "remark": f"任务代理_{index}" } proxy_id = self.client.create_proxy(proxy_data) time.sleep(0.5) if proxy_id: logger.info(f"[环境 {index}] 创建代理: {proxy_id}") proxy_config = {"proxyid": proxy_id} # 根据环境变量决定操作系统 os_type = "Linux" if Config.ENV == "production" else "Windows" profile_data = { "name": f"任务_{index}_{datetime.now().strftime('%H%M%S')}", "group_id": str(group_id) if group_id else "0", "platform": "health.baidu.com", "repeat_config": [], "ignore_cookie_error": "1", "country": "cn", "city": "beijing", "remark": f"任务环境 #{index}", "fingerprint_config": { "automatic_timezone": "1", "flash": "block", "scan_port_type": "1", "location": "ask", "location_switch": "1", "canvas": "0", "webgl": "0", "audio": "0", "webrtc": "local", "do_not_track": "true", "hardware_concurrency": "default", "device_memory": "default", "gpu": "2", "mac_address_config": { "model": "1", "address": "" }, "browser_kernel_config": { "version": "latest", "type": "chrome" }, "random_ua": { "ua_system_version": [os_type] } } } logger.debug(f"[环境 {index}] 操作系统: {os_type} (ENV={Config.ENV})") if proxy_config: profile_data.update(proxy_config) response = self.client._make_request( 'POST', '/api/v2/browser-profile/create', json=profile_data ) if response and response.get('code') == 0: profile_id = response.get('data', {}).get('profile_id') logger.info(f"✅ 创建环境 #{index}: {profile_id}") return { 'index': index, 'profile_id': profile_id, 'name': profile_data['name'], 'proxy': proxy_info, 'proxy_id': proxy_id } else: logger.error(f"❌ 创建环境 #{index} 失败: {response}") return None except Exception as e: logger.error(f"❌ 创建环境 #{index} 异常: {str(e)}") return None def execute_single_task(self, site_info: Dict, task_index: int, profile_id: str = None) -> Dict: """ 执行单个点击任务 Args: site_info: 站点信息 task_index: 任务编号 profile_id: 已创建的Profile ID(可选) Returns: 执行结果字典 """ # 设置线程名称 threading.current_thread().name = f"Task-{task_index}" site_id = site_info.get('id') site_url = site_info.get('site_url', site_info.get('url')) result = { 'task_index': task_index, 'site_id': site_id, 'site_url': site_url, 'success': False, 'click_count': 0, 'has_ad': False, 'has_reply': False, 'error': None } # 创建任务目录 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') task_folder = self.screenshot_dir / f"task_{task_index}_{timestamp}" task_folder.mkdir(exist_ok=True) # 每个线程创建自己的客户端实例 client = AdsPowerClient() try: logger.info(f"[任务 {task_index}] 开始执行: {site_url}") # 如果没有传入profile_id,则创建新的 if not profile_id: profiles_data = client.list_profiles() if not profiles_data: result['error'] = "获取Profile列表失败" return result profiles = profiles_data.get('data', {}).get('list', []) if not profiles: result['error'] = "没有可用的Profile" return result profile_id = profiles[0].get('profile_id') logger.info(f"[任务 {task_index}] 使用Profile: {profile_id}") # 使用锁控制浏览器启动 with self._browser_start_lock: logger.debug(f"[任务 {task_index}] 启动浏览器...") browser_info = client.start_browser(user_id=profile_id) if not browser_info: result['error'] = "启动浏览器失败" return result time.sleep(1.5) time.sleep(1) # 连接浏览器 browser = client.connect_browser(browser_info) if not browser: result['error'] = "CDP连接失败" return result # 获取页面 context = browser.contexts[0] all_pages = context.pages logger.debug(f"[任务 {task_index}] 当前标签页数: {len(all_pages)}") # 关闭AdsPower启动页 for p in all_pages: try: if 'start.adspower.net' in p.url: p.close() except: pass # 获取或创建页面 remaining_pages = context.pages page = remaining_pages[0] if remaining_pages else context.new_page() # 执行广告点击和消息发送流程 logger.info(f"[任务 {task_index}] 开始执行广告点击和咨询流程...") automation = MIPAdAutomation(page, task_index=task_index) click_success, has_reply = automation.check_and_click_ad( url=site_url, site_id=site_id ) if click_success: result['success'] = True result['click_count'] = 1 result['has_ad'] = True result['has_reply'] = has_reply logger.info(f"[任务 {task_index}] ✅ 任务完成: 点击成功={click_success}, 收到回复={has_reply}") else: result['error'] = "广告点击失败" logger.warning(f"[任务 {task_index}] ❌ 广告点击失败") # 关闭浏览器 try: if browser: browser.close() time.sleep(0.5) except: pass # 停止浏览器 try: client.stop_browser(user_id=profile_id) logger.debug(f"[任务 {task_index}] 浏览器已关闭") time.sleep(1) except Exception as e: logger.warning(f"[任务 {task_index}] 停止浏览器失败: {str(e)}") # 删除浏览器Profile(释放资源) try: logger.debug(f"[任务 {task_index}] 删除浏览器Profile: {profile_id}") client.delete_profile(profile_id) except Exception as e: logger.warning(f"[任务 {task_index}] 删除Profile异常: {str(e)}") except Exception as e: logger.error(f"[任务 {task_index}] 执行异常: {str(e)}") result['error'] = str(e) import traceback traceback.print_exc() return result