4.6 KiB
4.6 KiB
TaskWorker 卡住问题解决方案
问题现象
线上部署时,所有任务都在"等待中"状态卡住,无法被处理。
根本原因
在使用 Gunicorn 部署时,TaskWorker 可能因为以下原因未能正常启动或中途崩溃:
- 多进程竞争:多个 worker 进程同时启动导致冲突
- 锁文件失效:进程异常退出后锁文件未清理
- 线程崩溃:工作线程因异常而停止
解决方案
方案1:使用诊断工具(推荐)
已创建专门的诊断和修复工具 check_taskworker.py
检查状态
python check_taskworker.py
自动修复
python check_taskworker.py --fix
方案2:手动重启服务
# 停止 Gunicorn
kill -TERM $(cat gunicorn.pid)
# 清理锁文件
rm -f data/taskworker.lock
# 重新启动
gunicorn -c gunicorn_config.py app:app
方案3:使用自动监控守护进程(生产环境推荐)
启动自动监控程序,会定期检查并自动修复:
# 后台运行
nohup python taskworker_monitor.py > logs/monitor.out 2>&1 &
# 或使用 systemd 管理(推荐)
sudo systemctl start baijiahao-monitor
预防措施
1. 优化的 Gunicorn 配置
已更新 gunicorn_config.py,使用文件锁(fcntl)替代简单的存在性检查,避免竞争条件。
2. 添加健康检查
在 app.py 中添加健康检查接口:
@app.route('/health/taskworker')
def health_taskworker():
"""TaskWorker 健康检查"""
try:
from task_worker import get_task_worker
worker = get_task_worker()
alive_threads = sum(1 for t in worker.worker_threads if t and t.is_alive())
return jsonify({
'status': 'healthy' if worker.running and alive_threads > 0 else 'unhealthy',
'running': worker.running,
'alive_threads': alive_threads,
'current_workers': worker.current_workers,
'processing_tasks': len(worker.processing_tasks)
})
except Exception as e:
return jsonify({'status': 'error', 'message': str(e)}), 500
3. 使用 Supervisor 管理(可选)
创建 supervisor.conf:
[program:baijiahao]
command=/path/to/venv/bin/gunicorn -c gunicorn_config.py app:app
directory=/path/to/ai_baijiahao
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/path/to/ai_baijiahao/logs/supervisor.log
[program:baijiahao-monitor]
command=/path/to/venv/bin/python taskworker_monitor.py
directory=/path/to/ai_baijiahao
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/path/to/ai_baijiahao/logs/monitor_supervisor.log
日常维护
查看日志
# TaskWorker 日志
tail -f logs/gunicorn_error.log | grep TaskWorker
# 监控日志
tail -f logs/taskworker_monitor.log
定期清理
# 清理旧的任务结果(保留最近30天)
find data/results -name "*.xlsx" -mtime +30 -delete
# 清理旧日志
find logs -name "*.log" -mtime +30 -delete
监控告警
可以结合监控系统(如 Prometheus + Grafana)监控以下指标:
- TaskWorker 运行状态:
/health/taskworker - 待处理任务数:通过 API 获取队列统计
- 处理中任务数
- 平均任务处理时间
常见问题
Q1: 重启后任务会丢失吗?
A: 不会。所有任务都存储在 SQLite 数据库中,重启后会自动继续处理。
Q2: 如何调整并发数?
A: 修改 task_worker.py 中的 TaskWorker 初始化参数:
worker = TaskWorker(min_workers=1, max_workers=3) # 调整这两个参数
Q3: 监控程序占用资源吗?
A: 极低。监控程序每60秒检查一次,几乎不占用 CPU 和内存。
升级说明
本次更新包含以下改进:
- ✅ 更健壮的文件锁机制:使用
fcntl替代简单的文件存在性检查 - ✅ 状态验证:启动后验证 TaskWorker 是否真正运行
- ✅ 诊断工具:
check_taskworker.py快速定位问题 - ✅ 自动监控:
taskworker_monitor.py自动检测和修复 - ✅ 详细日志:记录启动过程和异常信息
联系支持
如果问题仍然存在,请提供以下信息:
logs/gunicorn_error.log的最近日志python check_taskworker.py的输出- 数据库中待处理任务的数量和状态
# 快速诊断命令
echo "=== Gunicorn 进程 ==="
ps aux | grep gunicorn
echo "=== TaskWorker 锁文件 ==="
ls -lh data/taskworker.lock
echo "=== 任务统计 ==="
python check_taskworker.py
echo "=== 最近日志 ==="
tail -n 50 logs/gunicorn_error.log