Files
ai_baijiahao/check_taskworker.py

223 lines
6.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
TaskWorker 状态检查和修复工具
用于诊断和解决任务卡在等待中的问题
"""
import os
import sys
import logging
import psutil
import time
# Configure logging once for the whole script: INFO level and a
# "timestamp [LEVEL] message" record layout.
logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO)

# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
def check_taskworker_lock():
    """Inspect the TaskWorker lock file and report whether its owner is alive.

    Reads the PID stored in ``data/taskworker.lock`` and asks psutil whether
    a process with that PID exists.

    Returns:
        tuple: ``(True, pid)`` when the lock file names an existing process;
        ``(False, None)`` when the file is missing, cannot be read, does not
        contain an integer, or names a process that no longer exists.
    """
    lock_file = 'data/taskworker.lock'
    if not os.path.exists(lock_file):
        logger.info("未发现锁文件")
        return False, None

    # Step 1: read the file. Only I/O problems are reported as read failures.
    try:
        with open(lock_file, 'r') as f:
            pid = f.read().strip()
    except Exception as e:
        logger.error(f"读取锁文件失败: {e}")
        return False, None

    logger.info(f"发现锁文件记录的PID: {pid}")

    # Step 2: the content must be an integer PID.
    try:
        pid_int = int(pid)
    except ValueError:
        logger.error(f"锁文件内容无效: {pid}")
        return False, None

    # Step 3: look the process up. psutil errors are handled separately so
    # that they are not mislabelled as a lock-file read failure.
    try:
        if not psutil.pid_exists(pid_int):
            logger.warning(f"进程 {pid} 不存在,锁文件已失效")
            return False, None
        proc = psutil.Process(pid_int)
        logger.info(f"进程 {pid} 存在: {proc.name()} - {proc.status()}")
        return True, pid_int
    except psutil.NoSuchProcess:
        # The process exited between pid_exists() and the inspection calls,
        # so the lock is stale.
        logger.warning(f"进程 {pid} 不存在,锁文件已失效")
        return False, None
    except psutil.AccessDenied:
        # The process exists but its details are not readable by this user;
        # the lock is still held, so report it as such.
        logger.info(f"进程 {pid} 存在,但无权限读取其详细信息")
        return True, pid_int
    except Exception as e:
        # Any other unexpected failure keeps the original outcome.
        logger.error(f"读取锁文件失败: {e}")
        return False, None
def check_pending_tasks():
    """Count the tasks whose status is ``pending`` or ``processing``.

    Fetches all tasks from ``task_queue.get_task_queue()``, logs both counts
    and a preview of at most five pending tasks.

    Returns:
        tuple: ``(pending_count, processing_count)``, or ``(0, 0)`` when the
        queue cannot be loaded or read.
    """
    try:
        from task_queue import get_task_queue
        queue = get_task_queue()
        tasks = queue.get_all_tasks()
        pending_tasks = [t for t in tasks if t.get('status') == 'pending']
        processing_tasks = [t for t in tasks if t.get('status') == 'processing']
        logger.info(f"待处理任务: {len(pending_tasks)}")
        logger.info(f"处理中任务: {len(processing_tasks)}")
        if pending_tasks:
            logger.info("待处理任务列表:")
            for task in pending_tasks[:5]:  # preview only the first five
                # The preview is purely informational. Tolerate a record with
                # no task_id or with url=None so that a display problem cannot
                # raise and make the function report (0, 0).
                task_id = task.get('task_id', 'N/A')
                url = task.get('url', 'N/A')
                if url is None:
                    url = 'N/A'
                logger.info(f" - {task_id}: {str(url)[:50]}")
        return len(pending_tasks), len(processing_tasks)
    except Exception as e:
        logger.error(f"检查任务失败: {e}")
        return 0, 0
def check_worker_threads():
    """Log the TaskWorker's state and count its live worker threads.

    The worker is obtained from ``task_worker.get_task_worker()``.

    Returns:
        tuple: ``(worker.running, alive_thread_count)``, or ``(False, 0)``
        when the worker cannot be obtained or inspected (the traceback is
        logged in that case).
    """
    try:
        from task_worker import get_task_worker
        task_worker = get_task_worker()
        logger.info(f"TaskWorker 运行状态: {task_worker.running}")
        logger.info(f"当前并发数: {task_worker.current_workers}/{task_worker.max_workers}")
        logger.info(f"工作线程数: {len(task_worker.worker_threads)}")
        logger.info(f"正在处理的任务: {len(task_worker.processing_tasks)}")

        # Count the thread slots that are filled and still running.
        alive_threads = 0
        for thread in task_worker.worker_threads:
            if thread and thread.is_alive():
                alive_threads += 1
        logger.info(f"活跃线程数: {alive_threads}")
        return task_worker.running, alive_threads
    except Exception as exc:
        logger.error(f"检查 TaskWorker 失败: {exc}")
        from traceback import format_exc
        logger.error(format_exc())
        return False, 0
def restart_taskworker():
    """Stop the TaskWorker if it is running, start it again and verify it.

    After starting, the state is re-read through ``check_worker_threads()``;
    the restart counts as successful only when the worker reports running
    and at least one thread is alive.

    Returns:
        bool: ``True`` on a verified restart, ``False`` when verification
        fails or any step raises (the traceback is logged).
    """
    logger.info("正在重启 TaskWorker...")
    try:
        from task_worker import get_task_worker
        task_worker = get_task_worker()

        # Shut the current worker down and pause before starting again.
        if task_worker.running:
            logger.info("停止现有 TaskWorker...")
            task_worker.stop()
            time.sleep(2)

        logger.info("启动新的 TaskWorker...")
        task_worker.start()
        time.sleep(1)

        # Verify the outcome instead of trusting start().
        running, alive_threads = check_worker_threads()
        if not running or alive_threads <= 0:
            logger.error("❌ TaskWorker 重启失败")
            return False
        logger.info("✅ TaskWorker 重启成功")
        return True
    except Exception as exc:
        logger.error(f"重启 TaskWorker 失败: {exc}")
        from traceback import format_exc
        logger.error(format_exc())
        return False
def clean_stale_lock():
    """Remove the TaskWorker lock file ``data/taskworker.lock``.

    This function does not check whether the lock is actually stale; the
    caller is responsible for deciding that before calling it.

    Returns:
        bool: ``True`` when the lock file is gone afterwards (removed here or
        already absent), ``False`` when removal failed.
    """
    lock_file = 'data/taskworker.lock'
    try:
        # Remove directly instead of checking existence first: a separate
        # exists() check can race with another process deleting the file.
        os.remove(lock_file)
    except FileNotFoundError:
        # Nothing to clean; the desired end state already holds.
        return True
    except Exception as e:
        logger.error(f"清理锁文件失败: {e}")
        return False
    else:
        logger.info("✅ 已清理失效的锁文件")
        return True
def main():
    """Run the TaskWorker health check and optionally repair what it finds.

    Steps: inspect the lock file, count pending tasks, query the worker's
    thread state and print a diagnosis. Only when ``--fix`` or ``--auto-fix``
    is present in ``sys.argv`` does it remove a stale lock file and restart
    the worker; otherwise it prints a usage hint.
    """
    print("=" * 60)
    print("TaskWorker 状态检查工具")
    print("=" * 60)

    # 1. Lock file
    print("\n[1] 检查锁文件...")
    lock_exists, lock_pid = check_taskworker_lock()
    # check_taskworker_lock() returns (False, None) both when there is no
    # lock file and when the file does not name a live process, so a stale
    # lock has to be recognised by looking at the file itself.
    stale_lock = (not lock_exists) and os.path.exists('data/taskworker.lock')

    # 2. Task queue (only the pending count is needed for the diagnosis)
    print("\n[2] 检查任务队列...")
    pending_count, _ = check_pending_tasks()

    # 3. Worker threads
    print("\n[3] 检查 TaskWorker 状态...")
    try:
        is_running, alive_threads = check_worker_threads()
    except Exception:
        # Not a bare except: Ctrl-C and SystemExit must still propagate.
        is_running, alive_threads = False, 0

    # 4. Diagnosis
    print("\n[4] 诊断结果:")
    print("-" * 60)
    need_fix = False
    if pending_count > 0 and alive_threads == 0:
        print("❌ 问题: 有待处理任务,但没有活跃的工作线程")
        need_fix = True
    if stale_lock:
        print("⚠️ 警告: 锁文件存在但进程不存在(僵尸锁)")
        need_fix = True
    if not is_running:
        print("❌ 问题: TaskWorker 未运行")
        need_fix = True
    if not need_fix:
        print("✅ TaskWorker 运行正常")
        return

    # 5. Repair
    print("\n[5] 开始修复...")
    print("-" * 60)
    if '--fix' in sys.argv or '--auto-fix' in sys.argv:
        if lock_exists:
            # The lock belongs to a live process; deleting it would let a
            # second worker start next to the one that still holds it.
            print(f"⚠️ 锁文件由存活进程 {lock_pid} 持有,跳过清理")
        else:
            clean_stale_lock()

        if restart_taskworker():
            print("\n✅ 修复完成!")
            print("\n重新检查状态...")
            time.sleep(2)
            check_worker_threads()
            check_pending_tasks()
        else:
            print("\n❌ 修复失败,请手动重启服务")
    else:
        print("\n提示: 使用 --fix 参数自动修复问题")
        print("示例: python check_taskworker.py --fix")
if __name__ == '__main__':
    # Script entry point: run the check, report Ctrl-C quietly, and log any
    # other failure together with its traceback.
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n用户中断")
    except Exception as exc:
        logger.error(f"执行失败: {exc}")
        from traceback import print_exc
        print_exc()