feat: 添加守护进程模式和部署脚本

This commit is contained in:
2026-02-05 21:29:59 +08:00
parent 48644c662f
commit 79168dec7e
3 changed files with 562 additions and 12 deletions

View File

@@ -4,6 +4,7 @@
## 功能概述
- **守护进程模式**持续监控数据库自动处理新数据默认10秒轮询
- **批量处理模式**10张图片一个请求多请求并发执行
- **内容审核处理**:自动识别审核失败图片,标记状态并记录原因
- **RESTful API 服务**:提供标签衍生的 HTTP 接口
@@ -49,7 +50,8 @@ ai_tagging_images/
├── promt/
│ └── qwen_tag_derive_prompt.py
├── database_config.py # 数据库连接和 DAO
├── image_tag_derive.py # 离线批量处理脚本
├── image_tag_derive.py # 标签衍生主程序(支持守护模式)
├── start_tag_derive.sh # 部署管理脚本
├── logger.py # 日志模块
├── retry_handler.py # 重试机制
├── tag_derive_api.py # FastAPI 服务
@@ -81,8 +83,21 @@ export DB_HOST=localhost
export DB_PASSWORD=your-password
```
### 3. 运行离线脚本
### 3. 运行标签衍生服务
**守护模式(推荐):**
```bash
# 持续监控数据库,自动处理新数据
python image_tag_derive.py --daemon
# 指定轮询间隔默认10秒
python image_tag_derive.py --daemon --interval 10
# 并发配置
python image_tag_derive.py --daemon --batch-size 50 --concurrency 3
```
**单次执行模式:**
```bash
# 处理全部待处理数据
python image_tag_derive.py
@@ -107,6 +122,8 @@ python image_tag_derive.py --id 16495 16496 16497
**命令行参数:**
| 参数 | 说明 |
|------|------|
| `--daemon` | 守护模式:持续监控数据库 |
| `--interval` | 轮询间隔默认10秒 |
| `--limit` | 限制处理数量(测试用) |
| `--start-id` | 起始ID断点续传 |
| `--end-id` | 结束ID |
@@ -114,7 +131,30 @@ python image_tag_derive.py --id 16495 16496 16497
| `--concurrency` | 并发请求数 |
| `--id` | 指定处理的ID列表 |
### 4. 启动 API 服务
### 4. 部署管理脚本
```bash
# 启动服务
./start_tag_derive.sh start
# 停止服务
./start_tag_derive.sh stop
# 强制停止
./start_tag_derive.sh force-stop
# 重启服务
./start_tag_derive.sh restart
# 查看状态
./start_tag_derive.sh status
# 查看日志
./start_tag_derive.sh logs
./start_tag_derive.sh logs-follow
```
### 5. 启动 API 服务
```bash
python tag_derive_api.py

View File

@@ -446,6 +446,97 @@ def print_summary(results: List[Dict]):
logger.warning(f" [ID:{r.get('image_id')}] 失败: {r.get('error')}")
def run_once(batch_size=None, concurrency=None, start_id=None, end_id=None, ids=None, limit=None):
"""执行一次标签衍生任务"""
results = batch_derive_tags(
batch_size=batch_size,
concurrency=concurrency,
start_id=start_id,
end_id=end_id,
ids=ids,
limit=limit
)
if results:
print_summary(results)
# 保存结果
output_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "derive_results.json")
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2)
return results
def run_daemon(batch_size=None, concurrency=None, interval=10):
"""
守护模式:持续监控数据库,处理新数据
Args:
batch_size: 每批次处理数量
concurrency: 并发数
interval: 轮询间隔(秒)
"""
import time
import signal
running = True
def signal_handler(signum, frame):
nonlocal running
logger.info("\n收到停止信号,准备优雅退出...")
running = False
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
logger.info("=" * 60)
logger.info("千问视觉大模型 - 图片标签衍生服务")
logger.info(f"运行模式: 守护进程(持续监控)")
logger.info(f"轮询间隔: {interval}")
logger.info(f"批次大小: {batch_size or settings.tag_derive.batch_size}")
logger.info(f"并发数: {concurrency or settings.tag_derive.concurrency}")
logger.info("=" * 60)
round_count = 0
total_success = 0
total_failed = 0
while running:
round_count += 1
logger.info(f"\n[第 {round_count} 轮] 检查待处理数据...")
try:
results = batch_derive_tags(
batch_size=batch_size,
concurrency=concurrency
)
if results:
success = sum(1 for r in results if r.get('success'))
failed = len(results) - success
total_success += success
total_failed += failed
logger.info(f"[第 {round_count} 轮] 处理完成: 成功 {success}, 失败 {failed}")
else:
logger.info(f"[第 {round_count} 轮] 没有待处理的数据")
except Exception as e:
logger.error(f"[第 {round_count} 轮] 处理异常: {e}")
if running:
logger.info(f"等待 {interval} 秒后继续...")
# 分段sleep便于响应信号
for _ in range(interval):
if not running:
break
time.sleep(1)
logger.info("=" * 60)
logger.info("服务已停止")
logger.info(f"统计: 共运行 {round_count} 轮, 成功 {total_success} 条, 失败 {total_failed}")
logger.info("=" * 60)
def main():
import argparse
@@ -456,11 +547,23 @@ def main():
parser.add_argument('--concurrency', type=int, default=None, help='并发请求数同时发出的API请求数')
parser.add_argument('--id', type=int, nargs='+', default=None, help='指定ID只处理这些ID可指定多个')
parser.add_argument('--limit', type=int, default=None, help='限制处理的总数量(用于测试,如 --limit 10')
parser.add_argument('--daemon', action='store_true', help='守护模式:持续监控数据库,自动处理新数据')
parser.add_argument('--interval', type=int, default=10, help='守护模式轮询间隔默认10秒')
args = parser.parse_args()
batch_size = args.batch_size or settings.tag_derive.batch_size
concurrency = args.concurrency or settings.tag_derive.concurrency
# 守护模式
if args.daemon:
run_daemon(
batch_size=args.batch_size,
concurrency=args.concurrency,
interval=args.interval
)
return
# 单次执行模式
logger.info("=" * 60)
logger.info("千问视觉大模型 - 图片标签衍生生成器")
logger.info(f"模式: 每批 {batch_size} 张,并发 {concurrency} 个请求")
@@ -473,7 +576,7 @@ def main():
logger.info(f"ID范围: {id_range}")
logger.info("=" * 60)
results = batch_derive_tags(
run_once(
batch_size=args.batch_size,
concurrency=args.concurrency,
start_id=args.start_id,
@@ -481,14 +584,6 @@ def main():
ids=args.id,
limit=args.limit
)
if results:
print_summary(results)
output_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "derive_results.json")
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2)
logger.info(f"结果已保存到: {output_file}")
if __name__ == "__main__":

415
start_tag_derive.sh Normal file
View File

@@ -0,0 +1,415 @@
#!/bin/bash
# ============================================
# 图片标签衍生系统管理脚本
# 支持进程数量控制
# ============================================
# 配置区
BASE_DIR="/home/work/ai_tagging_images"
VENV_PYTHON="/home/work/keyword_crawl/venv/bin/python"
# image_tag_derive 配置
DERIVE_SCRIPT="${BASE_DIR}/image_tag_derive.py"
DERIVE_PID_FILE="${BASE_DIR}/image_tag_derive.pid"
DERIVE_LOG_FILE="${BASE_DIR}/image_tag_derive.log"
DERIVE_MAX_PROCESSES=1 # 限制最多1个进程
# 颜色定义
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# 获取脚本正在运行的进程数量
get_process_count() {
local script_name=$(basename "$1")
pgrep -f "$script_name" 2>/dev/null | wc -l
}
# 获取所有相关进程的PID
get_all_pids() {
local script_name=$(basename "$1")
pgrep -f "$script_name" 2>/dev/null | tr '\n' ' '
}
# 循环间隔(秒)
LOOP_INTERVAL=10
# 启动服务(带进程数量控制)
start_single() {
local script=$1
local pid_file=$2
local log_file=$3
local name=$4
local max_processes=$5
shift 5
local extra_args="$@"
local script_name=$(basename "$script")
local current_count=$(get_process_count "$script")
# 检查是否超过最大进程数
if [ $current_count -ge $max_processes ]; then
echo -e "${YELLOW}${name} 已达到最大进程数 (${current_count}/${max_processes}),跳过启动${NC}"
local first_pid=$(pgrep -f "$script_name" | head -n1)
if [ -n "$first_pid" ]; then
echo "$first_pid" > "$pid_file"
fi
return 0
fi
# 检查PID文件记录的进程
if [ -f "$pid_file" ]; then
local pid=$(cat "$pid_file" 2>/dev/null)
if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
echo -e "${YELLOW}${name} 已在运行PID文件记录PID: ${pid}${NC}"
return 0
fi
fi
echo -e "${BLUE}正在启动 ${name}(守护模式)...${NC}"
if [ -n "$extra_args" ]; then
echo -e "${BLUE}额外参数: ${extra_args}${NC}"
fi
# 确保日志目录存在
mkdir -p "$(dirname "$log_file")"
# 备份旧日志
if [ -f "$log_file" ]; then
local backup_log="${log_file}.$(date +%Y%m%d_%H%M%S).bak"
cp "$log_file" "$backup_log"
echo -e "${BLUE}旧日志已备份到: ${backup_log}${NC}"
fi
# 启动守护进程(使用 --daemon 参数)
nohup "$VENV_PYTHON" "$script" --daemon --interval "$LOOP_INTERVAL" $extra_args >> "$log_file" 2>&1 &
local new_pid=$!
# 等待进程真正启动
sleep 2
# 验证进程是否启动成功
if kill -0 "$new_pid" 2>/dev/null; then
echo "$new_pid" > "$pid_file"
echo -e "${GREEN}${name} 已启动PID: ${new_pid}${NC}"
echo -e "${BLUE}日志文件: ${log_file}${NC}"
echo -e "${BLUE}轮询间隔: ${LOOP_INTERVAL}${NC}"
return 0
else
echo -e "${RED}${name} 启动失败,请检查日志${NC}"
tail -20 "$log_file"
rm -f "$pid_file"
return 1
fi
}
# 停止服务的所有实例
stop_single_all() {
local script=$1
local name=$2
local pid_file=$3
local script_name=$(basename "$script")
local pids=$(get_all_pids "$script")
local count=$(get_process_count "$script")
if [ $count -eq 0 ]; then
echo -e "${YELLOW}${name} 没有运行中的进程${NC}"
rm -f "$pid_file"
return 0
fi
echo -e "${BLUE}正在停止 ${name} (${count}个进程)...${NC}"
echo -e "进程PIDs: ${pids}"
# 首先尝试优雅终止
for pid in $pids; do
if kill -0 "$pid" 2>/dev/null; then
echo -e " 发送SIGTERM到 PID $pid..."
kill "$pid"
fi
done
# 等待优雅退出
local wait_time=10
local remaining=$count
for i in $(seq 1 $wait_time); do
remaining=$(get_process_count "$script")
if [ $remaining -eq 0 ]; then
break
fi
echo -n "."
sleep 1
done
echo ""
# 检查是否还有进程残留
remaining=$(get_process_count "$script")
if [ $remaining -gt 0 ]; then
echo -e "${YELLOW}还有 ${remaining} 个进程未退出,强制终止...${NC}"
pids=$(get_all_pids "$script")
for pid in $pids; do
if kill -0 "$pid" 2>/dev/null; then
kill -9 "$pid" 2>/dev/null
fi
done
sleep 2
fi
# 验证所有进程都已停止
remaining=$(get_process_count "$script")
if [ $remaining -eq 0 ]; then
echo -e "${GREEN}${name} 所有进程已停止${NC}"
rm -f "$pid_file"
return 0
else
echo -e "${RED}警告:仍有 ${remaining} 个进程无法终止${NC}"
return 1
fi
}
# 启动服务
start() {
shift # 移除 'start' 参数
local extra_args="$@"
echo -e "${BLUE}========== 启动图片标签衍生系统 ==========${NC}"
echo -e "${YELLOW}进程限制最多启动1个实例${NC}"
echo ""
start_single "$DERIVE_SCRIPT" "$DERIVE_PID_FILE" "$DERIVE_LOG_FILE" "image_tag_derive" "$DERIVE_MAX_PROCESSES" $extra_args
echo -e "${BLUE}========================================${NC}"
}
# 停止服务
stop() {
echo -e "${BLUE}========== 停止图片标签衍生系统 ==========${NC}"
stop_single_all "$DERIVE_SCRIPT" "image_tag_derive" "$DERIVE_PID_FILE"
echo -e "${BLUE}========================================${NC}"
}
# 强制停止
force-stop() {
echo -e "${RED}========== 强制停止标签衍生进程 ==========${NC}"
# 停止守护进程
if [ -f "$DERIVE_PID_FILE" ]; then
local pid=$(cat "$DERIVE_PID_FILE" 2>/dev/null)
if [ -n "$pid" ]; then
kill -9 "$pid" 2>/dev/null
fi
fi
# 停止所有相关进程
pkill -9 -f "image_tag_derive.py" 2>/dev/null
pkill -9 -f "start_tag_derive.sh" 2>/dev/null
sleep 2
rm -f "$DERIVE_PID_FILE"
local remaining=$(pgrep -f "image_tag_derive" | wc -l)
if [ $remaining -eq 0 ]; then
echo -e "${GREEN}✅ 所有进程已强制停止${NC}"
else
echo -e "${RED}❌ 仍有 ${remaining} 个进程存活${NC}"
pgrep -f "image_tag_derive" | xargs ps -fp 2>/dev/null
fi
echo -e "${RED}==========================================${NC}"
}
# 重启服务
restart() {
shift # 移除 'restart' 参数
local extra_args="$@"
echo -e "${BLUE}========== 重启图片标签衍生系统 ==========${NC}"
stop
if [ $? -eq 0 ]; then
sleep 3
start start $extra_args
else
echo -e "${RED}停止服务失败,请使用 force-restart${NC}"
return 1
fi
echo -e "${BLUE}========================================${NC}"
}
# 强制重启
force-restart() {
shift # 移除 'force-restart' 参数
local extra_args="$@"
echo -e "${YELLOW}========== 强制重启图片标签衍生系统 ==========${NC}"
force-stop
sleep 3
start start $extra_args
echo -e "${YELLOW}============================================${NC}"
}
# 显示状态
status() {
echo -e "${BLUE}========== 图片标签衍生系统状态 ==========${NC}"
echo -e "${BLUE}系统时间: $(date)${NC}"
echo -e "${BLUE}工作目录: ${BASE_DIR}${NC}"
echo ""
local count=$(get_process_count "$DERIVE_SCRIPT")
echo -e "${YELLOW}📊 进程状态:${NC}"
echo -e " 进程数: ${count}"
if [ $count -gt 0 ]; then
local pids=$(get_all_pids "$DERIVE_SCRIPT")
echo -e " 进程PIDs: ${pids}"
# 显示CPU和内存使用
for pid in $pids; do
local cpu=$(ps -p $pid -o %cpu --no-headers 2>/dev/null | tr -d ' ')
local mem=$(ps -p $pid -o %mem --no-headers 2>/dev/null | tr -d ' ')
local runtime=$(ps -p $pid -o etime --no-headers 2>/dev/null | tr -d ' ')
echo -e " PID ${pid}: CPU ${cpu}%, 内存 ${mem}%, 运行时间 ${runtime}"
done
else
echo -e "${YELLOW}没有运行中的进程${NC}"
fi
echo ""
echo -e "${YELLOW}📁 PID文件状态${NC}"
if [ -f "$DERIVE_PID_FILE" ]; then
local pid=$(cat "$DERIVE_PID_FILE" 2>/dev/null)
if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
echo -e " ${GREEN}✓ image_tag_derive.pid: 有效 (PID: $pid)${NC}"
else
echo -e " ${RED}✗ image_tag_derive.pid: 无效或进程不存在${NC}"
fi
else
echo -e " ${YELLOW}○ image_tag_derive.pid: 不存在${NC}"
fi
echo ""
echo -e "${YELLOW}📝 最近日志:${NC}"
if [ -f "$DERIVE_LOG_FILE" ]; then
echo -e "${BLUE}--- image_tag_derive.log (最后10行) ---${NC}"
tail -10 "$DERIVE_LOG_FILE" 2>/dev/null
else
echo -e "${YELLOW}日志文件不存在${NC}"
fi
echo -e "${BLUE}========================================${NC}"
}
# 查看日志
logs() {
local lines=${1:-50}
echo -e "${BLUE}========== 查看日志 (最后 ${lines} 行) ==========${NC}"
if [ -f "$DERIVE_LOG_FILE" ]; then
tail -$lines "$DERIVE_LOG_FILE"
else
echo -e "${YELLOW}日志文件不存在${NC}"
fi
echo -e "${BLUE}============================================${NC}"
}
# 实时查看日志
logs-follow() {
echo -e "${BLUE}========== 实时查看日志 (Ctrl+C 退出) ==========${NC}"
tail -f "$DERIVE_LOG_FILE"
}
# 显示帮助
show_help() {
echo -e "${GREEN}图片标签衍生系统管理脚本${NC}"
echo ""
echo -e "${YELLOW}当前配置:${NC}"
echo -e " 工作目录: ${BASE_DIR}"
echo -e " 最大进程数: ${DERIVE_MAX_PROCESSES}"
echo ""
echo -e "${BLUE}用法: $0 {命令} [参数]${NC}"
echo ""
echo -e "${GREEN}服务管理:${NC}"
echo -e " ${YELLOW}start [args]${NC} 启动服务(循环模式,每${LOOP_INTERVAL}秒执行一次)"
echo -e " ${YELLOW}stop${NC} 停止服务"
echo -e " ${YELLOW}force-stop${NC} 强制停止所有进程"
echo -e " ${YELLOW}restart [args]${NC} 重启服务"
echo -e " ${YELLOW}force-restart [args]${NC} 强制重启"
echo ""
echo -e "${GREEN}状态查看:${NC}"
echo -e " ${YELLOW}status${NC} 显示进程状态"
echo -e " ${YELLOW}logs [N]${NC} 查看最后N行日志(默认50)"
echo -e " ${YELLOW}logs-follow${NC} 实时查看日志"
echo ""
echo -e "${GREEN}其他:${NC}"
echo -e " ${YELLOW}help${NC} 显示帮助"
echo ""
echo -e "${YELLOW}可用参数 (传递给 image_tag_derive.py)${NC}"
echo -e " --start-id N 起始ID断点续传"
echo -e " --end-id N 结束ID"
echo -e " --batch-size N 每批次图片数量"
echo -e " --concurrency N 并发请求数"
echo -e " --id N [N ...] 指定处理的ID"
echo -e " --limit N 限制处理总数(测试用)"
echo ""
echo -e "${GREEN}示例:${NC}"
echo -e " $0 start # 启动处理所有待处理数据"
echo -e " $0 start --limit 10 # 测试模式只处理10条"
echo -e " $0 start --start-id 1000 # 从ID 1000开始处理"
echo -e " $0 start --id 100 101 102 # 只处理指定ID"
echo -e " $0 start --concurrency 3 # 使用3个并发"
echo ""
echo -e "${RED}注意:${NC}"
echo -e " - 服务以循环模式运行,每${LOOP_INTERVAL}秒执行一次"
echo -e " - 脚本会限制每个服务最多启动1个实例"
}
# 主逻辑
case "$1" in
start)
start "$@"
;;
stop)
stop
;;
force-stop)
force-stop
;;
restart)
restart "$@"
;;
force-restart)
force-restart "$@"
;;
status)
status
;;
logs)
logs $2
;;
logs-follow)
logs-follow
;;
help|--help|-h)
show_help
;;
*)
echo -e "${RED}错误:未知命令 '$1'${NC}"
echo ""
show_help
exit 1
;;
esac
exit 0