219 lines
7.1 KiB
Python
219 lines
7.1 KiB
Python
|
|
#!/usr/bin/env python
|
|||
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
"""
|
|||
|
|
TaskWorker 自动监控和恢复守护进程
|
|||
|
|
用于生产环境中自动检测和修复任务卡住的问题
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import sys
|
|||
|
|
import time
|
|||
|
|
import logging
|
|||
|
|
import signal
|
|||
|
|
import threading
|
|||
|
|
from datetime import datetime
|
|||
|
|
|
|||
|
|
# Logging configuration.
# logging.FileHandler opens its file immediately when constructed, and this runs
# at import time, before the __main__ block. So the log directory must be created
# here; otherwise a fresh checkout fails with FileNotFoundError.
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler('logs/taskworker_monitor.log'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TaskWorkerMonitor:
    """Watchdog that periodically inspects the TaskWorker and restarts it when needed.

    The check/restart loop (`monitor_loop`) runs in a background daemon thread
    started by `start()` and stopped by `stop()`.
    """

    def __init__(self, check_interval=60):
        """
        Args:
            check_interval: seconds between status checks (default 60).
        """
        self.check_interval = check_interval
        self.running = False
        self.monitor_thread = None
        # stop() sets this event so the loop wakes up immediately. A plain
        # time.sleep(check_interval) would outlive stop()'s 5-second join.
        self._stop_event = threading.Event()

    @staticmethod
    def _count_alive_threads(worker):
        """Return how many entries of `worker.worker_threads` are non-None and alive."""
        return sum(1 for t in worker.worker_threads if t and t.is_alive())

    def check_worker_status(self):
        """Inspect the TaskWorker and the task queue and decide whether a restart is needed.

        Returns:
            A tuple ``(need_fix, reason, status)``:
              - need_fix: True when a restart should be attempted.
              - reason: human-readable explanation ("" when healthy).
              - status: dict with keys 'running', 'alive_threads',
                'pending_count', 'processing_count'. It is an empty dict when
                the check itself failed.

            A restart is requested when the worker is not running, or when
            there are pending tasks but no live worker thread. Any exception
            raised during the check is logged with its traceback and reported
            as ``(True, "检查失败: <error>", {})``, so the caller also attempts
            a restart in that case.
        """
        try:
            from task_worker import get_task_worker
            from task_queue import get_task_queue

            worker = get_task_worker()
            queue = get_task_queue()

            # Task statistics: count in a single pass, so the counts stay
            # correct even if get_all_tasks() returns a one-shot iterator.
            pending_count = 0
            processing_count = 0
            for task in queue.get_all_tasks():
                state = task.get('status')
                if state == 'pending':
                    pending_count += 1
                elif state == 'processing':
                    processing_count += 1

            # Worker state
            is_running = worker.running
            alive_threads = self._count_alive_threads(worker)

            logger.info(f"状态检查 - 运行:{is_running} 活跃线程:{alive_threads} "
                        f"待处理:{pending_count} 处理中:{processing_count}")

            # Decide whether a fix is needed
            need_fix = False
            reason = ""
            if not is_running:
                need_fix = True
                reason = "TaskWorker 未运行"
            elif alive_threads == 0 and pending_count > 0:
                need_fix = True
                reason = f"有 {pending_count} 个待处理任务,但没有活跃线程"
            # TODO: tasks stuck in 'processing' (processing_count > 0) are not
            # detected yet. A check on how long they have gone without an
            # update could be added here.

            return need_fix, reason, {
                'running': is_running,
                'alive_threads': alive_threads,
                'pending_count': pending_count,
                'processing_count': processing_count,
            }

        except Exception as e:
            logger.exception(f"检查状态失败: {e}")
            return True, f"检查失败: {e}", {}

    def restart_worker(self):
        """Stop the TaskWorker (if running), start it again, and verify it is running.

        Returns:
            True if `worker.running` is truthy after `start()`. Returns False
            otherwise, including when any step raises (the exception is
            logged with its traceback).
        """
        logger.warning("正在重启 TaskWorker...")

        try:
            from task_worker import get_task_worker
            worker = get_task_worker()

            # Stop the existing worker, then pause for a fixed 2 s before restarting.
            if worker.running:
                logger.info("停止现有 TaskWorker...")
                worker.stop()
                time.sleep(2)

            # Start the worker, then pause 2 s before checking its state.
            logger.info("启动新的 TaskWorker...")
            worker.start()
            time.sleep(2)

            # Verify the restart
            if not worker.running:
                logger.error("❌ TaskWorker 重启后未运行")
                return False

            alive_threads = self._count_alive_threads(worker)
            logger.info(f"✅ TaskWorker 重启成功,活跃线程: {alive_threads}")
            return True

        except Exception as e:
            logger.exception(f"重启 TaskWorker 失败: {e}")
            return False

    def monitor_loop(self):
        """Check the worker every `check_interval` seconds until `stop()` is called.

        When a check reports a problem, restart_worker() is called. After
        `max_consecutive_failures` failed restarts in a row, a CRITICAL
        message is logged on every further failure. Any unexpected error in
        an iteration is logged and the loop continues after the usual wait.
        """
        logger.info(f"监控循环启动,检查间隔: {self.check_interval}秒")

        consecutive_failures = 0
        max_consecutive_failures = 3

        while self.running:
            try:
                need_fix, reason, status = self.check_worker_status()

                if need_fix:
                    logger.warning(f"⚠️ 检测到问题: {reason}")
                    logger.info(f"状态详情: {status}")

                    # Attempt an automatic fix
                    if self.restart_worker():
                        logger.info("✅ 自动修复成功")
                        consecutive_failures = 0
                    else:
                        consecutive_failures += 1
                        logger.error(f"❌ 自动修复失败 (连续失败 {consecutive_failures} 次)")

                        if consecutive_failures >= max_consecutive_failures:
                            logger.critical(f"连续修复失败 {consecutive_failures} 次,请人工介入!")
                            # Alert notifications could be sent from here.
                else:
                    consecutive_failures = 0

            except Exception as e:
                logger.exception(f"监控循环错误: {e}")

            # Wait for the next check. wait() returns True as soon as stop()
            # sets the event, so shutdown does not have to wait out the interval.
            if self._stop_event.wait(self.check_interval):
                break

        logger.info("监控循环已停止")

    def start(self):
        """Start monitor_loop in a daemon thread. If already running, log a warning and return."""
        if self.running:
            logger.warning("监控已在运行")
            return

        self._stop_event.clear()
        self.running = True
        self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True)
        self.monitor_thread.start()
        logger.info("TaskWorker 监控器已启动")

    def stop(self):
        """Signal the loop to stop and wait up to 5 seconds for its thread to exit."""
        self.running = False
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join(timeout=5)
        logger.info("TaskWorker 监控器已停止")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def signal_handler(signum, frame):
    """Handle a termination signal: stop the monitor if one exists, then exit with status 0.

    Args:
        signum: the received signal number (only logged).
        frame: the interrupted stack frame (unused).
    """
    logger.info(f"收到信号 {signum},正在停止...")
    # `monitor` is a module-level global assigned only in the __main__ block.
    # A signal can arrive before it is assigned, or this module may have been
    # imported rather than run. Look it up defensively instead of raising NameError.
    current_monitor = globals().get('monitor')
    if current_monitor is not None:
        current_monitor.stop()
    sys.exit(0)
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == '__main__':
    # Create the log directory (idempotent).
    os.makedirs('logs', exist_ok=True)

    # Create the monitor BEFORE registering the signal handlers: signal_handler
    # reads the module-level `monitor`. A signal arriving before this assignment
    # would otherwise find no monitor to stop.
    monitor = TaskWorkerMonitor(check_interval=60)  # check every 60 seconds

    # Register signal handlers
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    print("=" * 60)
    print("TaskWorker 自动监控守护进程")
    print("=" * 60)
    print(f"检查间隔: {monitor.check_interval} 秒")
    print("按 Ctrl+C 停止")
    print("=" * 60)

    # Start monitoring (runs in a daemon thread)
    monitor.start()

    # Keep the main thread alive so the daemon monitor thread keeps running.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # Fallback only: while the SIGINT handler above is installed, Ctrl+C
        # goes through signal_handler, which exits via sys.exit(0).
        logger.info("用户中断")
        monitor.stop()
|