Files
ai_mip/main.py

834 lines
32 KiB
Python
Raw Permalink Normal View History

2026-01-21 14:33:10 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
MIP广告点击自动化服务
这是一个完整的后台服务提供以下功能
1. 自动轮询点击数据库中的广告链接
2. 智能调度每个链接每天随机点击1-10
3. 间隔控制同一链接点击间隔30分钟
4. 时间窗口仅在09:00-21:00执行
5. 进程管理防重复启动优雅停止
6. 健康检查提供HTTP API监控服务状态
7. 日志管理自动分割持久化存储
使用方法
python main.py # 前台运行
python main.py --daemon # 后台运行Linux
健康检查
curl http://localhost:8888/health
"""
import argparse
import atexit
import os
import random
import signal
import sys
import threading
import time
2026-02-24 12:46:35 +08:00
from concurrent.futures import ThreadPoolExecutor, as_completed
2026-01-21 14:33:10 +08:00
from datetime import datetime
from pathlib import Path
from typing import Dict, List
import schedule
from flask import Flask, jsonify
from loguru import logger
from config import Config
from data_manager import DataManager
from task_executor import TaskExecutor
from baidu_crawler import BaiduSearchCrawler
# --- Logging configuration (loguru) ---
# Remove the default handler so both sinks below are fully controlled here.
logger.remove()
# Console sink: colored short format (time | level | message).
logger.add(
    sys.stdout,
    format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <level>{message}</level>",
    level="INFO"
)
# File sink: one file per day under ./logs, rotated at midnight,
# retained for 30 days, UTF-8 encoded.
log_dir = Path("./logs")
log_dir.mkdir(exist_ok=True)
logger.add(
    log_dir / "scheduler_{time:YYYY-MM-DD}.log",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {message}",
    level="INFO",
    rotation="00:00",
    retention="30 days",
    encoding="utf-8"
)
class ClickScheduler:
    """
    MIP ad-click service.

    Manages scheduling and execution of ad-click tasks and provides the
    full service lifecycle: start, run, monitor and stop.
    """

    # Process lock file used to prevent duplicate instances.
    LOCK_FILE = Path("./scheduler.lock")

    def __init__(self, max_workers: int = 1, use_proxy: bool = True, health_port: int = 8888):
        """
        Initialize the scheduler.

        Args:
            max_workers: maximum number of concurrent workers
            use_proxy: whether tasks should run through a proxy
            health_port: TCP port for the health-check API
        """
        self.max_workers = max_workers
        self.use_proxy = use_proxy
        self.health_port = health_port
        self.dm = DataManager()
        # Crawler instance (created only when enabled in config).
        self.crawler = BaiduSearchCrawler() if Config.CRAWLER_ENABLED else None
        # Click records: {site_id: {'last_click': datetime, 'today_count': int, 'target_count': int}}
        self.click_records = {}
        # Working-hours window.
        self.work_start_hour = 9   # 09:00
        self.work_end_hour = 21    # 21:00
        self.click_interval_minutes = 30  # minimum gap between clicks on the same site
        # Service state.
        self.running = False
        self.start_time = None
        self.last_cycle_time = None
        self.total_clicks_today = 0
        self.error_count = 0
        # Thread-safety locks (used by the concurrent execution path).
        self._click_records_lock = threading.Lock()
        self._stats_lock = threading.Lock()
        # Health-check API.
        self.health_app = Flask(__name__)
        self.health_app.logger.disabled = True  # silence Flask's own request logging
        self._setup_health_api()
        # Plain string: the original used an f-string with no placeholders.
        logger.info("调度器初始化完成")
        logger.info(f"工作时间: {self.work_start_hour:02d}:00 - {self.work_end_hour:02d}:00")
        logger.info(f"点击间隔: {self.click_interval_minutes} 分钟")
        logger.info(f"并发模式: {'启用' if max_workers > 1 else '禁用'} (最大并发数: {max_workers})")
    def _setup_health_api(self):
        """Register the health-check and remote scheduler-control HTTP endpoints."""

        @self.health_app.route('/health', methods=['GET'])
        def health_check():
            """Liveness endpoint: service state plus today's click progress."""
            uptime = None
            if self.start_time:
                uptime = str(datetime.now() - self.start_time)
            return jsonify({
                'status': 'running' if self.running else 'stopped',
                'uptime': uptime,
                'start_time': self.start_time.isoformat() if self.start_time else None,
                'last_cycle': self.last_cycle_time.isoformat() if self.last_cycle_time else None,
                'total_sites': len(self.click_records),
                'completed_sites': sum(1 for r in self.click_records.values() if r['today_count'] >= r['target_count']),
                'total_clicks_today': sum(r['today_count'] for r in self.click_records.values()),
                'target_clicks_today': sum(r['target_count'] for r in self.click_records.values()),
                'error_count': self.error_count,
                'work_hours': f"{self.work_start_hour:02d}:00-{self.work_end_hour:02d}:00",
                'is_working_time': self.is_working_time()
            })

        @self.health_app.route('/scheduler/status', methods=['GET'])
        def scheduler_status():
            """Scheduler status summary, for remote web-service callers."""
            return jsonify({
                'success': True,
                'data': {
                    'status': 'running' if self.running else 'stopped',
                    'is_working_time': self.is_working_time(),
                    'total_sites': len(self.click_records),
                    'completed_sites': sum(1 for r in self.click_records.values() if r['today_count'] >= r['target_count']),
                    'total_clicks_today': sum(r['today_count'] for r in self.click_records.values()),
                    'error_count': self.error_count
                }
            })

        @self.health_app.route('/scheduler/start', methods=['POST'])
        def scheduler_start():
            """Start the scheduler remotely (idempotent if already running)."""
            if self.running:
                return jsonify({'success': True, 'message': '调度器已在运行中'})
            # Flip the flag and rebuild today's records.
            # NOTE(review): this only sets the flag — if the main loop in
            # start() has already exited after /scheduler/stop, scheduling
            # does not actually resume; confirm the intended semantics.
            self.running = True
            self.start_time = datetime.now()
            self.reset_daily_records()
            logger.info("调度器已通过远程API启动")
            return jsonify({'success': True, 'message': '调度器已启动'})

        @self.health_app.route('/scheduler/stop', methods=['POST'])
        def scheduler_stop():
            """Stop the scheduler remotely (idempotent if already stopped)."""
            if not self.running:
                return jsonify({'success': True, 'message': '调度器未运行'})
            # Clearing the flag makes the `while self.running` loop in
            # start() exit and run cleanup.
            self.running = False
            logger.info("调度器已通过远程API停止")
            return jsonify({'success': True, 'message': '调度器已停止'})
