681 lines
27 KiB
Python
681 lines
27 KiB
Python
"""
|
||
调度器模块 - 集成真正的任务执行
|
||
提供Web界面所需的调度器功能,并执行实际点击任务
|
||
"""
|
||
import random
|
||
import threading
|
||
import time
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from datetime import datetime
|
||
from typing import Dict, List, Optional
|
||
from loguru import logger
|
||
|
||
from config import Config
|
||
from data_manager import DataManager
|
||
|
||
|
||
class ClickScheduler:
|
||
"""
|
||
点击调度器 - Web界面集成版
|
||
|
||
提供调度器的控制和状态查询功能,并在后台线程中执行实际点击任务。
|
||
优先执行从未被点击过的链接。
|
||
"""
|
||
|
||
def __init__(self):
|
||
self.data_manager = DataManager()
|
||
self.running = False
|
||
self.start_time = None
|
||
|
||
# 点击记录: {site_id: {'last_click': datetime, 'today_count': int, 'target_count': int}}
|
||
self.click_records: Dict[int, dict] = {}
|
||
|
||
# 配置
|
||
self.work_start_hour = getattr(Config, 'WORK_START_HOUR', 9)
|
||
self.work_end_hour = getattr(Config, 'WORK_END_HOUR', 21)
|
||
self.click_interval_minutes = getattr(Config, 'CLICK_INTERVAL_MINUTES', 30)
|
||
|
||
# 后台线程
|
||
self._scheduler_thread: Optional[threading.Thread] = None
|
||
self._stop_event = threading.Event()
|
||
|
||
# 运行状态
|
||
self.current_task = None # 当前执行的任务
|
||
self.current_executor = None # 当前执行器实例
|
||
self.current_profile_id = None # 当前浏览器profile ID
|
||
self.last_cycle_time = None
|
||
self.total_clicks_today = 0
|
||
self.error_count = 0
|
||
|
||
# 并发配置
|
||
self.max_workers = getattr(Config, 'MAX_CONCURRENT_WORKERS', 2)
|
||
|
||
# 线程安全锁
|
||
self._click_records_lock = threading.Lock()
|
||
self._stats_lock = threading.Lock()
|
||
|
||
# 使用代理
|
||
self.use_proxy = True
|
||
|
||
logger.info("Web调度器初始化完成(集成任务执行)")
|
||
|
||
def start_scheduler(self):
|
||
"""启动调度器"""
|
||
if self.running:
|
||
logger.warning("调度器已在运行中")
|
||
return
|
||
|
||
self.running = True
|
||
self.start_time = datetime.now()
|
||
self._stop_event.clear()
|
||
|
||
# 初始化每日记录
|
||
self.reset_daily_records()
|
||
|
||
# 启动后台调度线程
|
||
self._scheduler_thread = threading.Thread(
|
||
target=self._scheduler_loop,
|
||
name="SchedulerThread",
|
||
daemon=True
|
||
)
|
||
self._scheduler_thread.start()
|
||
|
||
logger.info("调度器已启动,后台任务线程运行中")
|
||
|
||
def stop_scheduler(self):
|
||
"""停止调度器,同时结束当前所有任务"""
|
||
if not self.running:
|
||
logger.warning("调度器未运行")
|
||
return
|
||
|
||
logger.info("正在停止调度器...")
|
||
self.running = False
|
||
self._stop_event.set()
|
||
|
||
# 关闭当前运行的浏览器
|
||
if self.current_profile_id and self.current_executor:
|
||
logger.info(f"正在关闭浏览器 (profile: {self.current_profile_id})...")
|
||
try:
|
||
self.current_executor.client.close_browser(self.current_profile_id)
|
||
logger.info("浏览器已关闭")
|
||
except Exception as e:
|
||
logger.warning(f"关闭浏览器失败: {str(e)}")
|
||
|
||
# 清理当前任务状态
|
||
self.current_executor = None
|
||
self.current_profile_id = None
|
||
self.current_task = None
|
||
|
||
# 等待线程结束(最多等待10秒)
|
||
if self._scheduler_thread and self._scheduler_thread.is_alive():
|
||
self._scheduler_thread.join(timeout=10)
|
||
|
||
logger.info("调度器已停止")
|
||
|
||
def is_working_time(self) -> bool:
|
||
"""检查当前是否是工作时间"""
|
||
now = datetime.now()
|
||
return self.work_start_hour <= now.hour < self.work_end_hour
|
||
|
||
def reset_daily_records(self):
|
||
"""重置每日点击记录"""
|
||
logger.info("=" * 50)
|
||
logger.info("重置每日点击记录")
|
||
logger.info("=" * 50)
|
||
|
||
# 获取所有活跃站点
|
||
sites = self.data_manager.get_active_urls()
|
||
|
||
# 为每个站点随机生成今日目标点击次数
|
||
self.click_records = {}
|
||
for site in sites:
|
||
site_id = site.get('id')
|
||
target_count = random.randint(
|
||
getattr(Config, 'MIN_CLICK_COUNT', 1),
|
||
getattr(Config, 'MAX_CLICK_COUNT', 3)
|
||
)
|
||
self.click_records[site_id] = {
|
||
'last_click': None,
|
||
'today_count': 0,
|
||
'target_count': target_count,
|
||
'site_url': site.get('site_url'),
|
||
'click_count': site.get('click_count', 0) # 历史总点击次数
|
||
}
|
||
|
||
total_target = sum(r['target_count'] for r in self.click_records.values())
|
||
logger.info(f"共 {len(sites)} 个站点,总目标点击次数: {total_target}")
|
||
|
||
def get_pending_sites(self) -> List[Dict]:
|
||
"""
|
||
获取待点击的站点列表
|
||
|
||
优先级排序:
|
||
1. 历史点击次数为0的站点优先
|
||
2. 今日点击次数少的站点优先
|
||
3. 距离上次点击时间长的站点优先
|
||
|
||
Returns:
|
||
待点击的站点列表(已按优先级排序)
|
||
"""
|
||
if not self.click_records:
|
||
logger.warning("点击记录为空,执行重置")
|
||
self.reset_daily_records()
|
||
|
||
# 动态检测新导入的站点
|
||
current_sites = self.data_manager.get_active_urls()
|
||
current_site_ids = {site.get('id') for site in current_sites}
|
||
existing_site_ids = set(self.click_records.keys())
|
||
|
||
# 发现新站点,自动添加到点击记录
|
||
new_site_ids = current_site_ids - existing_site_ids
|
||
if new_site_ids:
|
||
logger.info(f"发现 {len(new_site_ids)} 个新导入的站点,加入点击队列")
|
||
for site in current_sites:
|
||
site_id = site.get('id')
|
||
if site_id in new_site_ids:
|
||
target_count = random.randint(
|
||
getattr(Config, 'MIN_CLICK_COUNT', 1),
|
||
getattr(Config, 'MAX_CLICK_COUNT', 3)
|
||
)
|
||
self.click_records[site_id] = {
|
||
'last_click': None,
|
||
'today_count': 0,
|
||
'target_count': target_count,
|
||
'site_url': site.get('site_url'),
|
||
'click_count': site.get('click_count', 0)
|
||
}
|
||
logger.info(f"新站点 {site_id}: {site.get('site_url')} - 今日目标 {target_count} 次")
|
||
|
||
# 移除已删除的站点
|
||
removed_site_ids = existing_site_ids - current_site_ids
|
||
for site_id in removed_site_ids:
|
||
del self.click_records[site_id]
|
||
logger.info(f"站点 {site_id} 已从数据库删除,移除出点击队列")
|
||
|
||
now = datetime.now()
|
||
pending_sites = []
|
||
|
||
for site_id, record in self.click_records.items():
|
||
# 检查是否已完成今日目标
|
||
if record['today_count'] >= record['target_count']:
|
||
continue
|
||
|
||
# 检查点击间隔(≥30分钟)
|
||
if record['last_click']:
|
||
elapsed = (now - record['last_click']).total_seconds() / 60
|
||
if elapsed < self.click_interval_minutes:
|
||
continue
|
||
|
||
pending_sites.append({
|
||
'id': site_id,
|
||
'site_url': record['site_url'],
|
||
'today_count': record['today_count'],
|
||
'target_count': record['target_count'],
|
||
'click_count': record.get('click_count', 0), # 历史总点击次数
|
||
'last_click': record['last_click']
|
||
})
|
||
|
||
# 按优先级排序:
|
||
# 1. 历史点击次数为0的优先(从未点击过)
|
||
# 2. 今日点击次数少的优先
|
||
# 3. 距离上次点击时间长的优先(last_click 为 None 排最前)
|
||
def sort_key(site):
|
||
click_count = site.get('click_count', 0)
|
||
today_count = site.get('today_count', 0)
|
||
last_click = site.get('last_click')
|
||
|
||
# 从未点击过的排最前 (0),点击过的按点击次数排 (1 + click_count)
|
||
priority1 = 0 if click_count == 0 else (1 + click_count)
|
||
|
||
# 今日点击次数少的优先
|
||
priority2 = today_count
|
||
|
||
# 距离上次点击时间长的优先(None表示从未点击,排最前)
|
||
if last_click is None:
|
||
priority3 = 0
|
||
else:
|
||
# 将时间转换为负数,时间越早数值越小
|
||
priority3 = last_click.timestamp()
|
||
|
||
return (priority1, priority2, priority3)
|
||
|
||
pending_sites.sort(key=sort_key)
|
||
|
||
return pending_sites
|
||
|
||
def execute_click_task(self, site: Dict) -> bool:
|
||
"""
|
||
执行单个站点的点击任务
|
||
|
||
Args:
|
||
site: 站点信息
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
site_id = site['id']
|
||
site_url = site['site_url']
|
||
|
||
self.current_task = {
|
||
'site_id': site_id,
|
||
'site_url': site_url,
|
||
'start_time': datetime.now()
|
||
}
|
||
|
||
logger.info(f"[站点 {site_id}] 开始点击: {site_url}")
|
||
logger.info(f"[站点 {site_id}] 今日进度: {site['today_count'] + 1}/{site['target_count']}, 历史总点击: {site.get('click_count', 0)}")
|
||
|
||
try:
|
||
# 检查是否被停止
|
||
if self._stop_event.is_set():
|
||
logger.info(f"[站点 {site_id}] 调度器已停止,跳过任务")
|
||
return False
|
||
|
||
# 延迟导入避免循环依赖
|
||
from task_executor import TaskExecutor
|
||
|
||
# 创建任务执行器
|
||
executor = TaskExecutor(
|
||
max_workers=1,
|
||
use_proxy=self.use_proxy
|
||
)
|
||
self.current_executor = executor # 保存执行器引用
|
||
|
||
# 获取完整站点信息
|
||
all_sites = self.data_manager.get_active_urls()
|
||
target_site = next((s for s in all_sites if s.get('id') == site_id), None)
|
||
|
||
if not target_site:
|
||
logger.error(f"[站点 {site_id}] 未找到站点信息")
|
||
return False
|
||
|
||
# 检查是否被停止
|
||
if self._stop_event.is_set():
|
||
logger.info(f"[站点 {site_id}] 调度器已停止,跳过任务")
|
||
return False
|
||
|
||
# 创建浏览器环境
|
||
logger.info(f"[站点 {site_id}] 创建浏览器环境...")
|
||
profile_info = executor.create_browser_profile(1)
|
||
if not profile_info:
|
||
logger.error(f"[站点 {site_id}] 创建浏览器环境失败")
|
||
return False
|
||
|
||
self.current_profile_id = profile_info['profile_id'] # 保存profile ID
|
||
|
||
# 检查是否被停止
|
||
if self._stop_event.is_set():
|
||
logger.info(f"[站点 {site_id}] 调度器已停止,关闭浏览器")
|
||
executor.client.close_browser(profile_info['profile_id'])
|
||
return False
|
||
|
||
time.sleep(2)
|
||
|
||
# 执行点击任务
|
||
logger.info(f"[站点 {site_id}] 执行点击任务...")
|
||
result = executor.execute_single_task(target_site, 1, profile_info['profile_id'])
|
||
|
||
# 清理当前任务状态
|
||
self.current_executor = None
|
||
self.current_profile_id = None
|
||
|
||
if result['success']:
|
||
# 更新点击记录
|
||
self.click_records[site_id]['last_click'] = datetime.now()
|
||
self.click_records[site_id]['today_count'] += 1
|
||
self.click_records[site_id]['click_count'] = self.click_records[site_id].get('click_count', 0) + 1
|
||
self.total_clicks_today += 1
|
||
|
||
logger.info(f"[站点 {site_id}] 点击完成: {self.click_records[site_id]['today_count']}/{self.click_records[site_id]['target_count']}")
|
||
return True
|
||
else:
|
||
self.error_count += 1
|
||
logger.warning(f"[站点 {site_id}] 点击失败: {result.get('error', '未知错误')}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
self.error_count += 1
|
||
logger.error(f"[站点 {site_id}] 点击异常: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
finally:
|
||
self.current_task = None
|
||
self.current_executor = None
|
||
self.current_profile_id = None
|
||
|
||
def _scheduler_loop(self):
|
||
"""调度器主循环(后台线程)"""
|
||
logger.info("调度器线程启动")
|
||
|
||
# 检查是否需要立即执行
|
||
check_interval = 60 # 每60秒检查一次
|
||
cycle_interval = 10 * 60 # 每10分钟执行一次点击循环
|
||
last_cycle = 0
|
||
|
||
while not self._stop_event.is_set():
|
||
try:
|
||
now = time.time()
|
||
|
||
# 检查是否到了执行周期
|
||
if now - last_cycle >= cycle_interval:
|
||
self._run_click_cycle()
|
||
last_cycle = now
|
||
|
||
# 检查是否需要重置每日记录(跨天)
|
||
current_date = datetime.now().date()
|
||
if hasattr(self, '_last_reset_date') and self._last_reset_date != current_date:
|
||
self.reset_daily_records()
|
||
self._last_reset_date = current_date
|
||
else:
|
||
self._last_reset_date = current_date
|
||
|
||
# 等待下一次检查
|
||
self._stop_event.wait(timeout=check_interval)
|
||
|
||
except Exception as e:
|
||
logger.error(f"调度器循环异常: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
time.sleep(30) # 出错后等待30秒再继续
|
||
|
||
logger.info("调度器线程结束")
|
||
|
||
def _run_click_cycle(self):
|
||
"""执行一次点击循环"""
|
||
# 检查工作时间
|
||
if not self.is_working_time():
|
||
current_time = datetime.now().strftime('%H:%M')
|
||
logger.info(f"当前时间 {current_time} 不在工作时间 ({self.work_start_hour}:00-{self.work_end_hour}:00),跳过")
|
||
return
|
||
|
||
self.last_cycle_time = datetime.now()
|
||
|
||
logger.info("-" * 50)
|
||
logger.info(f"开始点击循环 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||
logger.info("-" * 50)
|
||
|
||
# 获取待点击站点(已按优先级排序)
|
||
pending_sites = self.get_pending_sites()
|
||
|
||
if not pending_sites:
|
||
logger.info("没有待点击的站点")
|
||
return
|
||
|
||
# 显示优先级信息
|
||
never_clicked = sum(1 for s in pending_sites if s.get('click_count', 0) == 0)
|
||
logger.info(f"找到 {len(pending_sites)} 个待点击站点,其中 {never_clicked} 个从未点击过(优先执行)")
|
||
|
||
# 根据并发数执行
|
||
if self.max_workers == 1:
|
||
# 串行执行
|
||
for site in pending_sites:
|
||
# 检查是否被停止
|
||
if self._stop_event.is_set() or not self.running:
|
||
logger.info("调度器已停止,终止点击循环")
|
||
break
|
||
|
||
# 检查工作时间
|
||
if not self.is_working_time():
|
||
current_time = datetime.now().strftime('%H:%M')
|
||
logger.info(f"当前时间 {current_time} 已超出工作时间,停止执行剩余任务")
|
||
break
|
||
|
||
# 执行点击
|
||
self.execute_click_task(site)
|
||
|
||
# 任务间随机间隔
|
||
if site != pending_sites[-1] and not self._stop_event.is_set():
|
||
wait_minutes = random.randint(
|
||
getattr(Config, 'MIN_TASK_INTERVAL_MINUTES', 3),
|
||
getattr(Config, 'MAX_TASK_INTERVAL_MINUTES', 5)
|
||
)
|
||
logger.info(f"等待 {wait_minutes} 分钟后执行下一个任务...")
|
||
self._stop_event.wait(timeout=wait_minutes * 60)
|
||
else:
|
||
# 并发执行
|
||
self._run_concurrent_cycle(pending_sites)
|
||
|
||
# 显示今日进度
|
||
completed = sum(1 for r in self.click_records.values() if r['today_count'] >= r['target_count'])
|
||
total = len(self.click_records)
|
||
total_clicks = sum(r['today_count'] for r in self.click_records.values())
|
||
target_clicks = sum(r['target_count'] for r in self.click_records.values())
|
||
|
||
logger.info("-" * 50)
|
||
logger.info(f"今日进度: {completed}/{total} 个站点完成")
|
||
logger.info(f"点击次数: {total_clicks}/{target_clicks} 次")
|
||
logger.info("-" * 50)
|
||
|
||
def _run_concurrent_cycle(self, pending_sites: List[Dict]):
|
||
"""
|
||
并发执行点击任务
|
||
|
||
Args:
|
||
pending_sites: 待点击的站点列表
|
||
"""
|
||
# 按 max_workers 分批
|
||
batch_size = self.max_workers
|
||
batches = [pending_sites[i:i+batch_size] for i in range(0, len(pending_sites), batch_size)]
|
||
|
||
logger.info(f"并发模式: 共 {len(pending_sites)} 个任务,分 {len(batches)} 批执行(每批最多 {batch_size} 个)")
|
||
|
||
for batch_idx, batch in enumerate(batches):
|
||
# 检查是否被停止
|
||
if self._stop_event.is_set() or not self.running:
|
||
logger.info("调度器已停止,终止点击循环")
|
||
break
|
||
|
||
# 检查工作时间
|
||
if not self.is_working_time():
|
||
current_time = datetime.now().strftime('%H:%M')
|
||
logger.info(f"当前时间 {current_time} 已超出工作时间,停止执行剩余批次")
|
||
break
|
||
|
||
logger.info(f"=" * 40)
|
||
logger.info(f"开始批次 {batch_idx+1}/{len(batches)}: {len(batch)} 个任务并发执行")
|
||
for i, site in enumerate(batch, 1):
|
||
logger.info(f" - [Worker {i}] Site {site['id']}: {site['site_url'][:50]}...")
|
||
|
||
batch_start_time = time.time()
|
||
success_count = 0
|
||
fail_count = 0
|
||
|
||
# 使用 ThreadPoolExecutor 并发执行
|
||
with ThreadPoolExecutor(max_workers=len(batch)) as executor:
|
||
futures = {
|
||
executor.submit(self._execute_click_task_wrapper, site, worker_id): site
|
||
for worker_id, site in enumerate(batch, 1)
|
||
}
|
||
|
||
# 等待所有任务完成
|
||
for future in as_completed(futures):
|
||
site = futures[future]
|
||
try:
|
||
result = future.result()
|
||
if result.get('success'):
|
||
success_count += 1
|
||
else:
|
||
fail_count += 1
|
||
except Exception as e:
|
||
fail_count += 1
|
||
logger.error(f"任务异常: Site {site['id']} - {e}")
|
||
|
||
batch_duration = (time.time() - batch_start_time) / 60
|
||
logger.info(f"批次 {batch_idx+1} 完成: 成功 {success_count}, 失败 {fail_count}, 耗时 {batch_duration:.1f} 分钟")
|
||
|
||
# 批次间随机间隔(不是最后一批)
|
||
if batch_idx < len(batches) - 1 and not self._stop_event.is_set():
|
||
# 再次检查工作时间和停止状态
|
||
if not self.is_working_time():
|
||
current_time = datetime.now().strftime('%H:%M')
|
||
logger.info(f"当前时间 {current_time} 已超出工作时间,停止执行")
|
||
break
|
||
|
||
wait_minutes = random.randint(
|
||
getattr(Config, 'MIN_TASK_INTERVAL_MINUTES', 3),
|
||
getattr(Config, 'MAX_TASK_INTERVAL_MINUTES', 5)
|
||
)
|
||
logger.info(f"等待 {wait_minutes} 分钟后执行下一批次...")
|
||
self._stop_event.wait(timeout=wait_minutes * 60)
|
||
|
||
def _execute_click_task_wrapper(self, site: Dict, worker_id: int) -> Dict:
|
||
"""
|
||
线程安全的任务执行包装器
|
||
|
||
Args:
|
||
site: 站点信息
|
||
worker_id: Worker编号(用于日志标识)
|
||
|
||
Returns:
|
||
执行结果字典
|
||
"""
|
||
from task_executor import TaskExecutor
|
||
|
||
site_id = site['id']
|
||
site_url = site['site_url']
|
||
|
||
# 错峰启动:每个 worker 间隔 5-10 秒,避免同时调用 AdsPower API 触发限频
|
||
if worker_id > 1:
|
||
stagger_delay = (worker_id - 1) * random.randint(5, 10)
|
||
logger.info(f"[Worker {worker_id}] [Site {site_id}] 错峰等待 {stagger_delay} 秒后启动...")
|
||
time.sleep(stagger_delay)
|
||
|
||
logger.info(f"[Worker {worker_id}] [Site {site_id}] 开始点击: {site_url[:50]}...")
|
||
|
||
executor = None
|
||
try:
|
||
# 创建独立的 TaskExecutor 实例
|
||
executor = TaskExecutor(max_workers=1, use_proxy=self.use_proxy)
|
||
|
||
# 创建浏览器环境(每个 worker 独立的 Profile 和代理)
|
||
profile_info = executor.create_browser_profile(worker_id)
|
||
if not profile_info:
|
||
logger.error(f"[Worker {worker_id}] [Site {site_id}] 创建浏览器环境失败")
|
||
with self._stats_lock:
|
||
self.error_count += 1
|
||
return {'success': False, 'error': '创建浏览器环境失败'}
|
||
|
||
time.sleep(2)
|
||
|
||
# 获取完整站点信息
|
||
all_sites = self.data_manager.get_active_urls()
|
||
target_site = next((s for s in all_sites if s.get('id') == site_id), None)
|
||
|
||
if not target_site:
|
||
logger.error(f"[Worker {worker_id}] [Site {site_id}] 未找到站点信息")
|
||
return {'success': False, 'error': '未找到站点信息'}
|
||
|
||
# 执行任务
|
||
result = executor.execute_single_task(target_site, worker_id, profile_info['profile_id'])
|
||
|
||
if result['success']:
|
||
# 线程安全更新点击记录
|
||
with self._click_records_lock:
|
||
if site_id in self.click_records:
|
||
self.click_records[site_id]['last_click'] = datetime.now()
|
||
self.click_records[site_id]['today_count'] += 1
|
||
self.click_records[site_id]['click_count'] = self.click_records[site_id].get('click_count', 0) + 1
|
||
with self._stats_lock:
|
||
self.total_clicks_today += 1
|
||
|
||
logger.info(f"[Worker {worker_id}] [Site {site_id}] ✅ 点击完成")
|
||
else:
|
||
with self._stats_lock:
|
||
self.error_count += 1
|
||
logger.warning(f"[Worker {worker_id}] [Site {site_id}] ⚠️ 点击失败: {result.get('error', '未知错误')}")
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
with self._stats_lock:
|
||
self.error_count += 1
|
||
logger.error(f"[Worker {worker_id}] [Site {site_id}] ❌ 异常: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return {'success': False, 'error': str(e)}
|
||
finally:
|
||
# 确保资源清理
|
||
if executor:
|
||
try:
|
||
executor.close_browser()
|
||
except Exception as e:
|
||
logger.warning(f"[Worker {worker_id}] 清理资源失败: {e}")
|
||
|
||
# ========== 以下是原有的Web接口方法 ==========
|
||
|
||
def add_url(self, url: str) -> bool:
|
||
"""添加URL到调度队列"""
|
||
try:
|
||
site_id = self.data_manager.add_url(url)
|
||
if site_id:
|
||
# 初始化点击记录
|
||
self.click_records[site_id] = {
|
||
'last_click': None,
|
||
'today_count': 0,
|
||
'target_count': random.randint(
|
||
getattr(Config, 'MIN_CLICK_COUNT', 1),
|
||
getattr(Config, 'MAX_CLICK_COUNT', 3)
|
||
),
|
||
'click_count': 0
|
||
}
|
||
return True
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"添加URL失败: {str(e)}")
|
||
return False
|
||
|
||
def add_urls(self, urls: List[str]) -> int:
|
||
"""批量添加URL"""
|
||
count = 0
|
||
for url in urls:
|
||
if self.add_url(url):
|
||
count += 1
|
||
return count
|
||
|
||
def get_url_detail(self, url: str) -> Optional[Dict]:
|
||
"""获取URL详情"""
|
||
return self.data_manager.get_url_detail(url)
|
||
|
||
def get_statistics(self) -> Dict:
|
||
"""获取统计数据"""
|
||
return self.data_manager.get_statistics()
|
||
|
||
def get_queue_status(self) -> Dict:
|
||
"""获取任务队列状态"""
|
||
sites = self.data_manager.get_active_urls()
|
||
|
||
pending = []
|
||
completed = []
|
||
|
||
for site in sites:
|
||
site_id = site.get('id')
|
||
record = self.click_records.get(site_id, {})
|
||
|
||
site_info = {
|
||
'site_id': site_id,
|
||
'site_url': site.get('site_url'),
|
||
'site_name': site.get('site_name'),
|
||
'today_count': record.get('today_count', 0),
|
||
'target_count': record.get('target_count', 0),
|
||
'click_count': site.get('click_count', 0), # 历史总点击次数
|
||
'last_click': record.get('last_click')
|
||
}
|
||
|
||
if record.get('today_count', 0) >= record.get('target_count', 0):
|
||
completed.append(site_info)
|
||
else:
|
||
pending.append(site_info)
|
||
|
||
return {
|
||
'pending': pending[:20],
|
||
'running': self.current_task,
|
||
'completed': completed[:20],
|
||
'scheduler_status': 'running' if self.running else 'stopped',
|
||
'is_working_time': self.is_working_time(),
|
||
'total_pending': len(pending),
|
||
'total_completed': len(completed),
|
||
'total_clicks_today': self.total_clicks_today,
|
||
'error_count': self.error_count
|
||
}
|