Files
ai_mip/scheduler.py
2026-02-24 12:46:35 +08:00

681 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
调度器模块 - 集成真正的任务执行
提供Web界面所需的调度器功能并执行实际点击任务
"""
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Dict, List, Optional
from loguru import logger
from config import Config
from data_manager import DataManager
class ClickScheduler:
"""
点击调度器 - Web界面集成版
提供调度器的控制和状态查询功能,并在后台线程中执行实际点击任务。
优先执行从未被点击过的链接。
"""
def __init__(self):
self.data_manager = DataManager()
self.running = False
self.start_time = None
# 点击记录: {site_id: {'last_click': datetime, 'today_count': int, 'target_count': int}}
self.click_records: Dict[int, dict] = {}
# 配置
self.work_start_hour = getattr(Config, 'WORK_START_HOUR', 9)
self.work_end_hour = getattr(Config, 'WORK_END_HOUR', 21)
self.click_interval_minutes = getattr(Config, 'CLICK_INTERVAL_MINUTES', 30)
# 后台线程
self._scheduler_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
# 运行状态
self.current_task = None # 当前执行的任务
self.current_executor = None # 当前执行器实例
self.current_profile_id = None # 当前浏览器profile ID
self.last_cycle_time = None
self.total_clicks_today = 0
self.error_count = 0
# 并发配置
self.max_workers = getattr(Config, 'MAX_CONCURRENT_WORKERS', 2)
# 线程安全锁
self._click_records_lock = threading.Lock()
self._stats_lock = threading.Lock()
# 使用代理
self.use_proxy = True
logger.info("Web调度器初始化完成集成任务执行")
def start_scheduler(self):
"""启动调度器"""
if self.running:
logger.warning("调度器已在运行中")
return
self.running = True
self.start_time = datetime.now()
self._stop_event.clear()
# 初始化每日记录
self.reset_daily_records()
# 启动后台调度线程
self._scheduler_thread = threading.Thread(
target=self._scheduler_loop,
name="SchedulerThread",
daemon=True
)
self._scheduler_thread.start()
logger.info("调度器已启动,后台任务线程运行中")
def stop_scheduler(self):
"""停止调度器,同时结束当前所有任务"""
if not self.running:
logger.warning("调度器未运行")
return
logger.info("正在停止调度器...")
self.running = False
self._stop_event.set()
# 关闭当前运行的浏览器
if self.current_profile_id and self.current_executor:
logger.info(f"正在关闭浏览器 (profile: {self.current_profile_id})...")
try:
self.current_executor.client.close_browser(self.current_profile_id)
logger.info("浏览器已关闭")
except Exception as e:
logger.warning(f"关闭浏览器失败: {str(e)}")
# 清理当前任务状态
self.current_executor = None
self.current_profile_id = None
self.current_task = None
# 等待线程结束最多等待10秒
if self._scheduler_thread and self._scheduler_thread.is_alive():
self._scheduler_thread.join(timeout=10)
logger.info("调度器已停止")
def is_working_time(self) -> bool:
"""检查当前是否是工作时间"""
now = datetime.now()
return self.work_start_hour <= now.hour < self.work_end_hour
def reset_daily_records(self):
"""重置每日点击记录"""
logger.info("=" * 50)
logger.info("重置每日点击记录")
logger.info("=" * 50)
# 获取所有活跃站点
sites = self.data_manager.get_active_urls()
# 为每个站点随机生成今日目标点击次数
self.click_records = {}
for site in sites:
site_id = site.get('id')
target_count = random.randint(
getattr(Config, 'MIN_CLICK_COUNT', 1),
getattr(Config, 'MAX_CLICK_COUNT', 3)
)
self.click_records[site_id] = {
'last_click': None,
'today_count': 0,
'target_count': target_count,
'site_url': site.get('site_url'),
'click_count': site.get('click_count', 0) # 历史总点击次数
}
total_target = sum(r['target_count'] for r in self.click_records.values())
logger.info(f"{len(sites)} 个站点,总目标点击次数: {total_target}")
def get_pending_sites(self) -> List[Dict]:
"""
获取待点击的站点列表
优先级排序:
1. 历史点击次数为0的站点优先
2. 今日点击次数少的站点优先
3. 距离上次点击时间长的站点优先
Returns:
待点击的站点列表(已按优先级排序)
"""
if not self.click_records:
logger.warning("点击记录为空,执行重置")
self.reset_daily_records()
# 动态检测新导入的站点
current_sites = self.data_manager.get_active_urls()
current_site_ids = {site.get('id') for site in current_sites}
existing_site_ids = set(self.click_records.keys())
# 发现新站点,自动添加到点击记录
new_site_ids = current_site_ids - existing_site_ids
if new_site_ids:
logger.info(f"发现 {len(new_site_ids)} 个新导入的站点,加入点击队列")
for site in current_sites:
site_id = site.get('id')
if site_id in new_site_ids:
target_count = random.randint(
getattr(Config, 'MIN_CLICK_COUNT', 1),
getattr(Config, 'MAX_CLICK_COUNT', 3)
)
self.click_records[site_id] = {
'last_click': None,
'today_count': 0,
'target_count': target_count,
'site_url': site.get('site_url'),
'click_count': site.get('click_count', 0)
}
logger.info(f"新站点 {site_id}: {site.get('site_url')} - 今日目标 {target_count}")
# 移除已删除的站点
removed_site_ids = existing_site_ids - current_site_ids
for site_id in removed_site_ids:
del self.click_records[site_id]
logger.info(f"站点 {site_id} 已从数据库删除,移除出点击队列")
now = datetime.now()
pending_sites = []
for site_id, record in self.click_records.items():
# 检查是否已完成今日目标
if record['today_count'] >= record['target_count']:
continue
# 检查点击间隔≥30分钟
if record['last_click']:
elapsed = (now - record['last_click']).total_seconds() / 60
if elapsed < self.click_interval_minutes:
continue
pending_sites.append({
'id': site_id,
'site_url': record['site_url'],
'today_count': record['today_count'],
'target_count': record['target_count'],
'click_count': record.get('click_count', 0), # 历史总点击次数
'last_click': record['last_click']
})
# 按优先级排序:
# 1. 历史点击次数为0的优先从未点击过
# 2. 今日点击次数少的优先
# 3. 距离上次点击时间长的优先last_click 为 None 排最前)
def sort_key(site):
click_count = site.get('click_count', 0)
today_count = site.get('today_count', 0)
last_click = site.get('last_click')
# 从未点击过的排最前 (0),点击过的按点击次数排 (1 + click_count)
priority1 = 0 if click_count == 0 else (1 + click_count)
# 今日点击次数少的优先
priority2 = today_count
# 距离上次点击时间长的优先None表示从未点击排最前
if last_click is None:
priority3 = 0
else:
# 将时间转换为负数,时间越早数值越小
priority3 = last_click.timestamp()
return (priority1, priority2, priority3)
pending_sites.sort(key=sort_key)
return pending_sites
def execute_click_task(self, site: Dict) -> bool:
"""
执行单个站点的点击任务
Args:
site: 站点信息
Returns:
是否成功
"""
site_id = site['id']
site_url = site['site_url']
self.current_task = {
'site_id': site_id,
'site_url': site_url,
'start_time': datetime.now()
}
logger.info(f"[站点 {site_id}] 开始点击: {site_url}")
logger.info(f"[站点 {site_id}] 今日进度: {site['today_count'] + 1}/{site['target_count']}, 历史总点击: {site.get('click_count', 0)}")
try:
# 检查是否被停止
if self._stop_event.is_set():
logger.info(f"[站点 {site_id}] 调度器已停止,跳过任务")
return False
# 延迟导入避免循环依赖
from task_executor import TaskExecutor
# 创建任务执行器
executor = TaskExecutor(
max_workers=1,
use_proxy=self.use_proxy
)
self.current_executor = executor # 保存执行器引用
# 获取完整站点信息
all_sites = self.data_manager.get_active_urls()
target_site = next((s for s in all_sites if s.get('id') == site_id), None)
if not target_site:
logger.error(f"[站点 {site_id}] 未找到站点信息")
return False
# 检查是否被停止
if self._stop_event.is_set():
logger.info(f"[站点 {site_id}] 调度器已停止,跳过任务")
return False
# 创建浏览器环境
logger.info(f"[站点 {site_id}] 创建浏览器环境...")
profile_info = executor.create_browser_profile(1)
if not profile_info:
logger.error(f"[站点 {site_id}] 创建浏览器环境失败")
return False
self.current_profile_id = profile_info['profile_id'] # 保存profile ID
# 检查是否被停止
if self._stop_event.is_set():
logger.info(f"[站点 {site_id}] 调度器已停止,关闭浏览器")
executor.client.close_browser(profile_info['profile_id'])
return False
time.sleep(2)
# 执行点击任务
logger.info(f"[站点 {site_id}] 执行点击任务...")
result = executor.execute_single_task(target_site, 1, profile_info['profile_id'])
# 清理当前任务状态
self.current_executor = None
self.current_profile_id = None
if result['success']:
# 更新点击记录
self.click_records[site_id]['last_click'] = datetime.now()
self.click_records[site_id]['today_count'] += 1
self.click_records[site_id]['click_count'] = self.click_records[site_id].get('click_count', 0) + 1
self.total_clicks_today += 1
logger.info(f"[站点 {site_id}] 点击完成: {self.click_records[site_id]['today_count']}/{self.click_records[site_id]['target_count']}")
return True
else:
self.error_count += 1
logger.warning(f"[站点 {site_id}] 点击失败: {result.get('error', '未知错误')}")
return False
except Exception as e:
self.error_count += 1
logger.error(f"[站点 {site_id}] 点击异常: {str(e)}")
import traceback
traceback.print_exc()
return False
finally:
self.current_task = None
self.current_executor = None
self.current_profile_id = None
def _scheduler_loop(self):
"""调度器主循环(后台线程)"""
logger.info("调度器线程启动")
# 检查是否需要立即执行
check_interval = 60 # 每60秒检查一次
cycle_interval = 10 * 60 # 每10分钟执行一次点击循环
last_cycle = 0
while not self._stop_event.is_set():
try:
now = time.time()
# 检查是否到了执行周期
if now - last_cycle >= cycle_interval:
self._run_click_cycle()
last_cycle = now
# 检查是否需要重置每日记录(跨天)
current_date = datetime.now().date()
if hasattr(self, '_last_reset_date') and self._last_reset_date != current_date:
self.reset_daily_records()
self._last_reset_date = current_date
else:
self._last_reset_date = current_date
# 等待下一次检查
self._stop_event.wait(timeout=check_interval)
except Exception as e:
logger.error(f"调度器循环异常: {str(e)}")
import traceback
traceback.print_exc()
time.sleep(30) # 出错后等待30秒再继续
logger.info("调度器线程结束")
def _run_click_cycle(self):
"""执行一次点击循环"""
# 检查工作时间
if not self.is_working_time():
current_time = datetime.now().strftime('%H:%M')
logger.info(f"当前时间 {current_time} 不在工作时间 ({self.work_start_hour}:00-{self.work_end_hour}:00),跳过")
return
self.last_cycle_time = datetime.now()
logger.info("-" * 50)
logger.info(f"开始点击循环 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logger.info("-" * 50)
# 获取待点击站点(已按优先级排序)
pending_sites = self.get_pending_sites()
if not pending_sites:
logger.info("没有待点击的站点")
return
# 显示优先级信息
never_clicked = sum(1 for s in pending_sites if s.get('click_count', 0) == 0)
logger.info(f"找到 {len(pending_sites)} 个待点击站点,其中 {never_clicked} 个从未点击过(优先执行)")
# 根据并发数执行
if self.max_workers == 1:
# 串行执行
for site in pending_sites:
# 检查是否被停止
if self._stop_event.is_set() or not self.running:
logger.info("调度器已停止,终止点击循环")
break
# 检查工作时间
if not self.is_working_time():
current_time = datetime.now().strftime('%H:%M')
logger.info(f"当前时间 {current_time} 已超出工作时间,停止执行剩余任务")
break
# 执行点击
self.execute_click_task(site)
# 任务间随机间隔
if site != pending_sites[-1] and not self._stop_event.is_set():
wait_minutes = random.randint(
getattr(Config, 'MIN_TASK_INTERVAL_MINUTES', 3),
getattr(Config, 'MAX_TASK_INTERVAL_MINUTES', 5)
)
logger.info(f"等待 {wait_minutes} 分钟后执行下一个任务...")
self._stop_event.wait(timeout=wait_minutes * 60)
else:
# 并发执行
self._run_concurrent_cycle(pending_sites)
# 显示今日进度
completed = sum(1 for r in self.click_records.values() if r['today_count'] >= r['target_count'])
total = len(self.click_records)
total_clicks = sum(r['today_count'] for r in self.click_records.values())
target_clicks = sum(r['target_count'] for r in self.click_records.values())
logger.info("-" * 50)
logger.info(f"今日进度: {completed}/{total} 个站点完成")
logger.info(f"点击次数: {total_clicks}/{target_clicks}")
logger.info("-" * 50)
def _run_concurrent_cycle(self, pending_sites: List[Dict]):
"""
并发执行点击任务
Args:
pending_sites: 待点击的站点列表
"""
# 按 max_workers 分批
batch_size = self.max_workers
batches = [pending_sites[i:i+batch_size] for i in range(0, len(pending_sites), batch_size)]
logger.info(f"并发模式: 共 {len(pending_sites)} 个任务,分 {len(batches)} 批执行(每批最多 {batch_size} 个)")
for batch_idx, batch in enumerate(batches):
# 检查是否被停止
if self._stop_event.is_set() or not self.running:
logger.info("调度器已停止,终止点击循环")
break
# 检查工作时间
if not self.is_working_time():
current_time = datetime.now().strftime('%H:%M')
logger.info(f"当前时间 {current_time} 已超出工作时间,停止执行剩余批次")
break
logger.info(f"=" * 40)
logger.info(f"开始批次 {batch_idx+1}/{len(batches)}: {len(batch)} 个任务并发执行")
for i, site in enumerate(batch, 1):
logger.info(f" - [Worker {i}] Site {site['id']}: {site['site_url'][:50]}...")
batch_start_time = time.time()
success_count = 0
fail_count = 0
# 使用 ThreadPoolExecutor 并发执行
with ThreadPoolExecutor(max_workers=len(batch)) as executor:
futures = {
executor.submit(self._execute_click_task_wrapper, site, worker_id): site
for worker_id, site in enumerate(batch, 1)
}
# 等待所有任务完成
for future in as_completed(futures):
site = futures[future]
try:
result = future.result()
if result.get('success'):
success_count += 1
else:
fail_count += 1
except Exception as e:
fail_count += 1
logger.error(f"任务异常: Site {site['id']} - {e}")
batch_duration = (time.time() - batch_start_time) / 60
logger.info(f"批次 {batch_idx+1} 完成: 成功 {success_count}, 失败 {fail_count}, 耗时 {batch_duration:.1f} 分钟")
# 批次间随机间隔(不是最后一批)
if batch_idx < len(batches) - 1 and not self._stop_event.is_set():
# 再次检查工作时间和停止状态
if not self.is_working_time():
current_time = datetime.now().strftime('%H:%M')
logger.info(f"当前时间 {current_time} 已超出工作时间,停止执行")
break
wait_minutes = random.randint(
getattr(Config, 'MIN_TASK_INTERVAL_MINUTES', 3),
getattr(Config, 'MAX_TASK_INTERVAL_MINUTES', 5)
)
logger.info(f"等待 {wait_minutes} 分钟后执行下一批次...")
self._stop_event.wait(timeout=wait_minutes * 60)
def _execute_click_task_wrapper(self, site: Dict, worker_id: int) -> Dict:
"""
线程安全的任务执行包装器
Args:
site: 站点信息
worker_id: Worker编号用于日志标识
Returns:
执行结果字典
"""
from task_executor import TaskExecutor
site_id = site['id']
site_url = site['site_url']
# 错峰启动:每个 worker 间隔 5-10 秒,避免同时调用 AdsPower API 触发限频
if worker_id > 1:
stagger_delay = (worker_id - 1) * random.randint(5, 10)
logger.info(f"[Worker {worker_id}] [Site {site_id}] 错峰等待 {stagger_delay} 秒后启动...")
time.sleep(stagger_delay)
logger.info(f"[Worker {worker_id}] [Site {site_id}] 开始点击: {site_url[:50]}...")
executor = None
try:
# 创建独立的 TaskExecutor 实例
executor = TaskExecutor(max_workers=1, use_proxy=self.use_proxy)
# 创建浏览器环境(每个 worker 独立的 Profile 和代理)
profile_info = executor.create_browser_profile(worker_id)
if not profile_info:
logger.error(f"[Worker {worker_id}] [Site {site_id}] 创建浏览器环境失败")
with self._stats_lock:
self.error_count += 1
return {'success': False, 'error': '创建浏览器环境失败'}
time.sleep(2)
# 获取完整站点信息
all_sites = self.data_manager.get_active_urls()
target_site = next((s for s in all_sites if s.get('id') == site_id), None)
if not target_site:
logger.error(f"[Worker {worker_id}] [Site {site_id}] 未找到站点信息")
return {'success': False, 'error': '未找到站点信息'}
# 执行任务
result = executor.execute_single_task(target_site, worker_id, profile_info['profile_id'])
if result['success']:
# 线程安全更新点击记录
with self._click_records_lock:
if site_id in self.click_records:
self.click_records[site_id]['last_click'] = datetime.now()
self.click_records[site_id]['today_count'] += 1
self.click_records[site_id]['click_count'] = self.click_records[site_id].get('click_count', 0) + 1
with self._stats_lock:
self.total_clicks_today += 1
logger.info(f"[Worker {worker_id}] [Site {site_id}] ✅ 点击完成")
else:
with self._stats_lock:
self.error_count += 1
logger.warning(f"[Worker {worker_id}] [Site {site_id}] ⚠️ 点击失败: {result.get('error', '未知错误')}")
return result
except Exception as e:
with self._stats_lock:
self.error_count += 1
logger.error(f"[Worker {worker_id}] [Site {site_id}] ❌ 异常: {str(e)}")
import traceback
traceback.print_exc()
return {'success': False, 'error': str(e)}
finally:
# 确保资源清理
if executor:
try:
executor.close_browser()
except Exception as e:
logger.warning(f"[Worker {worker_id}] 清理资源失败: {e}")
# ========== 以下是原有的Web接口方法 ==========
def add_url(self, url: str) -> bool:
"""添加URL到调度队列"""
try:
site_id = self.data_manager.add_url(url)
if site_id:
# 初始化点击记录
self.click_records[site_id] = {
'last_click': None,
'today_count': 0,
'target_count': random.randint(
getattr(Config, 'MIN_CLICK_COUNT', 1),
getattr(Config, 'MAX_CLICK_COUNT', 3)
),
'click_count': 0
}
return True
return False
except Exception as e:
logger.error(f"添加URL失败: {str(e)}")
return False
def add_urls(self, urls: List[str]) -> int:
"""批量添加URL"""
count = 0
for url in urls:
if self.add_url(url):
count += 1
return count
def get_url_detail(self, url: str) -> Optional[Dict]:
"""获取URL详情"""
return self.data_manager.get_url_detail(url)
def get_statistics(self) -> Dict:
"""获取统计数据"""
return self.data_manager.get_statistics()
def get_queue_status(self) -> Dict:
"""获取任务队列状态"""
sites = self.data_manager.get_active_urls()
pending = []
completed = []
for site in sites:
site_id = site.get('id')
record = self.click_records.get(site_id, {})
site_info = {
'site_id': site_id,
'site_url': site.get('site_url'),
'site_name': site.get('site_name'),
'today_count': record.get('today_count', 0),
'target_count': record.get('target_count', 0),
'click_count': site.get('click_count', 0), # 历史总点击次数
'last_click': record.get('last_click')
}
if record.get('today_count', 0) >= record.get('target_count', 0):
completed.append(site_info)
else:
pending.append(site_info)
return {
'pending': pending[:20],
'running': self.current_task,
'completed': completed[:20],
'scheduler_status': 'running' if self.running else 'stopped',
'is_working_time': self.is_working_time(),
'total_pending': len(pending),
'total_completed': len(completed),
'total_clicks_today': self.total_clicks_today,
'error_count': self.error_count
}