def _acquire_lock(self) -> bool:
"""
获取进程锁防止重复启动
Returns:
是否成功获取锁
"""
if self.LOCK_FILE.exists():
try:
# 检查锁文件中pid是否还在运行
with open(self.LOCK_FILE, 'r') as f:
old_pid = int(f.read().strip())
# 检查进程是否存在
try:
os.kill(old_pid, 0) # 不发送信号,只检查进程是否存在
logger.error(f"调度器已经在运行 (PID: {old_pid})")
return False
except OSError:
# 进程不存在,删除旧锁
logger.warning(f"检测到失效的锁文件,清理中...")
self.LOCK_FILE.unlink()
except Exception as e:
logger.warning(f"检查锁文件异常: {str(e)},删除旧锁")
self.LOCK_FILE.unlink()
# 创建新锁
try:
with open(self.LOCK_FILE, 'w') as f:
f.write(str(os.getpid()))
logger.info(f"获取进程锁成功 (PID: {os.getpid()})")
return True
except Exception as e:
logger.error(f"创建锁文件失败: {str(e)}")
return False
def _release_lock(self):
"""释放进程锁"""
try:
if self.LOCK_FILE.exists():
self.LOCK_FILE.unlink()
logger.info("已释放进程锁")
except Exception as e:
logger.error(f"释放锁文件失败: {str(e)}")
    def _cleanup(self):
        """Stop the service flag and release the process lock (idempotent)."""
        logger.info("正在清理资源...")
        # Clearing the flag terminates the main scheduling loop in start().
        self.running = False
        self._release_lock()
        logger.info("资源清理完成")
