""" 任务执行器模块 负责管理浏览器生命周期和执行点击任务 """ import time from typing import Dict, Optional from loguru import logger from config import Config from adspower_client import AdsPowerClient from ad_automation import MIPAdAutomation class TaskExecutor: """ 任务执行器 负责: 1. 管理AdsPower浏览器环境 2. 执行MIP广告点击任务 3. 记录执行结果 """ def __init__(self, max_workers: int = 1, use_proxy: bool = True): """ 初始化任务执行器 Args: max_workers: 最大并发数(当前仅支持1) use_proxy: 是否使用代理 """ self.max_workers = max_workers self.use_proxy = use_proxy self.client = AdsPowerClient() self._browser_info = None self._proxy_id = None # 保存创建的代理ID,用于关闭时清理 self._profile_id = None # 保存创建的Profile ID,用于关闭时清理 logger.info(f"TaskExecutor 初始化: max_workers={max_workers}, use_proxy={use_proxy}") def create_browser_profile(self, index: int = 1) -> Optional[Dict]: """ 创建浏览器环境(启动AdsPower浏览器) 流程: 1. 根据当前运行环境获取对应的分组ID(dev/prod) 2. 获取代理并创建AdsPower代理 3. 用代理ID创建新的profile 4. 启动浏览器 Args: index: 任务索引 Returns: 包含 profile_id 的字典,失败返回 None """ try: logger.info(f"[Task {index}] 创建浏览器环境...") # 1. 获取当前环境对应的分组ID (dev/prod) group_id = self.client.get_group_by_env() if not group_id: logger.error(f"[Task {index}] 获取分组ID失败,请确保AdsPower中存在dev或prod分组") return None logger.info(f"[Task {index}] 使用分组ID: {group_id}") # 2. 获取大麦代理并创建AdsPower代理 proxy_id = None if self.use_proxy: logger.info(f"[Task {index}] 获取大麦IP代理...") proxy_info = self.client.get_damai_proxy() if proxy_info: proxy_config = { "type": "http", "host": proxy_info["host"], "port": proxy_info["port"], "user": self.client.DAMAI_USER, "password": self.client.DAMAI_PASSWORD, "ipchecker": "ip2location", "remark": "Damai Auto Proxy" } proxy_id = self.client.create_proxy(proxy_config) if proxy_id: self._proxy_id = proxy_id logger.info(f"[Task {index}] 创建代理成功: {proxy_id}") else: logger.warning(f"[Task {index}] 创建代理失败,将不使用代理") else: logger.warning(f"[Task {index}] 获取大麦代理失败,将不使用代理") # 3. 创建新的profile(必须带proxy_id) if not proxy_id: logger.error(f"[Task {index}] 没有代理ID,无法创建profile") return None import time profile_name = f"task_{index}_{int(time.time())}" profile_id = self.client.create_profile(group_id=group_id, name=profile_name, proxy_id=proxy_id) if not profile_id: logger.error(f"[Task {index}] 创建profile失败") # 删除已创建的代理 if self._proxy_id: self.client.delete_proxy(self._proxy_id) self._proxy_id = None return None self._profile_id = profile_id logger.info(f"[Task {index}] 创建profile: {profile_id} (名称: {profile_name})") # 4. 启动浏览器 browser_info = self.client.start_browser(user_id=profile_id) if not browser_info or browser_info.get('code') != 0: error_msg = browser_info.get('msg', '未知错误') if browser_info else '无响应' logger.error(f"[Task {index}] 启动浏览器失败: {error_msg}") # 清理资源 self.client.delete_profile(profile_id) self._profile_id = None if self._proxy_id: self.client.delete_proxy(self._proxy_id) self._proxy_id = None return None self._browser_info = browser_info self.client.user_id = profile_id logger.info(f"[Task {index}] 浏览器已启动, profile_id: {profile_id}, proxy_id: {self._proxy_id}") return { 'profile_id': profile_id, 'browser_info': browser_info, 'proxy_id': self._proxy_id } except Exception as e: logger.error(f"[Task {index}] 创建浏览器环境异常: {str(e)}") import traceback traceback.print_exc() return None def execute_single_task(self, site: Dict, index: int, profile_id: str) -> Dict: """ 执行单个点击任务 Args: site: 站点信息,包含 id, site_url 等 index: 任务索引 profile_id: 浏览器 Profile ID Returns: 执行结果字典 {'success': bool, 'error': str} """ site_id = site.get('id') site_url = site.get('site_url') logger.info(f"[Task {index}] 开始执行: site_id={site_id}, url={site_url}") try: # 连接浏览器 if not self._browser_info: return {'success': False, 'error': '浏览器未启动'} browser = self.client.connect_browser(self._browser_info) if not browser: return {'success': False, 'error': '连接浏览器失败'} # 获取页面 page = self.client.get_page(browser) if not page: return {'success': False, 'error': '获取页面失败'} # 清理多余标签页 self._cleanup_tabs(browser) # 创建自动化实例并执行 automation = MIPAdAutomation(page, task_index=index) click_success, has_reply = automation.check_and_click_ad(site_url, site_id=site_id) if click_success: logger.info(f"[Task {index}] 点击成功, 收到回复: {has_reply}") return {'success': True, 'has_reply': has_reply} else: logger.warning(f"[Task {index}] 点击失败") return {'success': False, 'error': '点击广告失败'} except Exception as e: logger.error(f"[Task {index}] 执行异常: {str(e)}") import traceback traceback.print_exc() return {'success': False, 'error': str(e)} finally: # 关闭浏览器 self.close_browser(profile_id) def close_browser(self, profile_id: str = None): """ 关闭浏览器并清理资源(代理和Profile) Args: profile_id: Profile ID(可选) """ target_profile_id = profile_id or self._profile_id try: # 1. 关闭浏览器 logger.info(f"关闭浏览器: {target_profile_id or self.client.user_id}") self.client.stop_browser(user_id=target_profile_id) self._browser_info = None # 2. 删除创建的代理 if self._proxy_id: logger.info(f"删除代理: {self._proxy_id}") self.client.delete_proxy(self._proxy_id) self._proxy_id = None # 3. 删除创建的Profile if self._profile_id: logger.info(f"删除Profile: {self._profile_id}") self.client.delete_profile(self._profile_id) self._profile_id = None except Exception as e: logger.error(f"关闭浏览器异常: {str(e)}") def _cleanup_tabs(self, browser): """ 清理多余的标签页,只保留一个 Args: browser: Playwright Browser 实例 """ try: if browser.contexts: context = browser.contexts[0] pages = context.pages # 如果有多个标签页,关闭多余的 if len(pages) > 1: logger.info(f"清理多余标签页: {len(pages)} -> 1") for page in pages[1:]: try: page.close() except: pass except Exception as e: logger.debug(f"清理标签页异常: {str(e)}")