Files
ai_baijiahao/check_taskworker.py

223 lines
6.6 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
TaskWorker 状态检查和修复工具
用于诊断和解决任务卡在等待中的问题
"""
import os
import sys
import logging
import psutil
import time
# Module-wide logging: INFO and above, rendered as "<time> [<LEVEL>] <message>".
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def check_taskworker_lock(lock_file='data/taskworker.lock'):
    """Check whether the TaskWorker lock file names a live process.

    Args:
        lock_file: Path of the lock file, expected to contain a single PID.
            Defaults to the path this tool has always used.

    Returns:
        ``(True, pid)`` if the file holds the PID of an existing process.
        ``(False, None)`` if the file is missing or unreadable, holds a
        non-numeric or non-positive PID, or the process no longer exists.
        The first element therefore reports *process liveness*, not whether
        the file exists.
    """
    if not os.path.exists(lock_file):
        logger.info("未发现锁文件")
        return False, None

    try:
        with open(lock_file, 'r', encoding='utf-8') as f:
            pid = f.read().strip()
    except (OSError, UnicodeDecodeError) as e:
        logger.error(f"读取锁文件失败: {e}")
        return False, None
    logger.info(f"发现锁文件记录的PID: {pid}")

    try:
        pid_int = int(pid)
    except ValueError:
        pid_int = 0
    if pid_int <= 0:
        # Non-numeric, zero or negative content can never be a real worker
        # PID. psutil may report PID 0 as "existing", a false positive.
        logger.error(f"锁文件内容无效: {pid}")
        return False, None

    try:
        if not psutil.pid_exists(pid_int):
            logger.warning(f"进程 {pid} 不存在,锁文件已失效")
            return False, None
        proc = psutil.Process(pid_int)
        logger.info(f"进程 {pid} 存在: {proc.name()} - {proc.status()}")
        return True, pid_int
    except psutil.NoSuchProcess:
        # The process exited between pid_exists() and the detail queries.
        # ZombieProcess subclasses NoSuchProcess and is also treated as gone.
        logger.warning(f"进程 {pid} 不存在,锁文件已失效")
        return False, None
    except psutil.AccessDenied:
        # The process exists but we may not inspect it (e.g. another user).
        # It is still alive and still holds the lock.
        logger.info(f"进程 {pid} 存在 (无权限读取进程详情)")
        return True, pid_int
    except Exception as e:
        logger.error(f"读取锁文件失败: {e}")
        return False, None
def check_pending_tasks():
    """Count queued tasks that are waiting and that are being processed.

    Loads all tasks via ``task_queue.get_task_queue().get_all_tasks()``.
    Logs how many have status ``'pending'`` and ``'processing'``, plus a
    preview of up to five pending tasks.

    Returns:
        ``(pending_count, processing_count)``. Returns ``(0, 0)`` if loading
        or classifying the tasks fails; the error is logged.
    """
    try:
        from task_queue import get_task_queue
        queue = get_task_queue()
        tasks = queue.get_all_tasks()
        pending_tasks = [t for t in tasks if t.get('status') == 'pending']
        processing_tasks = [t for t in tasks if t.get('status') == 'processing']
    except Exception as e:
        logger.error(f"检查任务失败: {e}")
        return 0, 0

    logger.info(f"待处理任务: {len(pending_tasks)}")
    logger.info(f"处理中任务: {len(processing_tasks)}")
    if pending_tasks:
        logger.info("待处理任务列表:")
        for task in pending_tasks[:5]:  # preview only the first five
            # Defensive lookups: a missing task_id or a None url must not
            # abort the report and discard the counts computed above.
            task_id = task.get('task_id', 'N/A')
            url = str(task.get('url') or 'N/A')
            logger.info(f" - {task_id}: {url[:50]}")
    return len(pending_tasks), len(processing_tasks)
def check_worker_threads():
    """Log the TaskWorker's state and count its live worker threads.

    Returns:
        ``(worker.running, live_thread_count)``. Returns ``(False, 0)`` if the
        worker cannot be obtained or inspected; the traceback is logged.

    NOTE(review): ``get_task_worker()`` is resolved inside *this* process.
    If it returns a per-process singleton, the state reported here belongs to
    this diagnostic script, not to a separately running service. Confirm
    against task_worker.
    """
    try:
        from task_worker import get_task_worker

        tw = get_task_worker()
        logger.info(f"TaskWorker 运行状态: {tw.running}")
        logger.info(f"当前并发数: {tw.current_workers}/{tw.max_workers}")
        logger.info(f"工作线程数: {len(tw.worker_threads)}")
        logger.info(f"正在处理的任务: {len(tw.processing_tasks)}")

        # Count only slots that hold a thread object which is still running.
        live = len([th for th in tw.worker_threads if th and th.is_alive()])
        logger.info(f"活跃线程数: {live}")
        return tw.running, live
    except Exception as exc:
        logger.error(f"检查 TaskWorker 失败: {exc}")
        import traceback
        logger.error(traceback.format_exc())
        return False, 0
def restart_taskworker():
    """Stop the TaskWorker if it is running, start it again, and verify it.

    Returns:
        True if, after the restart, ``check_worker_threads()`` reports the
        worker as running with at least one live thread. False otherwise,
        or if any step raised (the traceback is logged).

    NOTE(review): the worker comes from ``get_task_worker()`` in this
    script's own process. Presumably it does not outlive the script. Confirm
    whether this actually restarts the service's worker.
    """
    logger.info("正在重启 TaskWorker...")
    try:
        from task_worker import get_task_worker

        tw = get_task_worker()

        if tw.running:
            logger.info("停止现有 TaskWorker...")
            tw.stop()
            time.sleep(2)  # presumably lets worker threads wind down

        logger.info("启动新的 TaskWorker...")
        tw.start()
        time.sleep(1)  # presumably lets new threads start before probing

        running, alive = check_worker_threads()
        if not (running and alive > 0):
            logger.error("❌ TaskWorker 重启失败")
            return False
        logger.info("✅ TaskWorker 重启成功")
        return True
    except Exception as exc:
        logger.error(f"重启 TaskWorker 失败: {exc}")
        import traceback
        logger.error(traceback.format_exc())
        return False
def clean_stale_lock(lock_file='data/taskworker.lock', force=False):
    """Delete the TaskWorker lock file unless a running process still owns it.

    A lock whose PID is unreadable, non-positive, or no longer running is
    stale and is removed. A lock held by a live process is left in place;
    deleting it would defeat the single-instance guard.

    Args:
        lock_file: Path of the lock file, expected to contain a single PID.
        force: If True, remove the file without checking its owner. This is
            the old, unconditional behavior.

    Returns:
        True if no lock file remains (it was absent or has been removed).
        False if it is held by a live process and was kept, or if removal
        failed (logged).

    Note: a recycled PID that now belongs to an unrelated process also
    counts as "live". This errs on the side of keeping the lock.
    """
    if not force:
        try:
            with open(lock_file, 'r', encoding='utf-8') as f:
                pid = int(f.read().strip())
        except FileNotFoundError:
            return True  # nothing to clean
        except (OSError, ValueError):
            pid = None  # unreadable or garbage content: the lock is unusable
        if pid is not None and pid > 0 and psutil.pid_exists(pid):
            logger.warning(f"锁文件属于运行中的进程 {pid},未删除")
            return False

    try:
        os.remove(lock_file)
    except FileNotFoundError:
        return True  # removed concurrently, or never existed
    except Exception as e:
        logger.error(f"清理锁文件失败: {e}")
        return False
    logger.info("✅ 已清理失效的锁文件")
    return True
def main():
    """Diagnose the TaskWorker and, with ``--fix``/``--auto-fix``, repair it.

    Checks the lock file, the task queue and the worker threads, then prints
    a diagnosis. If problems are found and a fix flag is present in
    ``sys.argv``, it cleans a stale lock and restarts the worker.
    """
    lock_file = 'data/taskworker.lock'

    print("=" * 60)
    print("TaskWorker 状态检查工具")
    print("=" * 60)

    # 1. Lock file
    print("\n[1] 检查锁文件...")
    lock_alive, _ = check_taskworker_lock()
    # check_taskworker_lock() returns (False, None) whenever the holder is not
    # alive, so a stale lock is "file present, holder not alive".
    stale_lock = os.path.exists(lock_file) and not lock_alive

    # 2. Pending tasks
    print("\n[2] 检查任务队列...")
    pending_count, _ = check_pending_tasks()

    # 3. Worker threads
    print("\n[3] 检查 TaskWorker 状态...")
    try:
        is_running, alive_threads = check_worker_threads()
    except Exception:
        # check_worker_threads() already catches Exception. This guard only
        # covers unexpected failures and no longer swallows Ctrl-C/SystemExit.
        is_running, alive_threads = False, 0

    # 4. Diagnosis
    print("\n[4] 诊断结果:")
    print("-" * 60)
    need_fix = False
    if pending_count > 0 and alive_threads == 0:
        print("❌ 问题: 有待处理任务,但没有活跃的工作线程")
        need_fix = True
    if stale_lock:
        print("⚠️ 警告: 锁文件存在但进程不存在(僵尸锁)")
        need_fix = True
    if not is_running:
        print("❌ 问题: TaskWorker 未运行")
        need_fix = True
    if not need_fix:
        print("✅ TaskWorker 运行正常")
        return

    # 5. Repair
    print("\n[5] 开始修复...")
    print("-" * 60)
    if '--fix' in sys.argv or '--auto-fix' in sys.argv:
        clean_stale_lock()
        if restart_taskworker():
            print("\n✅ 修复完成!")
            print("\n重新检查状态...")
            time.sleep(2)
            check_worker_threads()
            check_pending_tasks()
        else:
            print("\n❌ 修复失败,请手动重启服务")
    else:
        print("\n提示: 使用 --fix 参数自动修复问题")
        print("示例: python check_taskworker.py --fix")
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n用户中断")
        sys.exit(130)  # conventional exit status for an interrupted (SIGINT) run
    except Exception as e:
        logger.error(f"执行失败: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)  # report failure to shell/cron callers instead of exiting 0