Files
ai_baijiahao/taskworker_monitor.py

219 lines
7.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
TaskWorker 自动监控和恢复守护进程
用于生产环境中自动检测和修复任务卡住的问题
"""
import os
import sys
import time
import logging
import signal
import threading
from datetime import datetime
# Configure logging.
# The FileHandler below opens its file as soon as it is constructed, so the
# target directory has to exist before this statement runs at import time.
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        # Explicit UTF-8: the messages contain Chinese text and emoji, which
        # a platform-default encoding may not be able to represent.
        logging.FileHandler('logs/taskworker_monitor.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
class TaskWorkerMonitor:
    """Watchdog that periodically checks the TaskWorker and restarts it.

    A background daemon thread runs ``monitor_loop``: every
    ``check_interval`` seconds it calls ``check_worker_status`` and, when
    that reports a problem, ``restart_worker``.
    """

    def __init__(self, check_interval=60):
        """Create a monitor that is not yet running.

        Args:
            check_interval: Seconds to wait between two status checks;
                defaults to 60.
        """
        self.check_interval = check_interval
        self.running = False
        self.monitor_thread = None
        # Set by stop() so the wait between two checks ends immediately
        # instead of blocking for the whole interval.
        self._stop_event = threading.Event()

    def check_worker_status(self):
        """Inspect the TaskWorker and the task queue.

        Returns:
            A ``(need_fix, reason, status)`` tuple. ``need_fix`` is True when
            the worker is not running, when there are pending tasks but no
            live worker thread, or when the check itself raised. ``reason``
            is a human-readable explanation (empty when nothing is wrong).
            ``status`` is a dict with the keys ``running``,
            ``alive_threads``, ``pending_count`` and ``processing_count``;
            it is empty when the check raised.
        """
        try:
            # Imported here rather than at module level, so an import
            # failure is caught below and reported as a failed check.
            from task_worker import get_task_worker
            from task_queue import get_task_queue

            worker = get_task_worker()
            queue = get_task_queue()

            # Tally task states in a single pass over the task list.
            pending_count = 0
            processing_count = 0
            for task in queue.get_all_tasks():
                task_status = task.get('status')
                if task_status == 'pending':
                    pending_count += 1
                elif task_status == 'processing':
                    processing_count += 1

            # Worker state: the running flag plus the number of live threads.
            is_running = worker.running
            alive_threads = sum(
                1 for t in worker.worker_threads if t and t.is_alive()
            )

            logger.info(f"状态检查 - 运行:{is_running} 活跃线程:{alive_threads} "
                        f"待处理:{pending_count} 处理中:{processing_count}")

            # Decide whether a restart is required.
            need_fix = False
            reason = ""
            if not is_running:
                need_fix = True
                reason = "TaskWorker 未运行"
            elif alive_threads == 0 and pending_count > 0:
                need_fix = True
                reason = f"{pending_count} 个待处理任务,但没有活跃线程"
            # NOTE: tasks that stay in 'processing' for too long are not
            # detected yet; a staleness check would belong here.

            return need_fix, reason, {
                'running': is_running,
                'alive_threads': alive_threads,
                'pending_count': pending_count,
                'processing_count': processing_count,
            }
        except Exception as e:
            # logger.exception records the message together with the traceback.
            logger.exception(f"检查状态失败: {e}")
            return True, f"检查失败: {e}", {}

    def restart_worker(self):
        """Stop the TaskWorker if it is running, then start it again.

        Returns:
            True when ``worker.running`` is set after the restart, False when
            it is not or when any step raised.
        """
        logger.warning("正在重启 TaskWorker...")
        try:
            from task_worker import get_task_worker

            worker = get_task_worker()

            # Stop the current worker and give it a moment to wind down.
            if worker.running:
                logger.info("停止现有 TaskWorker...")
                worker.stop()
                time.sleep(2)

            # Start it again and give it a moment to come up.
            logger.info("启动新的 TaskWorker...")
            worker.start()
            time.sleep(2)

            # Verify the result of the restart.
            if not worker.running:
                logger.error("❌ TaskWorker 重启后未运行")
                return False

            alive_threads = sum(
                1 for t in worker.worker_threads if t and t.is_alive()
            )
            logger.info(f"✅ TaskWorker 重启成功,活跃线程: {alive_threads}")
            return True
        except Exception as e:
            logger.exception(f"重启 TaskWorker 失败: {e}")
            return False

    def monitor_loop(self):
        """Run status checks until ``running`` is cleared or ``stop()`` is called.

        After three consecutive failed restarts a CRITICAL record is logged;
        the loop keeps running regardless.
        """
        logger.info(f"监控循环启动,检查间隔: {self.check_interval}")
        consecutive_failures = 0
        max_consecutive_failures = 3

        while self.running:
            try:
                need_fix, reason, status = self.check_worker_status()
                if not need_fix:
                    consecutive_failures = 0
                else:
                    logger.warning(f"⚠️ 检测到问题: {reason}")
                    logger.info(f"状态详情: {status}")
                    # Try to repair by restarting the worker.
                    if self.restart_worker():
                        logger.info("✅ 自动修复成功")
                        consecutive_failures = 0
                    else:
                        consecutive_failures += 1
                        logger.error(f"❌ 自动修复失败 (连续失败 {consecutive_failures} 次)")
                        if consecutive_failures >= max_consecutive_failures:
                            logger.critical(f"连续修复失败 {consecutive_failures} 次,请人工介入!")
                            # An alert notification could be sent from here.
            except Exception as e:
                logger.exception(f"监控循环错误: {e}")

            # Wait for the next check. Unlike time.sleep, this returns as
            # soon as stop() sets the event, so shutdown is not delayed by
            # up to a full interval.
            if self._stop_event.wait(self.check_interval):
                break

        logger.info("监控循环已停止")

    def start(self):
        """Start the monitor loop in a daemon thread; no-op if already running."""
        if self.running:
            logger.warning("监控已在运行")
            return
        self._stop_event.clear()
        self.running = True
        self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True)
        self.monitor_thread.start()
        logger.info("TaskWorker 监控器已启动")

    def stop(self):
        """Ask the monitor loop to exit and wait up to 5 seconds for its thread."""
        self.running = False
        # Wake the loop if it is waiting between two checks.
        self._stop_event.set()
        if self.monitor_thread:
            self.monitor_thread.join(timeout=5)
        logger.info("TaskWorker 监控器已停止")
def signal_handler(signum, frame):
    """Signal handler: stop the monitor, if one exists, and exit with status 0.

    Args:
        signum: Number of the received signal; it is written to the log.
        frame: Stack frame passed in by the signal machinery; unused.

    Raises:
        SystemExit: Always, via ``sys.exit(0)``.
    """
    logger.info(f"收到信号 {signum},正在停止...")
    # `monitor` is bound only by the script entry point, so it may not exist
    # when a signal arrives; look it up defensively instead of risking a
    # NameError inside the handler.
    active_monitor = globals().get('monitor')
    if active_monitor:
        active_monitor.stop()
    sys.exit(0)
if __name__ == '__main__':
    # Ensure the log directory exists.
    os.makedirs('logs', exist_ok=True)

    # Create the monitor BEFORE installing the signal handlers:
    # signal_handler reads the module-level `monitor`, so the name must be
    # bound by the time a signal can be delivered.
    monitor = TaskWorkerMonitor(check_interval=60)  # check every 60 seconds

    # Register the signal handlers for Ctrl+C and termination requests.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    print("=" * 60)
    print("TaskWorker 自动监控守护进程")
    print("=" * 60)
    print(f"检查间隔: {monitor.check_interval}")
    print("按 Ctrl+C 停止")
    print("=" * 60)

    # Start monitoring in the background thread.
    monitor.start()

    # Keep the main thread alive; the monitor thread is a daemon thread and
    # would be terminated as soon as the main thread returned.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("用户中断")
        monitor.stop()