From 1a74b42f828543f44270694ecbabb2c475a9e739 Mon Sep 17 00:00:00 2001 From: liangguodong Date: Thu, 5 Feb 2026 23:52:26 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E7=A7=BB=E9=99=A4=E4=BA=8B?= =?UTF-8?q?=E5=8A=A1=E3=80=81=E7=AE=80=E5=8C=96=E6=97=A5=E5=BF=97=E3=80=81?= =?UTF-8?q?=E8=BD=AE=E8=AF=A2=E6=94=B92=E7=A7=92=E3=80=81=E5=88=A0?= =?UTF-8?q?=E9=99=A4=E9=87=8D=E5=BB=BA=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 8 +- database_config.py | 4 - image_tag_derive.py | 14 +- rebuild_database.py | 310 ---------------------------------------- restore_database.py | 335 -------------------------------------------- start_tag_derive.sh | 2 +- 6 files changed, 11 insertions(+), 662 deletions(-) delete mode 100644 rebuild_database.py delete mode 100644 restore_database.py diff --git a/README.md b/README.md index 4222d6c..c6df9b1 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ ## 功能概述 -- **守护进程模式**:持续监控数据库,自动处理新数据(默认10秒轮询) +- **守护进程模式**:持续监控数据库,自动处理新数据(默认2秒轮询) - **批量处理模式**:10张图片一个请求,多请求并发执行 - **内容审核处理**:自动识别审核失败图片,标记状态并记录原因 - **RESTful API 服务**:提供标签衍生的 HTTP 接口 @@ -90,8 +90,8 @@ export DB_PASSWORD=your-password # 持续监控数据库,自动处理新数据 python image_tag_derive.py --daemon -# 指定轮询间隔(默认10秒) -python image_tag_derive.py --daemon --interval 10 +# 指定轮询间隔(默认2秒) +python image_tag_derive.py --daemon --interval 2 # 并发配置 python image_tag_derive.py --daemon --batch-size 50 --concurrency 3 @@ -123,7 +123,7 @@ python image_tag_derive.py --id 16495 16496 16497 | 参数 | 说明 | |------|------| | `--daemon` | 守护模式:持续监控数据库 | -| `--interval` | 轮询间隔(秒),默认10秒 | +| `--interval` | 轮询间隔(秒),默认2秒 | | `--limit` | 限制处理数量(测试用) | | `--start-id` | 起始ID(断点续传) | | `--end-id` | 结束ID | diff --git a/database_config.py b/database_config.py index ba46eb9..cabd279 100644 --- a/database_config.py +++ b/database_config.py @@ -64,10 +64,6 @@ class DatabaseManager: cursor = conn.cursor(dictionary=dictionary) try: yield cursor - conn.commit() - except Exception as e: - conn.rollback() - raise e finally: cursor.close() conn.close() diff --git a/image_tag_derive.py b/image_tag_derive.py index 2ba6581..785eb42 100644 --- a/image_tag_derive.py +++ b/image_tag_derive.py @@ -497,13 +497,11 @@ def run_daemon(batch_size=None, concurrency=None, interval=10): logger.info(f"并发数: {concurrency or settings.tag_derive.concurrency}") logger.info("=" * 60) - round_count = 0 total_success = 0 total_failed = 0 while running: - round_count += 1 - logger.info(f"[{round_count}] 检查待处理数据...") + logger.info("检查待处理数据...") try: results = batch_derive_tags( @@ -516,12 +514,12 @@ def run_daemon(batch_size=None, concurrency=None, interval=10): failed = len(results) - success total_success += success total_failed += failed - logger.info(f"[{round_count}] 处理完成: 成功 {success}, 失败 {failed}") + logger.info(f"处理完成: 成功 {success}, 失败 {failed}") else: - logger.info(f"[{round_count}] 没有待处理的数据") + logger.info("没有待处理的数据") except Exception as e: - logger.error(f"[{round_count}] 处理异常: {e}") + logger.error(f"处理异常: {e}") if running: logger.info(f"等待 {interval} 秒后继续...") @@ -533,7 +531,7 @@ def run_daemon(batch_size=None, concurrency=None, interval=10): logger.info("=" * 60) logger.info("服务已停止") - logger.info(f"统计: 共运行 {round_count} 次, 成功 {total_success} 条, 失败 {total_failed} 条") + logger.info(f"统计: 成功 {total_success} 条, 失败 {total_failed} 条") logger.info("=" * 60) @@ -548,7 +546,7 @@ def main(): parser.add_argument('--id', type=int, nargs='+', default=None, help='指定ID,只处理这些ID(可指定多个)') parser.add_argument('--limit', type=int, default=None, help='限制处理的总数量(用于测试,如 --limit 10)') parser.add_argument('--daemon', action='store_true', help='守护模式:持续监控数据库,自动处理新数据') - parser.add_argument('--interval', type=int, default=10, help='守护模式轮询间隔(秒),默认10秒') + parser.add_argument('--interval', type=int, default=2, help='守护模式轮询间隔(秒),默认2秒') args = parser.parse_args() batch_size = args.batch_size or settings.tag_derive.batch_size diff --git a/rebuild_database.py b/rebuild_database.py deleted file mode 100644 index bc2afe6..0000000 --- a/rebuild_database.py +++ /dev/null @@ -1,310 +0,0 @@ -# -*- coding: utf-8 -*- -""" -数据库重建脚本 -根据 ai_article.sql 重建数据库结构,从 ai_image_tags.txt 导入数据 -""" - -import mysql.connector -import os - - -# 数据库配置 -DB_CONFIG = { - "host": "localhost", - "port": 3306, - "user": "root", - "password": "liang20020523", - "charset": "utf8mb4" -} - -DATABASE_NAME = "ai_article" -SQL_FILE = "ai_article.sql" -DATA_FILE = "ai_image_tags.txt" - - -def get_connection(with_database=False): - """获取数据库连接""" - config = DB_CONFIG.copy() - if with_database: - config["database"] = DATABASE_NAME - return mysql.connector.connect(**config) - - -def rebuild_database_structure(): - """重建数据库结构""" - print("=" * 60) - print("步骤1: 重建数据库结构") - print("=" * 60) - - # 读取SQL文件 - sql_path = os.path.join(os.path.dirname(__file__), SQL_FILE) - print(f"读取SQL文件: {sql_path}") - - with open(sql_path, "r", encoding="utf-8") as f: - sql_content = f.read() - - # 连接MySQL(不指定数据库) - conn = get_connection(with_database=False) - cursor = conn.cursor() - - try: - # 删除并重新创建数据库 - print(f"\n删除数据库 {DATABASE_NAME}(如果存在)...") - cursor.execute(f"DROP DATABASE IF EXISTS `{DATABASE_NAME}`") - - print(f"创建数据库 {DATABASE_NAME}...") - cursor.execute(f"CREATE DATABASE `{DATABASE_NAME}` CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci") - - print(f"切换到数据库 {DATABASE_NAME}...") - cursor.execute(f"USE `{DATABASE_NAME}`") - - # 分割并执行SQL语句 - print("\n执行SQL脚本...") - - # 移除注释并分割SQL语句 - statements = [] - current_statement = "" - in_comment = False - - for line in sql_content.split("\n"): - stripped = line.strip() - - # 跳过空行 - if not stripped: - continue - - # 跳过单行注释 - if stripped.startswith("--"): - continue - - # 处理多行注释开始 - if stripped.startswith("/*"): - in_comment = True - continue - - # 处理多行注释结束 - if "*/" in stripped: - in_comment = False - continue - - # 跳过注释中的内容 - if in_comment: - continue - - current_statement += line + "\n" - - # 检查语句是否结束 - if stripped.endswith(";"): - statements.append(current_statement.strip()) - current_statement = "" - - # 执行每条SQL语句 - success_count = 0 - error_count = 0 - - for i, stmt in enumerate(statements): - if not stmt or stmt.strip() == ";": - continue - try: - cursor.execute(stmt) - conn.commit() - success_count += 1 - # 打印表创建信息 - if "CREATE TABLE" in stmt.upper(): - table_name = stmt.split("`")[1] if "`" in stmt else "unknown" - print(f" ✓ 创建表: {table_name}") - except mysql.connector.Error as e: - error_count += 1 - # 只打印关键错误 - if "already exists" not in str(e).lower(): - print(f" ✗ SQL执行错误: {str(e)[:100]}") - - print(f"\nSQL执行完成: 成功 {success_count} 条, 失败 {error_count} 条") - - finally: - cursor.close() - conn.close() - - -def import_image_tags_data(): - """从 ai_image_tags.txt 导入数据""" - print("\n" + "=" * 60) - print("步骤2: 导入 ai_image_tags 数据") - print("=" * 60) - - data_path = os.path.join(os.path.dirname(__file__), DATA_FILE) - print(f"读取数据文件: {data_path}") - - if not os.path.exists(data_path): - print(f"数据文件不存在: {data_path}") - return - - with open(data_path, "r", encoding="utf-8") as f: - lines = f.readlines() - - # 解析数据 - # 文件格式:第9行是列头,从第10行开始是数据 - data_rows = [] - header_line = None - - for i, line in enumerate(lines): - stripped = line.strip() - if not stripped: - continue - - # 找到列头行(包含 id, image_id 等) - if stripped.startswith("id\t"): - header_line = stripped - print(f"找到列头(第{i+1}行): {stripped[:80]}...") - continue - - # 跳过非数据行 - if header_line is None: - continue - - # 解析数据行(以数字开头) - parts = stripped.split("\t") - if len(parts) >= 10 and parts[0].isdigit(): - data_rows.append(parts) - - print(f"解析到 {len(data_rows)} 条数据") - - if not data_rows: - print("没有数据需要导入") - return - - # 连接数据库 - conn = get_connection(with_database=True) - cursor = conn.cursor() - - try: - # 禁用外键检查 - cursor.execute("SET FOREIGN_KEY_CHECKS = 0") - print("已禁用外键检查") - - # 插入数据 - insert_sql = """ - INSERT INTO ai_image_tags - (id, image_id, image_name, image_url, image_thumb_url, tag_id, tag_name, - default_tag_id, default_tag_name, keywords_id, keywords_name, - department_id, department_name, image_source, created_user_id, - created_at, updated_at, image_attached_article_count, status, blocking_reason) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - """ - - success_count = 0 - error_count = 0 - - for row in data_rows: - try: - # 处理数据,确保长度匹配 - while len(row) < 20: - row.append("") - - # 转换数据类型 - values = ( - int(row[0]), # id - int(row[1]), # image_id - row[2], # image_name - row[3], # image_url - row[4], # image_thumb_url - int(row[5]), # tag_id - row[6], # tag_name - int(row[7]) if row[7] else 0, # default_tag_id - row[8], # default_tag_name - int(row[9]), # keywords_id - row[10], # keywords_name - int(row[11]), # department_id - row[12], # department_name - int(row[13]) if row[13] else 1, # image_source - int(row[14]) if row[14] else 0, # created_user_id - row[15], # created_at - row[16], # updated_at - int(row[17]) if row[17] else 0, # image_attached_article_count - row[18] if row[18] else "draft", # status - row[19] if len(row) > 19 else "" # blocking_reason - ) - - cursor.execute(insert_sql, values) - success_count += 1 - - except Exception as e: - error_count += 1 - if error_count <= 3: - print(f" 插入错误 (id={row[0]}): {e}") - - conn.commit() - print(f"\n数据导入完成: 成功 {success_count} 条, 失败 {error_count} 条") - - # 恢复外键检查 - cursor.execute("SET FOREIGN_KEY_CHECKS = 1") - print("已恢复外键检查") - - finally: - cursor.close() - conn.close() - - -def verify_database(): - """验证数据库""" - print("\n" + "=" * 60) - print("步骤3: 验证数据库") - print("=" * 60) - - conn = get_connection(with_database=True) - cursor = conn.cursor() - - try: - # 检查表数量 - cursor.execute("SHOW TABLES") - tables = cursor.fetchall() - print(f"\n数据库中共有 {len(tables)} 张表:") - for t in tables[:10]: - print(f" - {t[0]}") - if len(tables) > 10: - print(f" ... 还有 {len(tables) - 10} 张表") - - # 检查 ai_image_tags 表数据 - cursor.execute("SELECT COUNT(*) FROM ai_image_tags") - count = cursor.fetchone()[0] - print(f"\nai_image_tags 表共有 {count} 条记录") - - if count > 0: - cursor.execute("SELECT id, tag_name, department_name FROM ai_image_tags LIMIT 3") - rows = cursor.fetchall() - print("示例数据:") - for row in rows: - print(f" ID: {row[0]}, 标签: {row[1][:30]}..., 科室: {row[2]}") - - finally: - cursor.close() - conn.close() - - -def main(): - print("\n" + "=" * 60) - print(" 数据库重建脚本") - print("=" * 60) - print(f"数据库: {DATABASE_NAME}") - print(f"SQL文件: {SQL_FILE}") - print(f"数据文件: {DATA_FILE}") - print("=" * 60) - - # 确认操作 - confirm = input("\n警告: 此操作将删除并重建数据库,所有数据将丢失!\n确认继续? (输入 'yes' 确认): ") - if confirm.lower() != "yes": - print("操作已取消") - return - - # 执行重建 - rebuild_database_structure() - import_image_tags_data() - verify_database() - - print("\n" + "=" * 60) - print("数据库重建完成!") - print("=" * 60) - - -if __name__ == "__main__": - main() diff --git a/restore_database.py b/restore_database.py deleted file mode 100644 index be7f75f..0000000 --- a/restore_database.py +++ /dev/null @@ -1,335 +0,0 @@ -# -*- coding: utf-8 -*- -""" -数据库恢复脚本 -根据 ai_article.sql 重建数据库结构,从 backup_data.json 恢复数据 -""" - -import mysql.connector -import json -import os - - -# 数据库配置 -DB_CONFIG = { - "host": "localhost", - "port": 3306, - "user": "root", - "password": "liang20020523", - "charset": "utf8mb4" -} - -DATABASE_NAME = "ai_article" -SQL_FILE = "ai_article.sql" -BACKUP_FILE = "backup_data.json" - - -def get_connection(with_database=False): - """获取数据库连接""" - config = DB_CONFIG.copy() - if with_database: - config["database"] = DATABASE_NAME - return mysql.connector.connect(**config) - - -def rebuild_database_structure(): - """重建数据库结构""" - print("=" * 60) - print("步骤1: 重建数据库结构") - print("=" * 60) - - sql_path = os.path.join(os.path.dirname(__file__), SQL_FILE) - print(f"读取SQL文件: {sql_path}") - - with open(sql_path, "r", encoding="utf-8") as f: - sql_content = f.read() - - conn = get_connection(with_database=False) - cursor = conn.cursor() - - try: - print(f"\n删除数据库 {DATABASE_NAME}(如果存在)...") - cursor.execute(f"DROP DATABASE IF EXISTS `{DATABASE_NAME}`") - - print(f"创建数据库 {DATABASE_NAME}...") - cursor.execute(f"CREATE DATABASE `{DATABASE_NAME}` CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci") - - print(f"切换到数据库 {DATABASE_NAME}...") - cursor.execute(f"USE `{DATABASE_NAME}`") - - print("\n执行SQL脚本...") - - statements = [] - current_statement = "" - in_comment = False - - for line in sql_content.split("\n"): - stripped = line.strip() - if not stripped: - continue - if stripped.startswith("--"): - continue - if stripped.startswith("/*"): - in_comment = True - continue - if "*/" in stripped: - in_comment = False - continue - if in_comment: - continue - - current_statement += line + "\n" - - if stripped.endswith(";"): - statements.append(current_statement.strip()) - current_statement = "" - - success_count = 0 - table_count = 0 - - for stmt in statements: - if not stmt or stmt.strip() == ";": - continue - try: - cursor.execute(stmt) - conn.commit() - success_count += 1 - if "CREATE TABLE" in stmt.upper(): - table_name = stmt.split("`")[1] if "`" in stmt else "unknown" - print(f" ✓ 创建表: {table_name}") - table_count += 1 - except mysql.connector.Error as e: - if "already exists" not in str(e).lower(): - print(f" ✗ SQL执行错误: {str(e)[:80]}") - - print(f"\n表结构创建完成: {table_count} 张表") - - finally: - cursor.close() - conn.close() - - -def restore_data(): - """从备份文件恢复数据""" - print("\n" + "=" * 60) - print("步骤2: 恢复表数据") - print("=" * 60) - - backup_path = os.path.join(os.path.dirname(__file__), BACKUP_FILE) - print(f"读取备份文件: {backup_path}") - - if not os.path.exists(backup_path): - print(f"备份文件不存在: {backup_path}") - return - - with open(backup_path, "r", encoding="utf-8") as f: - data = json.load(f) - - conn = get_connection(with_database=True) - cursor = conn.cursor() - - try: - # 禁用外键检查 - cursor.execute("SET FOREIGN_KEY_CHECKS = 0") - print("已禁用外键检查\n") - - # 恢复 ai_tags - tags = data.get("ai_tags", []) - print(f"恢复 ai_tags 表 ({len(tags)} 条)...") - - if tags: - for tag in tags: - columns = ", ".join(f"`{k}`" for k in tag.keys()) - placeholders = ", ".join(["%s"] * len(tag)) - sql = f"INSERT INTO ai_tags ({columns}) VALUES ({placeholders})" - try: - cursor.execute(sql, list(tag.values())) - except mysql.connector.Error as e: - print(f" 插入错误: {e}") - conn.commit() - print(f" ✓ ai_tags 恢复完成") - - # 恢复 ai_image_tags - image_tags = data.get("ai_image_tags", []) - print(f"恢复 ai_image_tags 表 ({len(image_tags)} 条)...") - - # 先插入一条URL错误的测试数据(id=1,第一条) - error_url_data = { - "id": 1, - "image_id": 99999, - "image_name": "error_test_image.png", - "image_url": "https://invalid-url-test/not-exist/error_image.png", - "image_thumb_url": "https://invalid-url-test/not-exist/error_image_thumb.png", - "tag_id": 12679, - "tag_name": "#测试错误URL#", - "default_tag_id": 0, - "default_tag_name": "", - "keywords_id": 186, - "keywords_name": "测试", - "department_id": 11, - "department_name": "妇科", - "image_source": 1, - "created_user_id": 0, - "created_at": "2025-01-01T00:00:00", - "updated_at": "2025-01-01T00:00:00", - "image_attached_article_count": 0, - "status": "draft", - "blocking_reason": "", - "similarity": "draft", - "similarity_image_tags_id": 0, - "similarity score": 0.0 - } - columns = ", ".join(f"`{k}`" for k in error_url_data.keys()) - placeholders = ", ".join(["%s"] * len(error_url_data)) - sql = f"INSERT INTO ai_image_tags ({columns}) VALUES ({placeholders})" - try: - cursor.execute(sql, list(error_url_data.values())) - conn.commit() - print(f" ✓ 已插入URL错误的测试数据 (id=1)") - except mysql.connector.Error as e: - print(f" 插入错误URL测试数据失败: {e}") - - if image_tags: - success = 0 - for item in image_tags: - columns = ", ".join(f"`{k}`" for k in item.keys()) - placeholders = ", ".join(["%s"] * len(item)) - sql = f"INSERT INTO ai_image_tags ({columns}) VALUES ({placeholders})" - try: - cursor.execute(sql, list(item.values())) - success += 1 - except mysql.connector.Error as e: - print(f" 插入错误 (id={item.get('id')}): {e}") - conn.commit() - print(f" ✓ ai_image_tags 恢复完成 ({success} 条)") - - # 恢复外键检查 - cursor.execute("SET FOREIGN_KEY_CHECKS = 1") - print("\n已恢复外键检查") - - finally: - cursor.close() - conn.close() - - -def verify_database(): - """验证数据库""" - print("\n" + "=" * 60) - print("步骤3: 验证数据库") - print("=" * 60) - - conn = get_connection(with_database=True) - cursor = conn.cursor() - - try: - cursor.execute("SHOW TABLES") - tables = cursor.fetchall() - print(f"\n数据库共有 {len(tables)} 张表") - - cursor.execute("SELECT COUNT(*) FROM ai_tags") - count = cursor.fetchone()[0] - print(f"ai_tags 表: {count} 条记录") - - cursor.execute("SELECT COUNT(*) FROM ai_image_tags") - count = cursor.fetchone()[0] - print(f"ai_image_tags 表: {count} 条记录") - - # 显示示例数据 - cursor.execute("SELECT id, tag_name, department FROM ai_tags LIMIT 3") - rows = cursor.fetchall() - if rows: - print("\nai_tags 示例:") - for row in rows: - print(f" ID: {row[0]}, 标签: {row[1][:40]}..., 科室: {row[2]}") - - finally: - cursor.close() - conn.close() - - -def backup_current_data(): - """备份当前数据到 backup_data.json""" - print("=" * 60) - print("备份当前数据") - print("=" * 60) - - conn = get_connection(with_database=True) - cursor = conn.cursor(dictionary=True) - - try: - # 导出 ai_tags - cursor.execute('SELECT * FROM ai_tags') - tags = cursor.fetchall() - print(f"ai_tags: {len(tags)} 条") - - # 导出 ai_image_tags - cursor.execute('SELECT * FROM ai_image_tags') - image_tags = cursor.fetchall() - print(f"ai_image_tags: {len(image_tags)} 条") - - # 转换datetime为字符串 - def convert_datetime(obj): - for key, value in obj.items(): - if hasattr(value, 'isoformat'): - obj[key] = value.isoformat() - return obj - - tags = [convert_datetime(t) for t in tags] - image_tags = [convert_datetime(t) for t in image_tags] - - # 保存为JSON - backup_path = os.path.join(os.path.dirname(__file__), BACKUP_FILE) - data = {'ai_tags': tags, 'ai_image_tags': image_tags} - with open(backup_path, 'w', encoding='utf-8') as f: - json.dump(data, f, ensure_ascii=False, indent=2) - - print(f"\n数据已保存到 {backup_path}") - - finally: - cursor.close() - conn.close() - - -def main(): - print("\n" + "=" * 60) - print(" 数据库恢复脚本") - print("=" * 60) - print(f"数据库: {DATABASE_NAME}") - print(f"SQL文件: {SQL_FILE}") - print(f"备份文件: {BACKUP_FILE}") - print("=" * 60) - - print("\n请选择操作:") - print(" 1. 完整恢复 (重建结构 + 恢复数据)") - print(" 2. 仅恢复数据 (保留现有结构)") - print(" 3. 备份当前数据") - print(" 0. 取消") - - choice = input("\n请输入选项 (0-3): ").strip() - - if choice == "1": - confirm = input("\n警告: 此操作将删除并重建数据库!确认? (输入 'yes'): ") - if confirm.lower() == "yes": - rebuild_database_structure() - restore_data() - verify_database() - print("\n" + "=" * 60) - print("数据库恢复完成!") - print("=" * 60) - else: - print("操作已取消") - - elif choice == "2": - restore_data() - verify_database() - print("\n数据恢复完成!") - - elif choice == "3": - backup_current_data() - print("\n备份完成!") - - else: - print("操作已取消") - - -if __name__ == "__main__": - main() diff --git a/start_tag_derive.sh b/start_tag_derive.sh index b749168..182b305 100644 --- a/start_tag_derive.sh +++ b/start_tag_derive.sh @@ -35,7 +35,7 @@ get_all_pids() { } # 循环间隔(秒) -LOOP_INTERVAL=10 +LOOP_INTERVAL=2 # 启动服务(带进程数量控制) start_single() {