From 79168dec7eee1f27905164f1c9ee0cd6e25a8f75 Mon Sep 17 00:00:00 2001
From: liangguodong
Date: Thu, 5 Feb 2026 21:29:59 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=AE=88=E6=8A=A4?=
 =?UTF-8?q?=E8=BF=9B=E7=A8=8B=E6=A8=A1=E5=BC=8F=E5=92=8C=E9=83=A8=E7=BD=B2?=
 =?UTF-8?q?=E8=84=9A=E6=9C=AC?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 README.md           |  46 ++++-
 image_tag_derive.py | 133 ++++++++++++-
 start_tag_derive.sh | 419 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 586 insertions(+), 12 deletions(-)
 create mode 100644 start_tag_derive.sh

diff --git a/README.md b/README.md
index 5f1089d..4222d6c 100644
--- a/README.md
+++ b/README.md
@@ -4,6 +4,7 @@
 
 ## 功能概述
 
+- **守护进程模式**:持续监控数据库,自动处理新数据(默认10秒轮询)
 - **批量处理模式**:10张图片一个请求,多请求并发执行
 - **内容审核处理**:自动识别审核失败图片,标记状态并记录原因
 - **RESTful API 服务**:提供标签衍生的 HTTP 接口
@@ -49,7 +50,8 @@ ai_tagging_images/
 ├── promt/
 │ └── qwen_tag_derive_prompt.py
 ├── database_config.py # 数据库连接和 DAO
-├── image_tag_derive.py # 离线批量处理脚本
+├── image_tag_derive.py # 标签衍生主程序(支持守护模式)
+├── start_tag_derive.sh # 部署管理脚本
 ├── logger.py # 日志模块
 ├── retry_handler.py # 重试机制
 ├── tag_derive_api.py # FastAPI 服务
@@ -81,8 +83,21 @@ export DB_HOST=localhost
 export DB_PASSWORD=your-password
 ```
 
-### 3. 运行离线脚本
+### 3. 运行标签衍生服务
 
+**守护模式(推荐):**
+```bash
+# 持续监控数据库,自动处理新数据
+python image_tag_derive.py --daemon
+
+# 指定轮询间隔(默认10秒)
+python image_tag_derive.py --daemon --interval 10
+
+# 并发配置
+python image_tag_derive.py --daemon --batch-size 50 --concurrency 3
+```
+
+**单次执行模式:**
 ```bash
 # 处理全部待处理数据
 python image_tag_derive.py
@@ -107,6 +122,8 @@ python image_tag_derive.py --id 16495 16496 16497
 **命令行参数:**
 | 参数 | 说明 |
 |------|------|
+| `--daemon` | 守护模式:持续监控数据库 |
+| `--interval` | 轮询间隔(秒),默认10秒 |
 | `--limit` | 限制处理数量(测试用) |
 | `--start-id` | 起始ID(断点续传) |
 | `--end-id` | 结束ID |
@@ -114,7 +131,30 @@ python image_tag_derive.py --id 16495 16496 16497
 | `--concurrency` | 并发请求数 |
 | `--id` | 指定处理的ID列表 |
 
