#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
MIP广告点击自动化服务
这是一个完整的后台服务,提供以下功能:
1. 自动轮询点击数据库中的广告链接
2. 智能调度每个链接每天随机点击1-10次
3. 间隔控制同一链接点击间隔≥30分钟
4. 时间窗口仅在09:00-21:00执行
5. 进程管理:防重复启动、优雅停止
6. 健康检查提供HTTP API监控服务状态
7. 日志管理:自动分割、持久化存储
使用方法:
python main.py # 前台运行
python main.py --daemon # 后台运行Linux
健康检查:
curl http://localhost:8888/health
"""
import argparse
import atexit
import os
import random
import signal
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Dict, List
import schedule
from flask import Flask, jsonify
from loguru import logger
from config import Config
from data_manager import DataManager
from task_executor import TaskExecutor
from baidu_crawler import BaiduSearchCrawler
# Configure logging
logger.remove()
# Console output
logger.add(
sys.stdout,
format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <level>{message}</level>",
level="INFO"
)
# File output
log_dir = Path("./logs")
log_dir.mkdir(exist_ok=True)
logger.add(
log_dir / "scheduler_{time:YYYY-MM-DD}.log",
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {message}",
level="INFO",
rotation="00:00",
retention="30 days",
encoding="utf-8"
)
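# Note: with rotation="00:00" loguru starts a new log file at midnight, and
# retention="30 days" prunes log files older than 30 days automatically.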
class ClickScheduler:
"""
MIP广告点击服务
负责管理广告点击任务的调度和执行。
提供完整的服务生命周期管理,包括启动、运行、监控和停止。
"""
# 进程锁文件
LOCK_FILE = Path("./scheduler.lock")
    def __init__(self, max_workers: int = 1, use_proxy: bool = True, health_port: int = 8888):
        """
        Initialize the scheduler.
        Args:
            max_workers: maximum number of concurrent workers
            use_proxy: whether to route traffic through a proxy
            health_port: port for the health-check API
        """
        self.max_workers = max_workers
        self.use_proxy = use_proxy
        self.health_port = health_port
        self.dm = DataManager()
        # Crawler instance (if enabled)
        self.crawler = BaiduSearchCrawler() if Config.CRAWLER_ENABLED else None
        # Click records: {site_id: {'last_click': datetime, 'today_count': int, 'target_count': int}}
        self.click_records = {}
        # Working-hours configuration
        self.work_start_hour = 9   # 09:00
        self.work_end_hour = 21    # 21:00
        self.click_interval_minutes = 30  # minimum interval between clicks on the same site
        # Service state
        self.running = False
        self.start_time = None
        self.last_cycle_time = None
        self.total_clicks_today = 0
        self.error_count = 0
        # Thread-safety locks (used by concurrent execution)
        self._click_records_lock = threading.Lock()
        self._stats_lock = threading.Lock()
        # Health-check API
        self.health_app = Flask(__name__)
        self.health_app.logger.disabled = True  # silence Flask's own logging
        self._setup_health_api()
        logger.info("Scheduler initialized")
        logger.info(f"Working hours: {self.work_start_hour:02d}:00 - {self.work_end_hour:02d}:00")
        logger.info(f"Click interval: {self.click_interval_minutes} minutes")
        logger.info(f"Concurrency: {'enabled' if max_workers > 1 else 'disabled'} (max workers: {max_workers})")
    def _setup_health_api(self):
        """Register the health-check and scheduler-control API routes."""
        @self.health_app.route('/health', methods=['GET'])
        def health_check():
            """Health-check endpoint."""
            uptime = None
            if self.start_time:
                uptime = str(datetime.now() - self.start_time)
            return jsonify({
                'status': 'running' if self.running else 'stopped',
                'uptime': uptime,
                'start_time': self.start_time.isoformat() if self.start_time else None,
                'last_cycle': self.last_cycle_time.isoformat() if self.last_cycle_time else None,
                'total_sites': len(self.click_records),
                'completed_sites': sum(1 for r in self.click_records.values() if r['today_count'] >= r['target_count']),
                'total_clicks_today': sum(r['today_count'] for r in self.click_records.values()),
                'target_clicks_today': sum(r['target_count'] for r in self.click_records.values()),
                'error_count': self.error_count,
                'work_hours': f"{self.work_start_hour:02d}:00-{self.work_end_hour:02d}:00",
                'is_working_time': self.is_working_time()
            })
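        # Example /health response (illustrative values):
        # {"status": "running", "uptime": "1:23:45", "total_sites": 12,
        #  "completed_sites": 4, "total_clicks_today": 18, "error_count": 0, ...}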
        @self.health_app.route('/scheduler/status', methods=['GET'])
        def scheduler_status():
            """Return scheduler status (for the remote web service)."""
            return jsonify({
                'success': True,
                'data': {
                    'status': 'running' if self.running else 'stopped',
                    'is_working_time': self.is_working_time(),
                    'total_sites': len(self.click_records),
                    'completed_sites': sum(1 for r in self.click_records.values() if r['today_count'] >= r['target_count']),
                    'total_clicks_today': sum(r['today_count'] for r in self.click_records.values()),
                    'error_count': self.error_count
                }
            })
        @self.health_app.route('/scheduler/start', methods=['POST'])
        def scheduler_start():
            """Start the scheduler (for the remote web service)."""
            if self.running:
                return jsonify({'success': True, 'message': 'Scheduler is already running'})
            # Re-initialize and start
            self.running = True
            self.start_time = datetime.now()
            self.reset_daily_records()
            logger.info("Scheduler started via remote API")
            return jsonify({'success': True, 'message': 'Scheduler started'})
        @self.health_app.route('/scheduler/stop', methods=['POST'])
        def scheduler_stop():
            """Stop the scheduler (for the remote web service)."""
            if not self.running:
                return jsonify({'success': True, 'message': 'Scheduler is not running'})
            self.running = False
            logger.info("Scheduler stopped via remote API")
            return jsonify({'success': True, 'message': 'Scheduler stopped'})
    def _acquire_lock(self) -> bool:
        """
        Acquire the process lock to prevent duplicate instances.
        Returns:
            True if the lock was acquired.
        """
        if self.LOCK_FILE.exists():
            try:
                # Check whether the PID in the lock file is still alive
                with open(self.LOCK_FILE, 'r') as f:
                    old_pid = int(f.read().strip())
                try:
                    os.kill(old_pid, 0)  # signal 0: existence check only, nothing is sent
                    logger.error(f"Scheduler is already running (PID: {old_pid})")
                    return False
                except OSError:
                    # Process is gone; remove the stale lock
                    logger.warning("Stale lock file detected, cleaning up...")
                    self.LOCK_FILE.unlink()
            except Exception as e:
                logger.warning(f"Error while checking lock file: {str(e)}, removing old lock")
                self.LOCK_FILE.unlink()
        # Create a fresh lock
        try:
            with open(self.LOCK_FILE, 'w') as f:
                f.write(str(os.getpid()))
            logger.info(f"Process lock acquired (PID: {os.getpid()})")
            return True
        except Exception as e:
            logger.error(f"Failed to create lock file: {str(e)}")
            return False
    def _release_lock(self):
        """Release the process lock."""
        try:
            if self.LOCK_FILE.exists():
                self.LOCK_FILE.unlink()
                logger.info("Process lock released")
        except Exception as e:
            logger.error(f"Failed to remove lock file: {str(e)}")
    def _cleanup(self):
        """Release resources on shutdown."""
        logger.info("Cleaning up resources...")
        self.running = False
        self._release_lock()
        logger.info("Resource cleanup complete")
    def is_working_time(self) -> bool:
        """
        Check whether the current time falls within working hours.
        Returns:
            True if within working hours.
        """
        now = datetime.now()
        current_hour = now.hour
        return self.work_start_hour <= current_hour < self.work_end_hour
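        # With the defaults this is a half-open window: any time from 09:00:00
        # through 20:59:59 counts as working time; 21:00 does not (current_hour < 21).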
    def reset_daily_records(self):
        """Reset the daily click records."""
        logger.info("=" * 60)
        logger.info("Resetting daily click records")
        logger.info("=" * 60)
        # Fetch all active sites
        sites = self.dm.get_active_urls()
        # Assign each site a random click target for today (range comes from the config file)
        self.click_records = {}
        for site in sites:
            site_id = site.get('id')
            target_count = random.randint(Config.MIN_CLICK_COUNT, Config.MAX_CLICK_COUNT)
            self.click_records[site_id] = {
                'last_click': None,
                'today_count': 0,
                'target_count': target_count,
                'site_url': site.get('site_url'),
                'click_count': site.get('click_count', 0)  # lifetime click total
            }
            logger.info(f"Site {site_id}: {site.get('site_url')} - today's target {target_count}")
        logger.info(f"{len(sites)} sites, total target clicks: {sum(r['target_count'] for r in self.click_records.values())}")
    def get_pending_sites(self) -> List[Dict]:
        """
        Build the list of sites pending a click.
        Returns:
            Pending sites (sites not yet clicked today are returned first).
        """
        if not self.click_records:
            logger.warning("Click records are empty, resetting")
            self.reset_daily_records()
        # Detect newly imported sites on the fly
        current_sites = self.dm.get_active_urls()
        current_site_ids = {site.get('id') for site in current_sites}
        existing_site_ids = set(self.click_records.keys())
        # New sites found: add them to the click records automatically
        new_site_ids = current_site_ids - existing_site_ids
        if new_site_ids:
            logger.info(f"Found {len(new_site_ids)} newly imported sites, adding them to the click queue")
            for site in current_sites:
                site_id = site.get('id')
                if site_id in new_site_ids:
                    target_count = random.randint(Config.MIN_CLICK_COUNT, Config.MAX_CLICK_COUNT)
                    self.click_records[site_id] = {
                        'last_click': None,
                        'today_count': 0,
                        'target_count': target_count,
                        'site_url': site.get('site_url'),
                        'click_count': site.get('click_count', 0)  # lifetime click total
                    }
                    logger.info(f"New site {site_id}: {site.get('site_url')} - today's target {target_count}")
        # Drop sites that were deleted from the database
        removed_site_ids = existing_site_ids - current_site_ids
        for site_id in removed_site_ids:
            del self.click_records[site_id]
            logger.info(f"Site {site_id} was deleted from the database, removed from the click queue")
        now = datetime.now()
        pending_sites = []
        for site_id, record in self.click_records.items():
            # Skip sites that already hit today's target
            if record['today_count'] >= record['target_count']:
                continue
            # Enforce the minimum click interval (≥ 30 minutes)
            if record['last_click']:
                elapsed = (now - record['last_click']).total_seconds() / 60
                if elapsed < self.click_interval_minutes:
                    continue
            pending_sites.append({
                'id': site_id,
                'site_url': record['site_url'],
                'today_count': record['today_count'],
                'target_count': record['target_count'],
                'click_count': record.get('click_count', 0),  # lifetime click total
                'last_click': record['last_click']
            })
        # Sort priority:
        # 1. Sites never clicked in their lifetime (click_count == 0) come first
        # 2. Sites not yet clicked today (today_count == 0) come next
        # 3. Then ascending last-click time (longest idle first)
        pending_sites.sort(key=lambda x: (
            x['click_count'] > 0,   # False (lifetime 0 clicks) sorts first
            x['today_count'] > 0,   # False (0 clicks today) sorts next
            x['last_click'] or datetime.min  # never-clicked sites sort first
        ))
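        # Example: a site with click_count=0 always outranks one clicked before;
        # among equals, a site last clicked at 09:10 outranks one clicked at 10:40.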
return pending_sites
    def execute_click_task(self, site: Dict):
        """
        Execute the click task for a single site.
        Args:
            site: site info
        """
        site_id = site['id']
        site_url = site['site_url']
        logger.info(f"[Site {site_id}] Starting click: {site_url} ({site['today_count'] + 1}/{site['target_count']})")
        executor = None
        try:
            # Create a task executor (fresh instance per task)
            executor = TaskExecutor(
                max_workers=1,  # a single task runs single-threaded
                use_proxy=self.use_proxy
            )
            # Fetch the full site record before executing
            all_sites = self.dm.get_active_urls()
            target_site = next((s for s in all_sites if s.get('id') == site_id), None)
            if not target_site:
                logger.error(f"[Site {site_id}] Site info not found")
                return
            # Create the browser profile
            profile_info = executor.create_browser_profile(1)
            if not profile_info:
                logger.error(f"[Site {site_id}] Failed to create browser profile")
                return
            time.sleep(2)
            # Run the click task
            result = executor.execute_single_task(target_site, 1, profile_info['profile_id'])
            if result['success']:
                # Update the click record
                self.click_records[site_id]['last_click'] = datetime.now()
                self.click_records[site_id]['today_count'] += 1
                self.total_clicks_today += 1
                logger.info(f"[Site {site_id}] ✅ Click done: {self.click_records[site_id]['today_count']}/{self.click_records[site_id]['target_count']}")
            else:
                self.error_count += 1
                logger.warning(f"[Site {site_id}] ⚠️ Click failed: {result.get('error', 'unknown error')}")
        except Exception as e:
            self.error_count += 1
            logger.error(f"[Site {site_id}] ❌ Click exception: {str(e)}")
            import traceback
            traceback.print_exc()
        finally:
            # Mirror the concurrent path: always release the browser profile
            if executor:
                try:
                    executor.close_browser()
                except Exception as e:
                    logger.warning(f"[Site {site_id}] Resource cleanup failed: {e}")
    def run_click_cycle(self):
        """Run one click cycle."""
        # Respect working hours
        if not self.is_working_time():
            current_time = datetime.now().strftime('%H:%M')
            logger.info(f"Current time {current_time} is outside working hours, skipping")
            return
        self.last_cycle_time = datetime.now()
        logger.info("-" * 60)
        logger.info(f"Starting click cycle - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logger.info("-" * 60)
        # Collect pending sites
        pending_sites = self.get_pending_sites()
        if not pending_sites:
            logger.info("No sites pending a click")
            return
        logger.info(f"Found {len(pending_sites)} sites pending a click")
        # Split into two groups: never clicked vs. previously clicked
        never_clicked = [s for s in pending_sites if s.get('click_count', 0) == 0]
        has_clicked = [s for s in pending_sites if s.get('click_count', 0) > 0]
        # Shuffle within each group while keeping never-clicked sites first overall
        random.shuffle(never_clicked)
        random.shuffle(has_clicked)
        pending_sites = never_clicked + has_clicked
        if never_clicked:
            logger.info(f"{len(never_clicked)} of them have never been clicked (highest priority)")
        # Execute serially or concurrently depending on max_workers
        if self.max_workers == 1:
            # Serial execution
            for site in pending_sites:
                # Re-check working hours before each task
                if not self.is_working_time():
                    current_time = datetime.now().strftime('%H:%M')
                    logger.info(f"Current time {current_time} is past working hours, stopping remaining tasks")
                    break
                self.execute_click_task(site)
                # Random pause between tasks (range comes from the config file)
                if site != pending_sites[-1]:
                    wait_minutes = random.randint(Config.MIN_TASK_INTERVAL_MINUTES, Config.MAX_TASK_INTERVAL_MINUTES)
                    logger.info(f"Waiting {wait_minutes} minutes before the next task...")
                    time.sleep(wait_minutes * 60)
        else:
            # Concurrent execution
            self._run_concurrent_cycle(pending_sites)
        # Report today's progress
        completed = sum(1 for r in self.click_records.values() if r['today_count'] >= r['target_count'])
        total = len(self.click_records)
        total_clicks = sum(r['today_count'] for r in self.click_records.values())
        target_clicks = sum(r['target_count'] for r in self.click_records.values())
        logger.info("-" * 60)
        logger.info(f"Today's progress: {completed}/{total} sites done")
        logger.info(f"Clicks: {total_clicks}/{target_clicks}")
        logger.info("-" * 60)
    def _run_concurrent_cycle(self, pending_sites: List[Dict]):
        """
        Execute click tasks concurrently.
        Args:
            pending_sites: sites pending a click
        """
        # Split into batches of at most max_workers
        batch_size = self.max_workers
        batches = [pending_sites[i:i+batch_size] for i in range(0, len(pending_sites), batch_size)]
        logger.info(f"Concurrent mode: {len(pending_sites)} tasks in {len(batches)} batches (at most {batch_size} per batch)")
        for batch_idx, batch in enumerate(batches):
            # Respect working hours
            if not self.is_working_time():
                current_time = datetime.now().strftime('%H:%M')
                logger.info(f"Current time {current_time} is past working hours, stopping remaining batches")
                break
            logger.info("=" * 40)
            logger.info(f"Starting batch {batch_idx+1}/{len(batches)}: {len(batch)} tasks in parallel")
            for i, site in enumerate(batch, 1):
                logger.info(f"  - [Worker {i}] Site {site['id']}: {site['site_url'][:50]}...")
            batch_start_time = time.time()
            success_count = 0
            fail_count = 0
            # Run the batch with a ThreadPoolExecutor
            with ThreadPoolExecutor(max_workers=len(batch)) as executor:
                futures = {
                    executor.submit(self._execute_click_task_wrapper, site, worker_id): site
                    for worker_id, site in enumerate(batch, 1)
                }
                # Wait for all tasks to finish
                for future in as_completed(futures):
                    site = futures[future]
                    try:
                        result = future.result()
                        if result.get('success'):
                            success_count += 1
                        else:
                            fail_count += 1
                    except Exception as e:
                        fail_count += 1
                        logger.error(f"Task exception: Site {site['id']} - {e}")
            batch_duration = (time.time() - batch_start_time) / 60
            logger.info(f"Batch {batch_idx+1} done: {success_count} succeeded, {fail_count} failed, took {batch_duration:.1f} minutes")
            # Random pause between batches (skipped after the last one)
            if batch_idx < len(batches) - 1:
                # Re-check working hours
                if not self.is_working_time():
                    current_time = datetime.now().strftime('%H:%M')
                    logger.info(f"Current time {current_time} is past working hours, stopping")
                    break
                wait_minutes = random.randint(Config.MIN_TASK_INTERVAL_MINUTES, Config.MAX_TASK_INTERVAL_MINUTES)
                logger.info(f"Waiting {wait_minutes} minutes before the next batch...")
                time.sleep(wait_minutes * 60)
    def _execute_click_task_wrapper(self, site: Dict, worker_id: int) -> Dict:
        """
        Thread-safe task execution wrapper.
        Args:
            site: site info
            worker_id: worker number (used for log tagging)
        Returns:
            Result dict of the execution.
        """
        site_id = site['id']
        site_url = site['site_url']
        # Staggered start: space workers 5-10 seconds apart so simultaneous
        # AdsPower API calls don't trip its rate limit
        if worker_id > 1:
            stagger_delay = (worker_id - 1) * random.randint(5, 10)
            logger.info(f"[Worker {worker_id}] [Site {site_id}] Staggering start by {stagger_delay} seconds...")
            time.sleep(stagger_delay)
        logger.info(f"[Worker {worker_id}] [Site {site_id}] Starting click: {site_url[:50]}...")
        executor = None
        try:
            # Independent TaskExecutor instance per worker
            executor = TaskExecutor(max_workers=1, use_proxy=self.use_proxy)
            # Create the browser profile (each worker gets its own profile and proxy)
            profile_info = executor.create_browser_profile(worker_id)
            if not profile_info:
                logger.error(f"[Worker {worker_id}] [Site {site_id}] Failed to create browser profile")
                with self._stats_lock:
                    self.error_count += 1
                return {'success': False, 'error': 'failed to create browser profile'}
            time.sleep(2)
            # Fetch the full site record
            all_sites = self.dm.get_active_urls()
            target_site = next((s for s in all_sites if s.get('id') == site_id), None)
            if not target_site:
                logger.error(f"[Worker {worker_id}] [Site {site_id}] Site info not found")
                return {'success': False, 'error': 'site info not found'}
            # Run the task
            result = executor.execute_single_task(target_site, worker_id, profile_info['profile_id'])
            if result['success']:
                # Update the click record under the lock
                with self._click_records_lock:
                    if site_id in self.click_records:
                        self.click_records[site_id]['last_click'] = datetime.now()
                        self.click_records[site_id]['today_count'] += 1
                with self._stats_lock:
                    self.total_clicks_today += 1
                logger.info(f"[Worker {worker_id}] [Site {site_id}] ✅ Click done")
            else:
                with self._stats_lock:
                    self.error_count += 1
                logger.warning(f"[Worker {worker_id}] [Site {site_id}] ⚠️ Click failed: {result.get('error', 'unknown error')}")
            return result
        except Exception as e:
            with self._stats_lock:
                self.error_count += 1
            logger.error(f"[Worker {worker_id}] [Site {site_id}] ❌ Exception: {str(e)}")
            import traceback
            traceback.print_exc()
            return {'success': False, 'error': str(e)}
        finally:
            # Always release browser resources
            if executor:
                try:
                    executor.close_browser()
                except Exception as e:
                    logger.warning(f"[Worker {worker_id}] Resource cleanup failed: {e}")
    def run_crawler_cycle(self):
        """Run one crawler cycle."""
        if not self.crawler:
            logger.warning("Crawler is not enabled, skipping")
            return
        logger.info("=" * 60)
        logger.info(f"Starting URL crawl - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logger.info("=" * 60)
        try:
            # Run the crawl
            result = self.crawler.crawl_tasks(limit=Config.CRAWLER_BATCH_SIZE)
            logger.info("-" * 60)
            logger.info(f"Crawl finished: total={result['total_tasks']}, succeeded={result['success_count']}, failed={result['failed_count']}")
            logger.info(f"New URLs: {result['total_new_urls']}")
            logger.info("-" * 60)
        except Exception as e:
            logger.error(f"Crawler run failed: {str(e)}")
            import traceback
            traceback.print_exc()
    def run_query_import_scan(self):
        """Scan the query upload directory and import pending files."""
        try:
            from query_keyword_importer import QueryKeywordImporter
            importer = QueryKeywordImporter()
            importer.scan_and_import()
        except Exception as e:
            logger.error(f"Query import scan error: {e}")
    def start(self):
        """Start the scheduler."""
        # Acquire the process lock
        if not self._acquire_lock():
            logger.error("Cannot start; check whether another instance is already running")
            sys.exit(1)
        # Register cleanup on exit
        atexit.register(self._cleanup)
        # Register signal handlers for graceful shutdown
        def signal_handler(signum, frame):
            logger.info(f"\nReceived signal {signum}, shutting down gracefully...")
            self._cleanup()
            sys.exit(0)
        signal.signal(signal.SIGINT, signal_handler)   # Ctrl+C
        signal.signal(signal.SIGTERM, signal_handler)  # kill
        logger.info("=" * 60)
        logger.info("MIP ad-click scheduler starting")
        logger.info("=" * 60)
        logger.info(f"Environment: {Config.ENV}")
        logger.info(f"Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logger.info(f"PID: {os.getpid()}")
        logger.info("")
        self.running = True
        self.start_time = datetime.now()
        # Start the health-check API in a background thread
        health_thread = threading.Thread(
            target=lambda: self.health_app.run(host='0.0.0.0', port=self.health_port, debug=False, use_reloader=False),
            daemon=True
        )
        health_thread.start()
        logger.info(f"Health-check API started: http://0.0.0.0:{self.health_port}/health")
        logger.info("")
        # Initialize the daily records
        self.reset_daily_records()
        # Configure scheduled jobs
        # 1. Reset click records at 00:01 every day
        schedule.every().day.at("00:01").do(self.reset_daily_records)
        # 2. Run a click cycle every 10 minutes (it only acts within working hours)
        schedule.every(10).minutes.do(self.run_click_cycle)
        logger.info("Scheduled jobs configured:")
        logger.info("  - 00:01 daily: reset click records")
        logger.info("  - every 10 minutes: click cycle (09:00-21:00)")
        # 3. Crawler job (if enabled)
        if Config.CRAWLER_ENABLED and self.crawler:
            schedule.every().day.at(Config.CRAWLER_SCHEDULE_TIME).do(self.run_crawler_cycle)
            logger.info(f"  - {Config.CRAWLER_SCHEDULE_TIME} daily: URL crawl (batch: {Config.CRAWLER_BATCH_SIZE})")
        else:
            logger.info("  - URL crawl disabled")
        # 4. Query-mining directory scan (every 15 minutes)
        schedule.every(15).minutes.do(self.run_query_import_scan)
        logger.info("  - every 15 minutes: scan the query upload directory")
        logger.info("")
        # Run once immediately if within working hours
        if self.is_working_time():
            logger.info("Running the first click cycle immediately...")
            self.run_click_cycle()
        else:
            logger.info("Outside working hours, waiting for the next schedule...")
        # Enter the scheduling loop
        logger.info("\nScheduler running, press Ctrl+C to stop gracefully...\n")
        try:
            while self.running:
                schedule.run_pending()
                time.sleep(30)  # poll every 30 seconds
        except KeyboardInterrupt:
            logger.info("\nInterrupted")
        finally:
            self._cleanup()
def parse_args():
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(
        description='MIP ad-click automation service',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  python main.py                      # run in the foreground
  python main.py --workers 3          # 3 concurrent workers
  python main.py --no-proxy           # run without a proxy
  python main.py --health-port 9999   # custom health-check port
Health check:
  curl http://localhost:8888/health
'''
    )
    parser.add_argument(
        '--workers',
        type=int,
        default=1,
        help='maximum number of concurrent workers (default: 1, recommended to avoid resource contention)'
    )
    parser.add_argument(
        '--no-proxy',
        action='store_true',
        help='disable the proxy (enabled by default)'
    )
    parser.add_argument(
        '--health-port',
        type=int,
        default=8888,
        help='health-check API port (default: 8888)'
    )
    parser.add_argument(
        '--work-start',
        type=int,
        default=9,
        help='working-hours start (hour, default: 9)'
    )
    parser.add_argument(
        '--work-end',
        type=int,
        default=21,
        help='working-hours end (hour, default: 21)'
    )
    parser.add_argument(
        '--version',
        action='version',
        version='MIP Ad Click Service v1.0.0'
    )
    return parser.parse_args()
def main():
    """Main entry point."""
    # Parse command-line arguments
    args = parse_args()
    # Print the startup banner
    logger.info("=" * 70)
logger.info(" __ __ ___ ____ _ _ ____ _ _ _ ")
logger.info(" | \\/ |_ _| _ \\ / \\ __| | / ___| (_) ___| | __")
logger.info(" | |\\/| || || |_) | / _ \\ / _` | | | | | |/ __| |/ /")
logger.info(" | | | || || __/ / ___ \\ (_| | | |___| | | (__| < ")
logger.info(" |_| |_|___|_| /_/ \\_\\__,_| \\____|_|_|\\___|_|\\_\\")
logger.info("")
logger.info(" 广告点击自动化服务 v1.0.0")
logger.info("=" * 70)
logger.info("")
    # Create the service instance
service = ClickScheduler(
max_workers=args.workers,
use_proxy=not args.no_proxy,
health_port=args.health_port
)
    # Apply the working hours
service.work_start_hour = args.work_start
service.work_end_hour = args.work_end
    # Start the service
try:
service.start()
except Exception as e:
        logger.error(f"Service failed to start: {str(e)}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()