diff --git a/ad_automation.py b/ad_automation.py index 5a94896..cbf814d 100644 --- a/ad_automation.py +++ b/ad_automation.py @@ -85,12 +85,16 @@ class MIPAdAutomation: if not has_ad: logger.info("未检测到商业广告,跳过该链接") + # 记录无广告 + self._record_click_failure(url, "未检测到商业广告") return False, False # 点击广告 logger.info("检测到商业广告,准备点击") if not self._click_advertisement(ad_element): logger.warning("点击广告失败") + # 记录点击失败 + self._record_click_failure(url, "广告点击失败,页面未跳转") return False, False # 记录点击到数据库 @@ -110,6 +114,11 @@ class MIPAdAutomation: except Exception as e: logger.error(f"处理链接异常: {str(e)}") + # 记录异常 + try: + self._record_click_failure(url, f"异常: {str(e)}") + except: + pass return False, False finally: # 尝试关闭当前标签页,返回主窗口 diff --git a/test_db_tasks.py b/test_db_tasks.py index cf47af2..6dbc36b 100644 --- a/test_db_tasks.py +++ b/test_db_tasks.py @@ -18,14 +18,30 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from typing import List, Dict, Optional import threading -# 配置日志(添加线程标识) +# 配置日志(添加线程标识 + 文件输出) logger.remove() + +# 控制台输出 logger.add( sys.stdout, format="{time:HH:mm:ss} | [{thread.name}] | {level: <8} | {message}", level="INFO" ) +# 文件输出 +from pathlib import Path +log_dir = Path("./logs") +log_dir.mkdir(exist_ok=True) + +logger.add( + log_dir / "db_tasks_{time:YYYY-MM-DD}.log", + format="{time:YYYY-MM-DD HH:mm:ss} | [{thread.name}] | {level: <8} | {message}", + level="INFO", + rotation="00:00", # 每天凌晨切割日志 + retention="30 days", # 保留30天 + encoding="utf-8" +) + class DatabaseTaskExecutor: """数据库任务执行器""" @@ -45,13 +61,15 @@ class DatabaseTaskExecutor: self.client = AdsPowerClient() self.dm = DataManager() - # 创建截图目录 - self.screenshot_dir = Path("./screenshots") - self.screenshot_dir.mkdir(exist_ok=True) + # 创建截图目录(按日期组织) + timestamp = datetime.now().strftime('%Y%m%d') + self.screenshot_dir = Path("./test") / f"batch_{timestamp}" + self.screenshot_dir.mkdir(parents=True, exist_ok=True) logger.info(f"执行模式: {'并发' if max_workers > 1 else '串行'}") logger.info(f"最大并发数: {max_workers}") logger.info(f"使用代理: {use_proxy}") + logger.info(f"截图目录: {self.screenshot_dir}") def get_active_tasks(self, limit: Optional[int] = None) -> List[Dict]: """