Files
ai_mip/task_executor.py
2026-02-24 12:46:35 +08:00

251 lines
9.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
任务执行器模块
负责管理浏览器生命周期和执行点击任务
"""
import time
from typing import Dict, Optional
from loguru import logger
from config import Config
from adspower_client import AdsPowerClient
from ad_automation import MIPAdAutomation
class TaskExecutor:
"""
任务执行器
负责:
1. 管理AdsPower浏览器环境
2. 执行MIP广告点击任务
3. 记录执行结果
"""
def __init__(self, max_workers: int = 1, use_proxy: bool = True):
"""
初始化任务执行器
Args:
max_workers: 最大并发数当前仅支持1
use_proxy: 是否使用代理
"""
self.max_workers = max_workers
self.use_proxy = use_proxy
self.client = AdsPowerClient()
self._browser_info = None
self._proxy_id = None # 保存创建的代理ID用于关闭时清理
self._profile_id = None # 保存创建的Profile ID用于关闭时清理
logger.info(f"TaskExecutor 初始化: max_workers={max_workers}, use_proxy={use_proxy}")
def create_browser_profile(self, index: int = 1) -> Optional[Dict]:
"""
创建浏览器环境启动AdsPower浏览器
流程:
1. 根据当前运行环境获取对应的分组IDdev/prod
2. 获取代理并创建AdsPower代理
3. 用代理ID创建新的profile
4. 启动浏览器
Args:
index: 任务索引
Returns:
包含 profile_id 的字典,失败返回 None
"""
try:
logger.info(f"[Task {index}] 创建浏览器环境...")
# 1. 获取当前环境对应的分组ID (dev/prod)
group_id = self.client.get_group_by_env()
if not group_id:
logger.error(f"[Task {index}] 获取分组ID失败请确保AdsPower中存在dev或prod分组")
return None
logger.info(f"[Task {index}] 使用分组ID: {group_id}")
# 2. 获取大麦代理并创建AdsPower代理
proxy_id = None
if self.use_proxy:
logger.info(f"[Task {index}] 获取大麦IP代理...")
proxy_info = self.client.get_damai_proxy()
if proxy_info:
proxy_config = {
"type": "http",
"host": proxy_info["host"],
"port": proxy_info["port"],
"user": self.client.DAMAI_USER,
"password": self.client.DAMAI_PASSWORD,
"ipchecker": "ip2location",
"remark": "Damai Auto Proxy"
}
proxy_id = self.client.create_proxy(proxy_config)
if proxy_id:
self._proxy_id = proxy_id
logger.info(f"[Task {index}] 创建代理成功: {proxy_id}")
else:
logger.warning(f"[Task {index}] 创建代理失败,将不使用代理")
else:
logger.warning(f"[Task {index}] 获取大麦代理失败,将不使用代理")
# 3. 创建新的profile必须带proxy_id
if not proxy_id:
logger.error(f"[Task {index}] 没有代理ID无法创建profile")
return None
import time
profile_name = f"task_{index}_{int(time.time())}"
profile_id = self.client.create_profile(group_id=group_id, name=profile_name, proxy_id=proxy_id)
if not profile_id:
logger.error(f"[Task {index}] 创建profile失败")
# 删除已创建的代理
if self._proxy_id:
self.client.delete_proxy(self._proxy_id)
self._proxy_id = None
return None
self._profile_id = profile_id
logger.info(f"[Task {index}] 创建profile: {profile_id} (名称: {profile_name})")
# 4. 启动浏览器
browser_info = self.client.start_browser(user_id=profile_id)
if not browser_info or browser_info.get('code') != 0:
error_msg = browser_info.get('msg', '未知错误') if browser_info else '无响应'
logger.error(f"[Task {index}] 启动浏览器失败: {error_msg}")
# 清理资源
self.client.delete_profile(profile_id)
self._profile_id = None
if self._proxy_id:
self.client.delete_proxy(self._proxy_id)
self._proxy_id = None
return None
self._browser_info = browser_info
self.client.user_id = profile_id
logger.info(f"[Task {index}] 浏览器已启动, profile_id: {profile_id}, proxy_id: {self._proxy_id}")
return {
'profile_id': profile_id,
'browser_info': browser_info,
'proxy_id': self._proxy_id
}
except Exception as e:
logger.error(f"[Task {index}] 创建浏览器环境异常: {str(e)}")
import traceback
traceback.print_exc()
return None
def execute_single_task(self, site: Dict, index: int, profile_id: str) -> Dict:
"""
执行单个点击任务
Args:
site: 站点信息,包含 id, site_url 等
index: 任务索引
profile_id: 浏览器 Profile ID
Returns:
执行结果字典 {'success': bool, 'error': str}
"""
site_id = site.get('id')
site_url = site.get('site_url')
logger.info(f"[Task {index}] 开始执行: site_id={site_id}, url={site_url}")
try:
# 连接浏览器
if not self._browser_info:
return {'success': False, 'error': '浏览器未启动'}
browser = self.client.connect_browser(self._browser_info)
if not browser:
return {'success': False, 'error': '连接浏览器失败'}
# 获取页面
page = self.client.get_page(browser)
if not page:
return {'success': False, 'error': '获取页面失败'}
# 清理多余标签页
self._cleanup_tabs(browser)
# 创建自动化实例并执行
automation = MIPAdAutomation(page, task_index=index)
click_success, has_reply = automation.check_and_click_ad(site_url, site_id=site_id)
if click_success:
logger.info(f"[Task {index}] 点击成功, 收到回复: {has_reply}")
return {'success': True, 'has_reply': has_reply}
else:
logger.warning(f"[Task {index}] 点击失败")
return {'success': False, 'error': '点击广告失败'}
except Exception as e:
logger.error(f"[Task {index}] 执行异常: {str(e)}")
import traceback
traceback.print_exc()
return {'success': False, 'error': str(e)}
finally:
# 关闭浏览器
self.close_browser(profile_id)
def close_browser(self, profile_id: str = None):
"""
关闭浏览器并清理资源代理和Profile
Args:
profile_id: Profile ID可选
"""
target_profile_id = profile_id or self._profile_id
try:
# 1. 关闭浏览器
logger.info(f"关闭浏览器: {target_profile_id or self.client.user_id}")
self.client.stop_browser(user_id=target_profile_id)
self._browser_info = None
# 2. 删除创建的代理
if self._proxy_id:
logger.info(f"删除代理: {self._proxy_id}")
self.client.delete_proxy(self._proxy_id)
self._proxy_id = None
# 3. 删除创建的Profile
if self._profile_id:
logger.info(f"删除Profile: {self._profile_id}")
self.client.delete_profile(self._profile_id)
self._profile_id = None
except Exception as e:
logger.error(f"关闭浏览器异常: {str(e)}")
def _cleanup_tabs(self, browser):
"""
清理多余的标签页,只保留一个
Args:
browser: Playwright Browser 实例
"""
try:
if browser.contexts:
context = browser.contexts[0]
pages = context.pages
# 如果有多个标签页,关闭多余的
if len(pages) > 1:
logger.info(f"清理多余标签页: {len(pages)} -> 1")
for page in pages[1:]:
try:
page.close()
except:
pass
except Exception as e:
logger.debug(f"清理标签页异常: {str(e)}")