def is_working_time(self) -> bool:
"""
检查当前是否在工作时间
Returns:
是否在工作时间
"""
now = datetime.now()
current_hour = now.hour
return self.work_start_hour <= current_hour < self.work_end_hour
def reset_daily_records(self):
"""重置每日点击记录"""
logger.info("=" * 60)
logger.info("重置每日点击记录")
logger.info("=" * 60)
# 获取所有活跃站点
sites = self.dm.get_active_urls()
# 为每个站点随机生成今日目标点击次数(使用配置文件中的范围)
self.click_records = {}
for site in sites:
site_id = site.get('id')
target_count = random.randint(Config.MIN_CLICK_COUNT, Config.MAX_CLICK_COUNT)
self.click_records[site_id] = {
'last_click': None,
'today_count': 0,
'target_count': target_count,
2026-02-24 12:46:35 +08:00
'site_url': site.get('site_url'),
'click_count': site.get('click_count', 0) # 历史总点击次数
2026-01-21 14:33:10 +08:00
}
logger.info(f"站点 {site_id}: {site.get('site_url')} - 今日目标 {target_count}")
logger.info(f"{len(sites)} 个站点,总目标点击次数: {sum(r['target_count'] for r in self.click_records.values())}")
def get_pending_sites(self) -> List[Dict]:
"""
获取待点击的站点列表
Returns:
2026-02-24 12:46:35 +08:00
待点击的站点列表优先返回今日未点击的站点
2026-01-21 14:33:10 +08:00
"""
if not self.click_records:
logger.warning("点击记录为空,执行重置")
self.reset_daily_records()
2026-02-24 12:46:35 +08:00
# 动态检测新导入的站点
current_sites = self.dm.get_active_urls()
current_site_ids = {site.get('id') for site in current_sites}
existing_site_ids = set(self.click_records.keys())
# 发现新站点,自动添加到点击记录
new_site_ids = current_site_ids - existing_site_ids
if new_site_ids:
logger.info(f"发现 {len(new_site_ids)} 个新导入的站点,加入点击队列")
for site in current_sites:
site_id = site.get('id')
if site_id in new_site_ids:
target_count = random.randint(Config.MIN_CLICK_COUNT, Config.MAX_CLICK_COUNT)
self.click_records[site_id] = {
'last_click': None,
'today_count': 0,
'target_count': target_count,
'site_url': site.get('site_url'),
'click_count': site.get('click_count', 0) # 历史总点击次数
}
logger.info(f"新站点 {site_id}: {site.get('site_url')} - 今日目标 {target_count}")
# 移除已删除的站点
removed_site_ids = existing_site_ids - current_site_ids
for site_id in removed_site_ids:
del self.click_records[site_id]
logger.info(f"站点 {site_id} 已从数据库删除,移除出点击队列")
2026-01-21 14:33:10 +08:00
now = datetime.now()
pending_sites = []
for site_id, record in self.click_records.items():
# 检查是否已完成今日目标
if record['today_count'] >= record['target_count']:
continue
# 检查点击间隔≥30分钟
if record['last_click']:
elapsed = (now - record['last_click']).total_seconds() / 60
if elapsed < self.click_interval_minutes:
continue
pending_sites.append({
'id': site_id,
'site_url': record['site_url'],
'today_count': record['today_count'],
2026-02-24 12:46:35 +08:00
'target_count': record['target_count'],
'click_count': record.get('click_count', 0), # 历史总点击次数
'last_click': record['last_click']
2026-01-21 14:33:10 +08:00
})
2026-02-24 12:46:35 +08:00
# 排序优先级:
# 1. 历史从未点击的(click_count=0)最优先
# 2. 今日未点击的(today_count=0)次优先
# 3. 按上次点击时间升序(最久未点击的优先)
pending_sites.sort(key=lambda x: (
x['click_count'] > 0, # False(历史0次) 排在最前面
x['today_count'] > 0, # False(今日0次) 排在前面
x['last_click'] or datetime.min # 从未点击的排在前面
))
2026-01-21 14:33:10 +08:00
return pending_sites
def execute_click_task(self, site: Dict):
"""
执行单个站点的点击任务
Args:
site: 站点信息
"""
site_id = site['id']
site_url = site['site_url']
logger.info(f"[站点 {site_id}] 开始点击: {site_url} ({site['today_count'] + 1}/{site['target_count']})")
try:
# 创建任务执行器(每次创建新实例)
executor = TaskExecutor(
max_workers=1, # 单个任务使用单线程
use_proxy=self.use_proxy
)
# 直接执行单个站点任务
# 获取完整站点信息
all_sites = self.dm.get_active_urls()
target_site = next((s for s in all_sites if s.get('id') == site_id), None)
if not target_site:
logger.error(f"[站点 {site_id}] 未找到站点信息")
return
# 创建浏览器环境
profile_info = executor.create_browser_profile(1)
if not profile_info:
logger.error(f"[站点 {site_id}] 创建浏览器环境失败")
return
time.sleep(2)
# 执行点击任务
result = executor.execute_single_task(target_site, 1, profile_info['profile_id'])
if result['success']:
# 更新点击记录
self.click_records[site_id]['last_click'] = datetime.now()
self.click_records[site_id]['today_count'] += 1
self.total_clicks_today += 1
logger.info(f"[站点 {site_id}] ✅ 点击完成: {self.click_records[site_id]['today_count']}/{self.click_records[site_id]['target_count']}")
else:
self.error_count += 1
logger.warning(f"[站点 {site_id}] ⚠️ 点击失败: {result.get('error', '未知错误')}")
except Exception as e:
self.error_count += 1
logger.error(f"[站点 {site_id}] ❌ 点击异常: {str(e)}")
import traceback
traceback.print_exc()
def run_click_cycle(self):
"""执行一次点击循环"""
# 检查工作时间
if not self.is_working_time():
current_time = datetime.now().strftime('%H:%M')
logger.info(f"当前时间 {current_time} 不在工作时间内,跳过")
return
self.last_cycle_time = datetime.now()
logger.info("-" * 60)
logger.info(f"开始点击循环 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logger.info("-" * 60)
# 获取待点击站点
pending_sites = self.get_pending_sites()
if not pending_sites:
logger.info("没有待点击的站点")
return
logger.info(f"找到 {len(pending_sites)} 个待点击站点")
2026-02-24 12:46:35 +08:00
# 分组:历史从未点击的 vs 历史点击过的
never_clicked = [s for s in pending_sites if s.get('click_count', 0) == 0]
has_clicked = [s for s in pending_sites if s.get('click_count', 0) > 0]
# 各组内随机打乱,但保持"历史从未点击优先"的整体顺序
random.shuffle(never_clicked)
random.shuffle(has_clicked)
pending_sites = never_clicked + has_clicked
if never_clicked:
logger.info(f"其中 {len(never_clicked)} 个站点历史从未点击(最优先处理)")
2026-01-21 14:33:10 +08:00
# 根据并发数执行
if self.max_workers == 1:
# 串行执行
for site in pending_sites:
2026-02-24 12:46:35 +08:00
# 每次执行前检查工作时间
if not self.is_working_time():
current_time = datetime.now().strftime('%H:%M')
logger.info(f"当前时间 {current_time} 已超出工作时间,停止执行剩余任务")
break
2026-01-21 14:33:10 +08:00
self.execute_click_task(site)
# 任务间随机间隔(使用配置文件中的范围)
if site != pending_sites[-1]:
wait_minutes = random.randint(Config.MIN_TASK_INTERVAL_MINUTES, Config.MAX_TASK_INTERVAL_MINUTES)
logger.info(f"等待 {wait_minutes} 分钟后执行下一个任务...")
time.sleep(wait_minutes * 60)
else:
2026-02-24 12:46:35 +08:00
# 并发执行
self._run_concurrent_cycle(pending_sites)
2026-01-21 14:33:10 +08:00
# 显示今日进度
completed = sum(1 for r in self.click_records.values() if r['today_count'] >= r['target_count'])
total = len(self.click_records)
total_clicks = sum(r['today_count'] for r in self.click_records.values())
target_clicks = sum(r['target_count'] for r in self.click_records.values())
logger.info("-" * 60)
logger.info(f"今日进度: {completed}/{total} 个站点完成")
logger.info(f"点击次数: {total_clicks}/{target_clicks}")
logger.info("-" * 60)
2026-02-24 12:46:35 +08:00
    def _run_concurrent_cycle(self, pending_sites: List[Dict]):
        """
        Execute click tasks concurrently, in batches of max_workers.

        Args:
            pending_sites: list of sites due for a click
        """
        # Slice the pending list into batches of at most max_workers each.
        batch_size = self.max_workers
        batches = [pending_sites[i:i+batch_size] for i in range(0, len(pending_sites), batch_size)]
        logger.info(f"并发模式: 共 {len(pending_sites)} 个任务,分 {len(batches)} 批执行(每批最多 {batch_size} 个)")
        for batch_idx, batch in enumerate(batches):
            # Stop dispatching once outside the working window.
            if not self.is_working_time():
                current_time = datetime.now().strftime('%H:%M')
                logger.info(f"当前时间 {current_time} 已超出工作时间,停止执行剩余批次")
                break
            logger.info(f"=" * 40)
            logger.info(f"开始批次 {batch_idx+1}/{len(batches)}: {len(batch)} 个任务并发执行")
            for i, site in enumerate(batch, 1):
                logger.info(f" - [Worker {i}] Site {site['id']}: {site['site_url'][:50]}...")
            batch_start_time = time.time()
            success_count = 0
            fail_count = 0
            # Fan the batch out over a thread pool sized to the batch.
            with ThreadPoolExecutor(max_workers=len(batch)) as executor:
                futures = {
                    executor.submit(self._execute_click_task_wrapper, site, worker_id): site
                    for worker_id, site in enumerate(batch, 1)
                }
                # Collect results as each worker finishes.
                for future in as_completed(futures):
                    site = futures[future]
                    try:
                        result = future.result()
                        if result.get('success'):
                            success_count += 1
                        else:
                            fail_count += 1
                    except Exception as e:
                        fail_count += 1
                        logger.error(f"任务异常: Site {site['id']} - {e}")
            batch_duration = (time.time() - batch_start_time) / 60
            logger.info(f"批次 {batch_idx+1} 完成: 成功 {success_count}, 失败 {fail_count}, 耗时 {batch_duration:.1f} 分钟")
            # Random pause between batches (skipped after the last one).
            if batch_idx < len(batches) - 1:
                # Re-check the working window before sleeping.
                if not self.is_working_time():
                    current_time = datetime.now().strftime('%H:%M')
                    logger.info(f"当前时间 {current_time} 已超出工作时间,停止执行")
                    break
                wait_minutes = random.randint(Config.MIN_TASK_INTERVAL_MINUTES, Config.MAX_TASK_INTERVAL_MINUTES)
                logger.info(f"等待 {wait_minutes} 分钟后执行下一批次...")
                time.sleep(wait_minutes * 60)
    def _execute_click_task_wrapper(self, site: Dict, worker_id: int) -> Dict:
        """
        Thread-safe task-execution wrapper used by the concurrent cycle.

        Args:
            site: site info dict (id, site_url, ...)
            worker_id: worker number, used for log labelling and stagger delay

        Returns:
            Result dict with at least a 'success' key; 'error' on failure.
        """
        site_id = site['id']
        site_url = site['site_url']
        # Staggered start: each worker waits 5-10s per preceding worker so
        # browser-profile API calls are not fired simultaneously (rate limits).
        if worker_id > 1:
            stagger_delay = (worker_id - 1) * random.randint(5, 10)
            logger.info(f"[Worker {worker_id}] [Site {site_id}] 错峰等待 {stagger_delay} 秒后启动...")
            time.sleep(stagger_delay)
        logger.info(f"[Worker {worker_id}] [Site {site_id}] 开始点击: {site_url[:50]}...")
        executor = None
        try:
            # Independent TaskExecutor instance per worker.
            executor = TaskExecutor(max_workers=1, use_proxy=self.use_proxy)
            # Each worker gets its own browser profile and proxy.
            profile_info = executor.create_browser_profile(worker_id)
            if not profile_info:
                logger.error(f"[Worker {worker_id}] [Site {site_id}] 创建浏览器环境失败")
                with self._stats_lock:
                    self.error_count += 1
                return {'success': False, 'error': '创建浏览器环境失败'}
            time.sleep(2)
            # Re-fetch the full site row for current data.
            all_sites = self.dm.get_active_urls()
            target_site = next((s for s in all_sites if s.get('id') == site_id), None)
            if not target_site:
                logger.error(f"[Worker {worker_id}] [Site {site_id}] 未找到站点信息")
                return {'success': False, 'error': '未找到站点信息'}
            # Run the click task.
            result = executor.execute_single_task(target_site, worker_id, profile_info['profile_id'])
            if result['success']:
                # Update shared click records under the lock.
                with self._click_records_lock:
                    if site_id in self.click_records:
                        self.click_records[site_id]['last_click'] = datetime.now()
                        self.click_records[site_id]['today_count'] += 1
                with self._stats_lock:
                    self.total_clicks_today += 1
                logger.info(f"[Worker {worker_id}] [Site {site_id}] ✅ 点击完成")
            else:
                with self._stats_lock:
                    self.error_count += 1
                logger.warning(f"[Worker {worker_id}] [Site {site_id}] ⚠️ 点击失败: {result.get('error', '未知错误')}")
            return result
        except Exception as e:
            with self._stats_lock:
                self.error_count += 1
            logger.error(f"[Worker {worker_id}] [Site {site_id}] ❌ 异常: {str(e)}")
            import traceback
            traceback.print_exc()
            return {'success': False, 'error': str(e)}
        finally:
            # Always release the browser, even on failure paths.
            if executor:
                try:
                    executor.close_browser()
                except Exception as e:
                    logger.warning(f"[Worker {worker_id}] 清理资源失败: {e}")
def run_crawler_cycle(self):
"""执行一次爬虫循环"""
if not self.crawler:
logger.warning("爬虫未启用,跳过")
return
logger.info("=" * 60)
logger.info(f"开始网址爬取 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logger.info("=" * 60)
try:
# 执行爬虫任务
result = self.crawler.crawl_tasks(limit=Config.CRAWLER_BATCH_SIZE)
logger.info("-" * 60)
logger.info(f"爬取完成: 总任务={result['total_tasks']}, 成功={result['success_count']}, 失败={result['failed_count']}")
logger.info(f"新增网址: {result['total_new_urls']}")
logger.info("-" * 60)
except Exception as e:
logger.error(f"爬虫执行失败: {str(e)}")
import traceback
traceback.print_exc()
2026-02-24 12:46:35 +08:00
def run_query_import_scan(self):
"""扫描Query上传目录导入待处理文件"""
try:
from query_keyword_importer import QueryKeywordImporter
importer = QueryKeywordImporter()
importer.scan_and_import()
except Exception as e:
logger.error(f"Query导入扫描异常: {e}")
2026-01-21 14:33:10 +08:00
    def start(self):
        """
        Start the scheduler: acquire the single-instance lock, install
        cleanup/signal handlers, launch the health API, register recurring
        jobs and run the scheduling loop until stopped.
        """
        # Refuse to start if another instance holds the lock.
        if not self._acquire_lock():
            logger.error("无法启动,请检查是否已有实例在运行")
            sys.exit(1)
        # Release resources on interpreter exit.
        atexit.register(self._cleanup)

        # Graceful shutdown on SIGINT / SIGTERM.
        def signal_handler(signum, frame):
            logger.info(f"\n收到信号 {signum},正在优雅停止...")
            self._cleanup()
            sys.exit(0)
        signal.signal(signal.SIGINT, signal_handler)   # Ctrl+C
        signal.signal(signal.SIGTERM, signal_handler)  # kill
        logger.info("=" * 60)
        logger.info("MIP广告点击调度器启动")
        logger.info("=" * 60)
        logger.info(f"当前环境: {Config.ENV}")
        logger.info(f"当前时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logger.info(f"PID: {os.getpid()}")
        logger.info("")
        self.running = True
        self.start_time = datetime.now()
        # Health-check API in a daemon thread so it dies with the process.
        health_thread = threading.Thread(
            target=lambda: self.health_app.run(host='0.0.0.0', port=self.health_port, debug=False, use_reloader=False),
            daemon=True
        )
        health_thread.start()
        logger.info(f"健康检查API已启动: http://0.0.0.0:{self.health_port}/health")
        logger.info("")
        # Initialize today's click records.
        self.reset_daily_records()
        # Recurring jobs:
        # 1. Reset click records daily at 00:01.
        schedule.every().day.at("00:01").do(self.reset_daily_records)
        # 2. Click cycle every 10 minutes (only acts within working hours).
        schedule.every(10).minutes.do(self.run_click_cycle)
        logger.info("定时任务已配置:")
        logger.info(" - 每天 00:01 重置点击记录")
        logger.info(" - 每 10 分钟执行点击循环09:00-21:00")
        # 3. Daily crawler job (only when enabled in config).
        if Config.CRAWLER_ENABLED and self.crawler:
            schedule.every().day.at(Config.CRAWLER_SCHEDULE_TIME).do(self.run_crawler_cycle)
            logger.info(f" - 每天 {Config.CRAWLER_SCHEDULE_TIME} 执行网址爬取(批量: {Config.CRAWLER_BATCH_SIZE}")
        else:
            logger.info(" - 网址爬取未启用")
        # 4. Scan the Query upload directory every 15 minutes.
        schedule.every(15).minutes.do(self.run_query_import_scan)
        logger.info(" - 每 15 分钟扫描Query上传目录")
        logger.info("")
        # Kick off an immediate cycle if currently inside the work window.
        if self.is_working_time():
            logger.info("立即执行首次点击循环...")
            self.run_click_cycle()
        else:
            logger.info(f"当前不在工作时间,等待下次调度...")
        # Main scheduling loop.
        logger.info("\n调度器运行中,按 Ctrl+C 优雅停止...\n")
        try:
            while self.running:
                schedule.run_pending()
                time.sleep(30)  # poll pending jobs every 30 seconds
        except KeyboardInterrupt:
            logger.info("\n收到中断信号")
        finally:
            self._cleanup()
def parse_args():
    """Build the CLI parser and return the parsed arguments namespace."""
    ap = argparse.ArgumentParser(
        description='MIP广告点击自动化服务',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
使用示例
python main.py # 前台运行
python main.py --workers 3 # 3个并发
python main.py --no-proxy # 不使用代理
python main.py --health-port 9999 # 自定义健康检查端口
健康检查
curl http://localhost:8888/health
'''
    )
    # Concurrency level (1 recommended to avoid resource contention).
    ap.add_argument(
        '--workers',
        type=int,
        default=1,
        help='最大并发数(默认: 1建议使用以避免资源冲突'
    )
    # Proxy toggle (enabled unless flag is given).
    ap.add_argument(
        '--no-proxy',
        action='store_true',
        help='禁用代理(默认启用)'
    )
    # Health-check API port.
    ap.add_argument(
        '--health-port',
        type=int,
        default=8888,
        help='健康检查API端口默认: 8888'
    )
    # Working window boundaries (hours of day).
    ap.add_argument(
        '--work-start',
        type=int,
        default=9,
        help='工作开始时间(小时,默认: 9'
    )
    ap.add_argument(
        '--work-end',
        type=int,
        default=21,
        help='工作结束时间(小时,默认: 21'
    )
    ap.add_argument(
        '--version',
        action='version',
        version='MIP Ad Click Service v1.0.0'
    )
    return ap.parse_args()
def main():
    """Program entry point: parse CLI options, build and start the service."""
    cli = parse_args()
    # Startup banner.
    logger.info("=" * 70)
    logger.info(" __ __ ___ ____ _ _ ____ _ _ _ ")
    logger.info(" | \\/ |_ _| _ \\ / \\ __| | / ___| (_) ___| | __")
    logger.info(" | |\\/| || || |_) | / _ \\ / _` | | | | | |/ __| |/ /")
    logger.info(" | | | || || __/ / ___ \\ (_| | | |___| | | (__| < ")
    logger.info(" |_| |_|___|_| /_/ \\_\\__,_| \\____|_|_|\\___|_|\\_\\")
    logger.info("")
    logger.info(" 广告点击自动化服务 v1.0.0")
    logger.info("=" * 70)
    logger.info("")
    # Construct the scheduler from the CLI options.
    service = ClickScheduler(
        max_workers=cli.workers,
        use_proxy=not cli.no_proxy,
        health_port=cli.health_port
    )
    # Apply the working-hours window.
    service.work_start_hour = cli.work_start
    service.work_end_hour = cli.work_end
    # Run until stopped; log any startup failure and exit non-zero.
    try:
        service.start()
    except Exception as e:
        logger.error(f"服务启动失败: {str(e)}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()