# -*- coding: utf-8 -*- """ Gunicorn 配置文件 """ import multiprocessing import os # 服务器绑定地址 bind = "0.0.0.0:8030" # 工作进程数(建议:CPU核心数 * 2 + 1) workers = multiprocessing.cpu_count() * 2 + 1 # 工作模式(gevent 适合 I/O 密集型应用,如爬虫) # 需要安装: pip install gevent # worker_class = 'gevent' # 或使用线程模式(适合任务队列) worker_class = 'gthread' threads = 2 # 最大并发请求数 worker_connections = 1000 # 工作进程超时时间(秒) timeout = 300 # 优雅重启超时时间 graceful_timeout = 30 # Keep-alive 时间 keepalive = 5 # 守护进程模式(后台运行) # 注意:调试时可以设置为False查看详细日志 daemon = False # 进程 PID 文件 pidfile = 'gunicorn.pid' # 日志配置 accesslog = 'logs/gunicorn_access.log' errorlog = 'logs/gunicorn_error.log' loglevel = 'info' # 访问日志格式 access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s' # 进程名称 proc_name = 'baijiahao_scraper' # 最大请求数(防止内存泄漏) max_requests = 1000 max_requests_jitter = 50 # 预加载应用(节省内存) # 注意:由于TaskWorker需要在worker进程中启动,设置为False preload_app = False # 环境变量 raw_env = [ 'FLASK_ENV=production', ] # 工作进程启动时的回调 def on_starting(server): """服务器启动时""" import os print("=" * 50) print("Gunicorn 服务启动中...") print(f"绑定地址: {bind}") print(f"工作进程数: {workers}") print(f"工作模式: {worker_class}") # 清理旧的TaskWorker锁文件 lock_file = 'data/taskworker.lock' if os.path.exists(lock_file): try: os.remove(lock_file) print("✓ 已清理旧的TaskWorker锁文件") except: pass print("=" * 50) def when_ready(server): """服务器就绪时""" print("✓ 服务器已就绪,可以接受请求") def post_worker_init(worker): """worker进程初始化后的钩子 - 只在第一个worker中启动TaskWorker""" import os import sys import logging import time import fcntl # 用于文件锁 # 设置日志,直接输出到gunicorn error log logger = logging.getLogger('gunicorn.error') # 创建必要的目录 os.makedirs('exports', exist_ok=True) os.makedirs('data', exist_ok=True) os.makedirs('data/results', exist_ok=True) os.makedirs('logs', exist_ok=True) # 使用文件锁确保只有一个worker启动TaskWorker lock_file_path = 'data/taskworker.lock' lock_file = None try: # 打开锁文件(不存在则创建) lock_file = open(lock_file_path, 'w') # 尝试获取排他锁(非阻塞) try: fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) # 成功获得锁,启动TaskWorker logger.info(f"[Worker {worker.pid}] 获得锁,准备启动TaskWorker...") lock_file.write(str(worker.pid)) lock_file.flush() try: from task_worker import start_task_worker, get_task_worker start_task_worker() # 验证启动状态 time.sleep(1) task_worker = get_task_worker() if task_worker.running: logger.info(f"[Worker {worker.pid}] ✅ TaskWorker已成功启动(主 worker)") logger.info(f"[Worker {worker.pid}] 并发数: {task_worker.current_workers}/{task_worker.max_workers}") else: logger.error(f"[Worker {worker.pid}] ⚠️ TaskWorker启动后未运行") except Exception as e: logger.error(f"[Worker {worker.pid}] TaskWorker启动失败: {e}") import traceback logger.error(traceback.format_exc()) # 释放锁 fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) lock_file.close() except IOError: # 锁已被其他进程持有 logger.info(f"[Worker {worker.pid}] 跳过TaskWorker启动(其他worker已启动)") lock_file.close() except Exception as e: logger.error(f"[Worker {worker.pid}] TaskWorker启动异常: {e}") import traceback logger.error(traceback.format_exc()) if lock_file: lock_file.close() def on_exit(server): """服务器退出时""" import os # 清理TaskWorker锁文件 lock_file = 'data/taskworker.lock' if os.path.exists(lock_file): try: os.remove(lock_file) except: pass print("✓ Gunicorn 服务已停止")