Files
ai_baijiahao/taskworker_monitor.py

219 lines
7.1 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
TaskWorker 自动监控和恢复守护进程
用于生产环境中自动检测和修复任务卡住的问题
"""
import os
import sys
import time
import logging
import signal
import threading
from datetime import datetime
# Logging setup: INFO and above go to both a log file and the console.
#
# logging.FileHandler opens its target file as soon as it is constructed, and
# this statement runs at import time, so the directory must exist *before*
# basicConfig() is called; otherwise a fresh checkout fails with
# FileNotFoundError.
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        # Explicit UTF-8: the log messages contain Chinese text and emoji,
        # which a locale-dependent default encoding may be unable to encode.
        logging.FileHandler('logs/taskworker_monitor.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
class TaskWorkerMonitor:
    """Watchdog that periodically checks the TaskWorker and restarts it when unhealthy.

    A background daemon thread runs :meth:`monitor_loop`. Each cycle calls
    :meth:`check_worker_status` and, when a problem is reported, calls
    :meth:`restart_worker`.
    """

    def __init__(self, check_interval=60):
        """Create a monitor; nothing runs until :meth:`start` is called.

        Args:
            check_interval: Seconds to wait between two checks (default 60).
        """
        self.check_interval = check_interval
        self.running = False
        self.monitor_thread = None
        # Set by stop() so the loop's wait between checks ends immediately
        # instead of sleeping out the rest of the interval.
        self._stop_event = threading.Event()

    def check_worker_status(self):
        """Inspect the TaskWorker and the task queue and decide whether to restart.

        Returns:
            A ``(need_fix, reason, status)`` tuple. ``need_fix`` is True when
            the worker reports it is not running, when there are pending
            tasks but no live worker thread, or when the check itself raised.
            ``reason`` is a human-readable explanation (empty when healthy).
            ``status`` is a dict with the keys ``running``, ``alive_threads``,
            ``pending_count`` and ``processing_count``; it is empty when the
            check raised.
        """
        try:
            # Imported inside the try block so that a failing import is caught
            # below and reported as a failed check.
            from task_worker import get_task_worker
            from task_queue import get_task_queue

            worker = get_task_worker()
            queue = get_task_queue()

            # Task statistics, counted without building intermediate lists.
            tasks = queue.get_all_tasks()
            pending_count = sum(1 for t in tasks if t.get('status') == 'pending')
            processing_count = sum(1 for t in tasks if t.get('status') == 'processing')

            # Worker state.
            is_running = worker.running
            alive_threads = sum(1 for t in worker.worker_threads if t and t.is_alive())

            logger.info(f"状态检查 - 运行:{is_running} 活跃线程:{alive_threads} "
                        f"待处理:{pending_count} 处理中:{processing_count}")

            need_fix = False
            reason = ""
            if not is_running:
                need_fix = True
                reason = "TaskWorker 未运行"
            elif alive_threads == 0 and pending_count > 0:
                need_fix = True
                reason = f"{pending_count} 个待处理任务,但没有活跃线程"
            # TODO: tasks that sit in 'processing' for too long are not
            # detected yet; processing_count is only reported, never acted on.

            return need_fix, reason, {
                'running': is_running,
                'alive_threads': alive_threads,
                'pending_count': pending_count,
                'processing_count': processing_count,
            }
        except Exception as e:
            # logger.exception records the message and the traceback together.
            logger.exception(f"检查状态失败: {e}")
            return True, f"检查失败: {e}", {}

    def restart_worker(self):
        """Stop the TaskWorker if it is running, start it, and verify the result.

        Returns:
            True when ``worker.running`` is truthy after the restart; False
            when it is not, or when any step raised.
        """
        logger.warning("正在重启 TaskWorker...")
        try:
            from task_worker import get_task_worker

            worker = get_task_worker()

            if worker.running:
                logger.info("停止现有 TaskWorker...")
                worker.stop()
                # Pause after stop(); presumably lets worker threads wind down.
                time.sleep(2)

            logger.info("启动新的 TaskWorker...")
            worker.start()
            # Pause before verifying so start() has had time to take effect.
            time.sleep(2)

            if worker.running:
                alive_threads = sum(1 for t in worker.worker_threads if t and t.is_alive())
                logger.info(f"✅ TaskWorker 重启成功,活跃线程: {alive_threads}")
                return True

            logger.error("❌ TaskWorker 重启后未运行")
            return False
        except Exception as e:
            logger.exception(f"重启 TaskWorker 失败: {e}")
            return False

    def monitor_loop(self):
        """Run check/repair cycles until :meth:`stop` is called.

        Counts consecutive failed repairs and logs a CRITICAL message once the
        count reaches three. An exception in one cycle is logged and does not
        end the loop.
        """
        logger.info(f"监控循环启动,检查间隔: {self.check_interval}")
        consecutive_failures = 0
        max_consecutive_failures = 3

        while self.running:
            try:
                need_fix, reason, status = self.check_worker_status()
                if need_fix:
                    logger.warning(f"⚠️ 检测到问题: {reason}")
                    logger.info(f"状态详情: {status}")
                    if self.restart_worker():
                        logger.info("✅ 自动修复成功")
                        consecutive_failures = 0
                    else:
                        consecutive_failures += 1
                        logger.error(f"❌ 自动修复失败 (连续失败 {consecutive_failures} 次)")
                        if consecutive_failures >= max_consecutive_failures:
                            # Hook point for sending an alert notification.
                            logger.critical(f"连续修复失败 {consecutive_failures} 次,请人工介入!")
                else:
                    consecutive_failures = 0
            except Exception as e:
                logger.exception(f"监控循环错误: {e}")

            # Interruptible wait: returns True as soon as stop() sets the
            # event, so shutdown does not have to wait for a full interval.
            if self._stop_event.wait(self.check_interval):
                break

        logger.info("监控循环已停止")

    def start(self):
        """Start the monitoring thread; does nothing if already running."""
        if self.running:
            logger.warning("监控已在运行")
            return

        # Clear a stop request left over from an earlier stop() call.
        self._stop_event.clear()
        self.running = True
        self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True)
        self.monitor_thread.start()
        logger.info("TaskWorker 监控器已启动")

    def stop(self):
        """Ask the monitoring thread to stop and wait up to 5 seconds for it."""
        self.running = False
        # Wake the loop if it is waiting between checks.
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join(timeout=5)
        logger.info("TaskWorker 监控器已停止")
def signal_handler(signum, frame):
    """Handle a termination signal: stop the monitor if one exists, then exit 0.

    Args:
        signum: Number of the received signal (logged).
        frame: Stack frame passed in by the signal machinery (unused).

    Raises:
        SystemExit: Always, with exit status 0.
    """
    logger.info(f"收到信号 {signum},正在停止...")
    # `monitor` is bound only by the script entry point, and only after the
    # handlers have been registered. Look it up defensively so that a signal
    # arriving before then, or a call from an importing module, does not
    # raise NameError.
    active_monitor = globals().get('monitor')
    if active_monitor:
        active_monitor.stop()
    sys.exit(0)
if __name__ == '__main__':
    # Script entry point: prepare the log directory, install the signal
    # handlers, start the monitor thread and keep the main thread alive.
    os.makedirs('logs', exist_ok=True)

    # Route both SIGINT and SIGTERM to the same shutdown handler.
    for _sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(_sig, signal_handler)

    # One check every 60 seconds. The name `monitor` must stay module-global
    # because signal_handler looks it up by that name.
    monitor = TaskWorkerMonitor(check_interval=60)

    _rule = "=" * 60
    _banner = (
        _rule,
        "TaskWorker 自动监控守护进程",
        _rule,
        f"检查间隔: {monitor.check_interval}",
        "按 Ctrl+C 停止",
        _rule,
    )
    for _line in _banner:
        print(_line)

    monitor.start()

    # The monitor runs in a daemon thread, so the main thread has to stay
    # alive. NOTE(review): with the SIGINT handler installed above, Ctrl+C is
    # normally handled by signal_handler; the KeyboardInterrupt branch below
    # looks like a fallback only.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("用户中断")
        monitor.stop()