-### 4. 启动 API 服务
+### 4. 部署管理脚本
+
+```bash
+# 启动服务
+./start_tag_derive.sh start
+
+# 停止服务
+./start_tag_derive.sh stop
+
+# 强制停止
+./start_tag_derive.sh force-stop
+
+# 重启服务
+./start_tag_derive.sh restart
+
+# 查看状态
+./start_tag_derive.sh status
+
+# 查看日志
+./start_tag_derive.sh logs
+./start_tag_derive.sh logs-follow
+```
+
+### 5. 启动 API 服务
 
 ```bash
 python tag_derive_api.py
diff --git a/image_tag_derive.py b/image_tag_derive.py
index a505aa3..7859e6a 100644
--- a/image_tag_derive.py
+++ b/image_tag_derive.py
@@ -446,6 +446,113 @@ def print_summary(results: List[Dict]):
             logger.warning(f" [ID:{r.get('image_id')}] 失败: {r.get('error')}")
 
 
+def run_once(batch_size=None, concurrency=None, start_id=None, end_id=None, ids=None, limit=None):
+    """Run one tag-derivation pass and persist its results.
+
+    All arguments are forwarded unchanged to ``batch_derive_tags``. When the
+    pass yields results, a summary is logged and the results are written to
+    ``derive_results.json`` next to this script.
+
+    Returns:
+        Whatever ``batch_derive_tags`` returned.
+    """
+    results = batch_derive_tags(
+        batch_size=batch_size,
+        concurrency=concurrency,
+        start_id=start_id,
+        end_id=end_id,
+        ids=ids,
+        limit=limit
+    )
+
+    if results:
+        print_summary(results)
+        # Persist the results next to this script.
+        output_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "derive_results.json")
+        with open(output_file, 'w', encoding='utf-8') as f:
+            json.dump(results, f, ensure_ascii=False, indent=2)
+        logger.info(f"结果已保存到: {output_file}")
+
+    return results
+
+
+def run_daemon(batch_size=None, concurrency=None, interval=10):
+    """Poll the database in a loop, processing pending data each round.
+
+    SIGTERM and SIGINT are handled by clearing a flag, so the loop exits at
+    its next check instead of being torn down mid-statement.
+
+    Args:
+        batch_size: Images per batch; None falls back to settings.
+        concurrency: Concurrent requests; None falls back to settings.
+        interval: Seconds to wait between rounds (clamped to at least 1).
+    """
+    import time
+    import signal
+
+    # range(interval) is empty for interval <= 0, which would turn the loop
+    # below into a busy loop hammering the database; enforce a 1s minimum.
+    interval = max(1, interval)
+
+    running = True
+
+    def signal_handler(signum, frame):
+        nonlocal running
+        logger.info("\n收到停止信号,准备优雅退出...")
+        running = False
+
+    signal.signal(signal.SIGTERM, signal_handler)
+    signal.signal(signal.SIGINT, signal_handler)
+
+    logger.info("=" * 60)
+    logger.info("千问视觉大模型 - 图片标签衍生服务")
+    logger.info(f"运行模式: 守护进程(持续监控)")
+    logger.info(f"轮询间隔: {interval} 秒")
+    logger.info(f"批次大小: {batch_size or settings.tag_derive.batch_size}")
+    logger.info(f"并发数: {concurrency or settings.tag_derive.concurrency}")
+    logger.info("=" * 60)
+
+    round_count = 0
+    total_success = 0
+    total_failed = 0
+
+    while running:
+        round_count += 1
+        logger.info(f"\n[第 {round_count} 轮] 检查待处理数据...")
+
+        try:
+            results = batch_derive_tags(
+                batch_size=batch_size,
+                concurrency=concurrency
+            )
+
+            if results:
+                success = sum(1 for r in results if r.get('success'))
+                failed = len(results) - success
+                total_success += success
+                total_failed += failed
+                logger.info(f"[第 {round_count} 轮] 处理完成: 成功 {success}, 失败 {failed}")
+            else:
+                logger.info(f"[第 {round_count} 轮] 没有待处理的数据")
+
+        except Exception as e:
+            # Top-level boundary: one bad round must not kill the daemon.
+            logger.error(f"[第 {round_count} 轮] 处理异常: {e}")
+
+        if running:
+            logger.info(f"等待 {interval} 秒后继续...")
+            # Sleep in 1s slices so a stop signal is honoured promptly.
+            for _ in range(interval):
+                if not running:
+                    break
+                time.sleep(1)
+
+    logger.info("=" * 60)
+    logger.info("服务已停止")
+    logger.info(f"统计: 共运行 {round_count} 轮, 成功 {total_success} 条, 失败 {total_failed} 条")
+    logger.info("=" * 60)
+
+
 def main():
     import argparse
 
@@ -456,11 +563,27 @@ def main():
     parser.add_argument('--concurrency', type=int, default=None, help='并发请求数(同时发出的API请求数)')
     parser.add_argument('--id', type=int, nargs='+', default=None, help='指定ID,只处理这些ID(可指定多个)')
     parser.add_argument('--limit', type=int, default=None, help='限制处理的总数量(用于测试,如 --limit 10)')
+    parser.add_argument('--daemon', action='store_true', help='守护模式:持续监控数据库,自动处理新数据')
+    parser.add_argument('--interval', type=int, default=10, help='守护模式轮询间隔(秒),默认10秒')
     args = parser.parse_args()
 
     batch_size = args.batch_size or settings.tag_derive.batch_size
     concurrency = args.concurrency or settings.tag_derive.concurrency
 
+    # Daemon mode
+    if args.daemon:
+        # run_daemon polls for all pending data; the selection filters only
+        # apply to a single run, so say so instead of dropping them silently.
+        if any((args.start_id, args.end_id, args.id, args.limit)):
+            logger.warning("守护模式下 --start-id/--end-id/--id/--limit 参数将被忽略")
+        run_daemon(
+            batch_size=args.batch_size,
+            concurrency=args.concurrency,
+            interval=args.interval
+        )
+        return
+
+    # Single-run mode
     logger.info("=" * 60)
     logger.info("千问视觉大模型 - 图片标签衍生生成器")
     logger.info(f"模式: 每批 {batch_size} 张,并发 {concurrency} 个请求")
@@ -473,7 +596,7 @@ def main():
         logger.info(f"ID范围: {id_range}")
     logger.info("=" * 60)
 
-    results = batch_derive_tags(
+    run_once(
         batch_size=args.batch_size,
         concurrency=args.concurrency,
         start_id=args.start_id,
@@ -481,14 +604,6 @@ def main():
         ids=args.id,
         limit=args.limit
     )
-
-    if results:
-        print_summary(results)
-
-        output_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "derive_results.json")
-        with open(output_file, 'w', encoding='utf-8') as f:
-            json.dump(results, f, ensure_ascii=False, indent=2)
-        logger.info(f"结果已保存到: {output_file}")
 
 
 if __name__ == "__main__":
diff --git a/start_tag_derive.sh b/start_tag_derive.sh
new file mode 100644
index 0000000..b749168
--- /dev/null
+++ b/start_tag_derive.sh
@@ -0,0 +1,419 @@
+#!/bin/bash
+
+# ============================================
+# Management script for the image tag derivation system
+# Enforces a cap on the number of running processes
+# ============================================
+
+# Configuration
+BASE_DIR="/home/work/ai_tagging_images"
+VENV_PYTHON="/home/work/keyword_crawl/venv/bin/python"
+
+# image_tag_derive settings
+DERIVE_SCRIPT="${BASE_DIR}/image_tag_derive.py"
+DERIVE_PID_FILE="${BASE_DIR}/image_tag_derive.pid"
+DERIVE_LOG_FILE="${BASE_DIR}/image_tag_derive.log"
+DERIVE_MAX_PROCESSES=1 # allow at most one process
+
+# Colour definitions
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+BLUE='\033[0;34m'
+NC='\033[0m' # No Color
+
+# Print how many processes have the script's basename in their command line
+get_process_count() {
+    local script_name=$(basename "$1")
+    pgrep -f "$script_name" 2>/dev/null | wc -l
+}
+
+# Print the PIDs of all matching processes, space separated
+get_all_pids() {
+    local script_name=$(basename "$1")
+    pgrep -f "$script_name" 2>/dev/null | tr '\n' ' '
+}
+
+# Polling interval (seconds) passed to the daemon
+LOOP_INTERVAL=10
+
+# Start one service instance, honouring the process cap
+start_single() {
+    local script=$1
+    local pid_file=$2
+    local log_file=$3
+    local name=$4
+    local max_processes=$5
+    shift 5
+    local extra_args="$@"
+
+    local script_name=$(basename "$script")
+    local current_count=$(get_process_count "$script")
+
+    # Refuse to start when the process cap is already reached
+    if [ $current_count -ge $max_processes ]; then
+        echo -e "${YELLOW}${name} 已达到最大进程数 (${current_count}/${max_processes}),跳过启动${NC}"
+        local first_pid=$(pgrep -f "$script_name" | head -n1)
+        if [ -n "$first_pid" ]; then
+            echo "$first_pid" > "$pid_file"
+        fi
+        return 0
+    fi
+
+    # Check the process recorded in the PID file
+    if [ -f "$pid_file" ]; then
+        local pid=$(cat "$pid_file" 2>/dev/null)
+        if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
+            echo -e "${YELLOW}${name} 已在运行(PID文件记录),PID: ${pid}${NC}"
+            return 0
+        fi
+    fi
+
+    echo -e "${BLUE}正在启动 ${name}(守护模式)...${NC}"
+    if [ -n "$extra_args" ]; then
+        echo -e "${BLUE}额外参数: ${extra_args}${NC}"
+    fi
+
+    # Make sure the log directory exists
+    mkdir -p "$(dirname "$log_file")"
+
+    # Back up the previous log
+    if [ -f "$log_file" ]; then
+        local backup_log="${log_file}.$(date +%Y%m%d_%H%M%S).bak"
+        cp "$log_file" "$backup_log"
+        echo -e "${BLUE}旧日志已备份到: ${backup_log}${NC}"
+    fi
+
+    # Launch the daemon (via the --daemon flag); extra args are word-split on purpose
+    nohup "$VENV_PYTHON" "$script" --daemon --interval "$LOOP_INTERVAL" $extra_args >> "$log_file" 2>&1 &
+    local new_pid=$!
+
+    # Give the process a moment to really come up
+    sleep 2
+
+    # Verify that the process is alive
+    if kill -0 "$new_pid" 2>/dev/null; then
+        echo "$new_pid" > "$pid_file"
+        echo -e "${GREEN}${name} 已启动,PID: ${new_pid}${NC}"
+        echo -e "${BLUE}日志文件: ${log_file}${NC}"
+        echo -e "${BLUE}轮询间隔: ${LOOP_INTERVAL}秒${NC}"
+        return 0
+    else
+        echo -e "${RED}${name} 启动失败,请检查日志${NC}"
+        tail -20 "$log_file"
+        rm -f "$pid_file"
+        return 1
+    fi
+}
+
+# Stop every running instance of the service
+stop_single_all() {
+    local script=$1
+    local name=$2
+    local pid_file=$3
+
+    local script_name=$(basename "$script")
+    local pids=$(get_all_pids "$script")
+    local count=$(get_process_count "$script")
+
+    if [ $count -eq 0 ]; then
+        echo -e "${YELLOW}${name} 没有运行中的进程${NC}"
+        rm -f "$pid_file"
+        return 0
+    fi
+
+    echo -e "${BLUE}正在停止 ${name} (${count}个进程)...${NC}"
+    echo -e "进程PIDs: ${pids}"
+
+    # Try a graceful SIGTERM first
+    for pid in $pids; do
+        if kill -0 "$pid" 2>/dev/null; then
+            echo -e " 发送SIGTERM到 PID $pid..."
+            kill "$pid"
+        fi
+    done
+
+    # Wait for the graceful exit
+    local wait_time=10
+    local remaining=$count
+    for i in $(seq 1 $wait_time); do
+        remaining=$(get_process_count "$script")
+        if [ $remaining -eq 0 ]; then
+            break
+        fi
+        echo -n "."
+        sleep 1
+    done
+
+    echo ""
+
+    # Escalate to SIGKILL for anything still alive
+    remaining=$(get_process_count "$script")
+    if [ $remaining -gt 0 ]; then
+        echo -e "${YELLOW}还有 ${remaining} 个进程未退出,强制终止...${NC}"
+        pids=$(get_all_pids "$script")
+        for pid in $pids; do
+            if kill -0 "$pid" 2>/dev/null; then
+                kill -9 "$pid" 2>/dev/null
+            fi
+        done
+        sleep 2
+    fi
+
+    # Verify that every process is gone
+    remaining=$(get_process_count "$script")
+    if [ $remaining -eq 0 ]; then
+        echo -e "${GREEN}${name} 所有进程已停止${NC}"
+        rm -f "$pid_file"
+        return 0
+    else
+        echo -e "${RED}警告:仍有 ${remaining} 个进程无法终止${NC}"
+        return 1
+    fi
+}
+
+# Start the service; returns start_single's status
+start() {
+    shift # drop the 'start' argument
+    local extra_args="$@"
+
+    echo -e "${BLUE}========== 启动图片标签衍生系统 ==========${NC}"
+    echo -e "${YELLOW}进程限制:最多启动1个实例${NC}"
+    echo ""
+
+    start_single "$DERIVE_SCRIPT" "$DERIVE_PID_FILE" "$DERIVE_LOG_FILE" "image_tag_derive" "$DERIVE_MAX_PROCESSES" $extra_args
+    local rc=$?
+
+    echo -e "${BLUE}========================================${NC}"
+    return $rc
+}
+
+# Stop the service; returns stop_single_all's status so restart can react to it
+stop() {
+    echo -e "${BLUE}========== 停止图片标签衍生系统 ==========${NC}"
+
+    stop_single_all "$DERIVE_SCRIPT" "image_tag_derive" "$DERIVE_PID_FILE"
+    local rc=$?
+
+    echo -e "${BLUE}========================================${NC}"
+    return $rc
+}
+
+# Force stop
+force-stop() {
+    echo -e "${RED}========== 强制停止标签衍生进程 ==========${NC}"
+
+    # Kill the daemon recorded in the PID file
+    if [ -f "$DERIVE_PID_FILE" ]; then
+        local pid=$(cat "$DERIVE_PID_FILE" 2>/dev/null)
+        if [ -n "$pid" ]; then
+            kill -9 "$pid" 2>/dev/null
+        fi
+    fi
+
+    # Kill every remaining daemon process. Do NOT pkill "start_tag_derive.sh":
+    # that pattern matches this very script and would SIGKILL it mid-function.
+    pkill -9 -f "image_tag_derive.py" 2>/dev/null
+
+    sleep 2
+
+    rm -f "$DERIVE_PID_FILE"
+
+    local remaining=$(pgrep -f "image_tag_derive.py" | wc -l)
+    if [ $remaining -eq 0 ]; then
+        echo -e "${GREEN}✅ 所有进程已强制停止${NC}"
+    else
+        echo -e "${RED}❌ 仍有 ${remaining} 个进程存活${NC}"
+        pgrep -f "image_tag_derive.py" | xargs ps -fp 2>/dev/null
+    fi
+
+    echo -e "${RED}==========================================${NC}"
+}
+
+# Restart the service
+restart() {
+    shift # drop the 'restart' argument
+    local extra_args="$@"
+
+    echo -e "${BLUE}========== 重启图片标签衍生系统 ==========${NC}"
+
+    stop
+    if [ $? -eq 0 ]; then
+        sleep 3
+        start start $extra_args
+    else
+        echo -e "${RED}停止服务失败,请使用 force-restart${NC}"
+        return 1
+    fi
+
+    echo -e "${BLUE}========================================${NC}"
+}
+
+# Force restart
+force-restart() {
+    shift # drop the 'force-restart' argument
+    local extra_args="$@"
+
+    echo -e "${YELLOW}========== 强制重启图片标签衍生系统 ==========${NC}"
+
+    force-stop
+    sleep 3
+    start start $extra_args
+
+    echo -e "${YELLOW}============================================${NC}"
+}
+
+# Show status
+status() {
+    echo -e "${BLUE}========== 图片标签衍生系统状态 ==========${NC}"
+    echo -e "${BLUE}系统时间: $(date)${NC}"
+    echo -e "${BLUE}工作目录: ${BASE_DIR}${NC}"
+    echo ""
+
+    local count=$(get_process_count "$DERIVE_SCRIPT")
+    echo -e "${YELLOW}📊 进程状态:${NC}"
+    echo -e " 进程数: ${count}"
+
+    if [ $count -gt 0 ]; then
+        local pids=$(get_all_pids "$DERIVE_SCRIPT")
+        echo -e " 进程PIDs: ${pids}"
+
+        # Show CPU, memory and elapsed time per process
+        for pid in $pids; do
+            local cpu=$(ps -p $pid -o %cpu --no-headers 2>/dev/null | tr -d ' ')
+            local mem=$(ps -p $pid -o %mem --no-headers 2>/dev/null | tr -d ' ')
+            local runtime=$(ps -p $pid -o etime --no-headers 2>/dev/null | tr -d ' ')
+            echo -e " PID ${pid}: CPU ${cpu}%, 内存 ${mem}%, 运行时间 ${runtime}"
+        done
+    else
+        echo -e "${YELLOW}没有运行中的进程${NC}"
+    fi
+
+    echo ""
+    echo -e "${YELLOW}📁 PID文件状态:${NC}"
+    if [ -f "$DERIVE_PID_FILE" ]; then
+        local pid=$(cat "$DERIVE_PID_FILE" 2>/dev/null)
+        if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
+            echo -e " ${GREEN}✓ image_tag_derive.pid: 有效 (PID: $pid)${NC}"
+        else
+            echo -e " ${RED}✗ image_tag_derive.pid: 无效或进程不存在${NC}"
+        fi
+    else
+        echo -e " ${YELLOW}○ image_tag_derive.pid: 不存在${NC}"
+    fi
+
+    echo ""
+    echo -e "${YELLOW}📝 最近日志:${NC}"
+    if [ -f "$DERIVE_LOG_FILE" ]; then
+        echo -e "${BLUE}--- image_tag_derive.log (最后10行) ---${NC}"
+        tail -10 "$DERIVE_LOG_FILE" 2>/dev/null
+    else
+        echo -e "${YELLOW}日志文件不存在${NC}"
+    fi
+
+    echo -e "${BLUE}========================================${NC}"
+}
+
+# Show the last N log lines (default 50)
+logs() {
+    local lines=${1:-50}
+    echo -e "${BLUE}========== 查看日志 (最后 ${lines} 行) ==========${NC}"
+
+    if [ -f "$DERIVE_LOG_FILE" ]; then
+        tail -n "$lines" "$DERIVE_LOG_FILE"
+    else
+        echo -e "${YELLOW}日志文件不存在${NC}"
+    fi
+
+    echo -e "${BLUE}============================================${NC}"
+}
+
+# Follow the log in real time
+logs-follow() {
+    echo -e "${BLUE}========== 实时查看日志 (Ctrl+C 退出) ==========${NC}"
+    tail -f "$DERIVE_LOG_FILE"
+}
+
+# Show help
+show_help() {
+    echo -e "${GREEN}图片标签衍生系统管理脚本${NC}"
+    echo ""
+    echo -e "${YELLOW}当前配置:${NC}"
+    echo -e " 工作目录: ${BASE_DIR}"
+    echo -e " 最大进程数: ${DERIVE_MAX_PROCESSES}"
+    echo ""
+    echo -e "${BLUE}用法: $0 {命令} [参数]${NC}"
+    echo ""
+    echo -e "${GREEN}服务管理:${NC}"
+    echo -e " ${YELLOW}start [args]${NC} 启动服务(循环模式,每${LOOP_INTERVAL}秒执行一次)"
+    echo -e " ${YELLOW}stop${NC} 停止服务"
+    echo -e " ${YELLOW}force-stop${NC} 强制停止所有进程"
+    echo -e " ${YELLOW}restart [args]${NC} 重启服务"
+    echo -e " ${YELLOW}force-restart [args]${NC} 强制重启"
+    echo ""
+    echo -e "${GREEN}状态查看:${NC}"
+    echo -e " ${YELLOW}status${NC} 显示进程状态"
+    echo -e " ${YELLOW}logs [N]${NC} 查看最后N行日志(默认50)"
+    echo -e " ${YELLOW}logs-follow${NC} 实时查看日志"
+    echo ""
+    echo -e "${GREEN}其他:${NC}"
+    echo -e " ${YELLOW}help${NC} 显示帮助"
+    echo ""
+    echo -e "${YELLOW}可用参数 (传递给 image_tag_derive.py):${NC}"
+    echo -e " --batch-size N 每批次图片数量"
+    echo -e " --concurrency N 并发请求数"
+    echo -e " 注意: 服务始终以 --daemon 模式启动,该模式会忽略以下参数:"
+    echo -e " --start-id / --end-id / --id / --limit"
+    echo -e " 如需使用这些参数,请直接以单次模式运行:"
+    echo -e " ${VENV_PYTHON} ${DERIVE_SCRIPT} --limit 10"
+    echo ""
+    echo -e "${GREEN}示例:${NC}"
+    echo -e " $0 start # 启动处理所有待处理数据"
+    echo -e " $0 start --batch-size 50 # 每批处理50张图片"
+    echo -e " $0 start --concurrency 3 # 使用3个并发"
+    echo -e " $0 restart --concurrency 3 # 以新的并发数重启"
+    echo -e " $0 logs 100 # 查看最后100行日志"
+    echo ""
+    echo -e "${RED}注意:${NC}"
+    echo -e " - 服务以循环模式运行,每${LOOP_INTERVAL}秒执行一次"
+    echo -e " - 脚本会限制每个服务最多启动1个实例"
+}
+
+# Command dispatch
+case "$1" in
+    start)
+        start "$@"
+        ;;
+    stop)
+        stop
+        ;;
+    force-stop)
+        force-stop
+        ;;
+    restart)
+        restart "$@"
+        ;;
+    force-restart)
+        force-restart "$@"
+        ;;
+    status)
+        status
+        ;;
+    logs)
+        logs "$2"
+        ;;
+    logs-follow)
+        logs-follow
+        ;;
+    help|--help|-h)
+        show_help
+        ;;
+    *)
+        echo -e "${RED}错误:未知命令 '$1'${NC}"
+        echo ""
+        show_help
+        exit 1
+        ;;
+esac
+
+exit $? # propagate the status of the dispatched command