From 6f66e7c10f7c38b949402399f0d1d3eab275afb4 Mon Sep 17 00:00:00 2001 From: liangguodong Date: Thu, 5 Feb 2026 18:58:59 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=89=B9=E9=87=8F=E5=A4=84=E7=90=86?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F(10=E5=BC=A0/=E8=AF=B7=E6=B1=82)=20+=20?= =?UTF-8?q?=E5=86=85=E5=AE=B9=E5=AE=A1=E6=A0=B8=E5=A4=B1=E8=B4=A5=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ai_article.sql | 5 +- config/settings.py | 2 +- database_config.py | 5 + derive_results.json | 207 +---------------------------------------- image_tag_derive.py | 220 +++++++++++++++++++++++++++++++------------- logger.py | 2 +- restore_database.py | 36 ++++++++ test_db_select.py | 73 +++++++++++++++ 8 files changed, 276 insertions(+), 274 deletions(-) create mode 100644 test_db_select.py diff --git a/ai_article.sql b/ai_article.sql index 0a40562..c1c05ec 100644 --- a/ai_article.sql +++ b/ai_article.sql @@ -241,7 +241,8 @@ CREATE TABLE `ai_image_tags` ( `blocking_reason` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '审核不通过原因', `similarity` enum('draft','yes','calc','recalc') CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'draft' COMMENT 'yes=是相似|calc=已计算|recalc=需要重新计算', `similarity_image_tags_id` int NOT NULL DEFAULT 0 COMMENT 'yes=是相似|把image_tags_id写入', - `similarity score` float NOT NULL DEFAULT 0 COMMENT '相似时候,计算相似度值', + `similarity_score` float NOT NULL DEFAULT 0 COMMENT '相似时候,计算相似度值', + `automated_review_failed_reason` varchar(64) NOT NULL DEFAULT '' COMMENT '千问大模型审核图不符合原因', PRIMARY KEY (`id`) USING BTREE, UNIQUE INDEX `uk_image_tag`(`image_id` ASC, `tag_id` ASC) USING BTREE, INDEX `tag_id`(`tag_id` ASC) USING BTREE, @@ -932,7 +933,7 @@ CREATE TABLE `baidu_keyword` ( `similarity` enum('draft','yes','calc','recalc') CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'draft' COMMENT 'yes=是相似|calc=已计算|recalc=需要重新计算', `similarity_query` int NOT NULL DEFAULT 0 COMMENT 'yes=是相似|把query_id写入', `similarity_query_keyword` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT 'yes=是相似|把query写入', - `similarity score` float NOT NULL DEFAULT 0 COMMENT '相似时候,计算相似度值', + `similarity_score` float NOT NULL DEFAULT 0 COMMENT '相似时候,计算相似度值', `reviewed_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '审核日期', `fast_track` tinyint(1) NOT NULL DEFAULT 0 COMMENT '加急|0=否|1=是', `automated_review_failed_reason` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '千问大模型审核query不符合原因', diff --git a/config/settings.py b/config/settings.py index f012e74..00e4258 100644 --- a/config/settings.py +++ b/config/settings.py @@ -36,7 +36,7 @@ class QwenConfig: class TagDeriveConfig: """标签衍生配置""" batch_size: int = 50 # 每批次从数据库读取的图片数量 - concurrency: int = 10 # 并发请求数(同时发出的API请求数) + concurrency: int = 2 # 并发请求数(同时发出的API请求数) min_derived_tags: int = 5 # 最少衍生标签数 max_derived_tags: int = 10 # 最多衍生标签数 max_tag_length: int = 10 # 单个标签最大长度 diff --git a/database_config.py b/database_config.py index 3c6729e..ba46eb9 100644 --- a/database_config.py +++ b/database_config.py @@ -9,10 +9,14 @@ import mysql.connector from mysql.connector import pooling from contextlib import contextmanager from typing import List, Dict, Any, Optional +import time # 导入统一配置 from config.settings import settings +# 数据库操作间隔(s),防止hang住 +DB_SLEEP = 0.01 + class DatabaseManager: """数据库管理器 - 单例模式""" @@ -67,6 +71,7 @@ class DatabaseManager: finally: cursor.close() conn.close() + time.sleep(DB_SLEEP) # 防止数据库hang住 def execute_query(self, sql: str, params: tuple = None) -> List[Dict[str, Any]]: """执行查询SQL,返回结果列表""" diff --git a/derive_results.json b/derive_results.json index 6c59064..0637a08 100644 --- a/derive_results.json +++ b/derive_results.json @@ -1,206 +1 @@ -[ - { - "success": true, - "image_id": 16495, - "original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#", - "derived_tags": [ - "阴道炎", - "白带异常", - "妇科感染", - "盆腔炎", - "宫颈炎", - "分泌物增多", - "抗炎治疗", - "妇科检查" - ], - "merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##阴道炎##白带异常##妇科感染##盆腔炎#", - "new_tag_id": 13434 - }, - { - "success": true, - "image_id": 16508, - "original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#", - "derived_tags": [ - "阴道炎", - "宫颈炎", - "盆腔炎", - "白带异常", - "外阴瘙痒", - "妇科检查", - "抗炎治疗", - "女性健康" - ], - "merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##阴道炎##宫颈炎##盆腔炎##白带异常#", - "new_tag_id": 13435 - }, - { - "success": true, - "image_id": 16506, - "original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#", - "derived_tags": [ - "阴道炎", - "盆腔炎", - "白带异常", - "妇科检查", - "抗生素治疗", - "私处护理", - "月经不调", - "感染预防" - ], - "merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##阴道炎##盆腔炎##白带异常##妇科检查#", - "new_tag_id": 13436 - }, - { - "success": true, - "image_id": 16503, - "original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#", - "derived_tags": [ - "阴道炎", - "宫颈炎", - "盆腔炎", - "白带异常", - "外阴瘙痒", - "妇科感染", - "炎症治疗", - "妇科疾病" - ], - "merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##阴道炎##宫颈炎##盆腔炎##白带异常#", - "new_tag_id": 13435 - }, - { - "success": true, - "image_id": 16515, - "original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#", - "derived_tags": [ - "妇科疾病", - "阴道炎", - "盆腔炎", - "白带异常", - "月经不调", - "抗炎治疗", - "个人卫生", - "免疫力" - ], - "merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##妇科疾病##阴道炎##盆腔炎##白带异常#", - "new_tag_id": 13437 - }, - { - "success": true, - "image_id": 16512, - "original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#", - "derived_tags": [ - "阴道炎", - "盆腔炎", - "白带异常", - "月经不调", - "抗生素治疗", - "妇科检查", - "免疫力下降", - "激素变化" - ], - "merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##阴道炎##盆腔炎##白带异常##月经不调#", - "new_tag_id": 13438 - }, - { - "success": true, - "image_id": 16514, - "original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#", - "derived_tags": [ - "妇科疾病", - "阴道炎", - "盆腔炎", - "白带异常", - "抗生素治疗", - "个人卫生", - "免疫力下降", - "月经不调" - ], - "merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##妇科疾病##阴道炎##盆腔炎##白带异常#", - "new_tag_id": 13437 - }, - { - "success": true, - "image_id": 16513, - "original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#", - "derived_tags": [ - "阴道炎", - "宫颈炎", - "盆腔炎", - "白带异常", - "瘙痒", - "抗生素治疗", - "个人卫生", - "妇科检查" - ], - "merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##阴道炎##宫颈炎##盆腔炎##白带异常#", - "new_tag_id": 13435 - }, - { - "success": true, - "image_id": 16496, - "original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#", - "derived_tags": [ - "阴道炎", - "宫颈炎", - "盆腔炎", - "白带异常", - "抗生素治疗", - "妇科检查", - "免疫力下降", - "性传播疾病" - ], - "merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##阴道炎##宫颈炎##盆腔炎##白带异常#", - "new_tag_id": 13435 - }, - { - "success": true, - "image_id": 16500, - "original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#", - "derived_tags": [ - "妇科疾病", - "阴道炎", - "盆腔炎", - "白带异常", - "私处护理", - "抗生素治疗", - "免疫力提升", - "月经不调" - ], - "merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##妇科疾病##阴道炎##盆腔炎##白带异常#", - "new_tag_id": 13437 - }, - { - "success": true, - "image_id": 16516, - "original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#", - "derived_tags": [ - "更年期症状", - "激素变化", - "月经紊乱", - "潮热出汗", - "骨质疏松", - "情绪波动", - "妇科保健", - "内分泌失调" - ], - "merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##更年期症状##激素变化##月经紊乱##潮热出汗#", - "new_tag_id": 13439 - }, - { - "success": true, - "image_id": 16518, - "original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#", - "derived_tags": [ - "阴道炎", - "盆腔炎", - "白带异常", - "抗生素治疗", - "个人卫生", - "妇科检查", - "免疫力下降", - "性传播疾病" - ], - "merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##阴道炎##盆腔炎##白带异常##抗生素治疗#", - "new_tag_id": 13440 - } -] \ No newline at end of file +[] \ No newline at end of file diff --git a/image_tag_derive.py b/image_tag_derive.py index 028c988..a505aa3 100644 --- a/image_tag_derive.py +++ b/image_tag_derive.py @@ -30,27 +30,38 @@ dashscope.api_key = settings.qwen.api_key # ============== Prompt模板 ============== -# 单张图片的Prompt -SINGLE_DERIVE_PROMPT = """你是一个专业的医疗健康内容标签分析专家。 +# 批量图片的Prompt(10张一批) +BATCH_DERIVE_PROMPT = """你是一个专业的医疗健康内容标签分析专家。 ## 任务 -我提供了一张医疗健康相关图片,原始标签为「{original_tag}」。请分析图片内容,生成衍生标签。 +我提供了{count}张医疗健康相关图片,每张图片都有对应的序号、原始标签和关键字。请逐一分析每张图片,为每张图片生成衍生标签。 + +## 图片信息 +{image_info} ## 要求 -1. 分析图片内容,结合原始标签 -2. 生成 5-8 个衍生标签 +1. 分别分析每张图片内容,结合其原始标签和关键字 +2. 每张图片生成 5-8 个衍生标签 3. 衍生标签包括:同义词、上位概念、下位概念、相关症状/治疗等 4. 标签简洁,每个不超过10个字 ## 输出格式 -请严格以JSON格式输出: +请严格以JSON格式输出,包含每张图片的衍生结果: ```json -{{"derived_tags": ["衍生1", "衍生2", "衍生3", "衍生4", "衍生5"]}} +{{ + "results": [ + {{"id": 图片ID, "derived_tags": ["衍生1", "衍生2", "衍生3"]}}, + {{"id": 图片ID, "derived_tags": ["衍生1", "衍生2", "衍生3"]}} + ] +}} ``` -注意:只输出JSON,不要输出其他内容。 +注意:只输出JSON,不要输出其他内容。results数组的顺序和数量必须与输入图片一致。 """ +# 每批图片数量 +BATCH_IMAGE_COUNT = 10 + class TagsDAO: def __init__(self): @@ -110,20 +121,43 @@ def merge_tags(original_tag: str, derived_tags: List[str], max_total_tags: int = @retry(max_retries=settings.qwen.max_retries, delay=settings.qwen.retry_delay, backoff=2.0) -def derive_tags_single(item: Dict) -> Dict: +def derive_tags_batch(items: List[Dict]) -> List[Dict]: """ - 单张图片调用千问大模型获取衍生标签 - item: {"id": 1, "image_url": "...", "tag_name": "高血压", ...} - 返回: {"success": True/False, "item": item, "derived_tags": [...], "error": "..."} + 批量调用千问大模型获取衍生标签(10张图片一个请求) + items: [{"id": 1, "image_url": "...", "default_tag_name": "高血压", ...}, ...] + 返回: [{"success": True/False, "item": item, "derived_tags": [...], "error": "..."}, ...] """ - logger.debug(f" 处理 ID:{item['id']} - {item['tag_name']}") + if not items: + return [] - prompt = SINGLE_DERIVE_PROMPT.format(original_tag=item['tag_name']) + ids = [item['id'] for item in items] + logger.info(f" 批量处理 {len(items)} 张图片, IDs: {ids}") - content = [ - {"image": item['image_url']}, - {"text": prompt} - ] + # 构建图片信息文本 + image_info_lines = [] + for i, item in enumerate(items, 1): + keywords = item.get('keywords_name', '') + # 如果关键字为"废止",提示大模型忽略 + if keywords == '废止': + keywords_desc = "(关键字已废止,请忽略)" + else: + keywords_desc = f"关键字「{keywords}」" if keywords else "关键字「无」" + + image_info_lines.append( + f"图片{i}: ID={item['id']}, 原始标签「{item['default_tag_name']}」, {keywords_desc}" + ) + image_info = "\n".join(image_info_lines) + + prompt = BATCH_DERIVE_PROMPT.format( + count=len(items), + image_info=image_info + ) + + # 构建多图片内容 + content = [] + for item in items: + content.append({"image": item['image_url']}) + content.append({"text": prompt}) messages = [{"role": "user", "content": content}] @@ -132,9 +166,11 @@ def derive_tags_single(item: Dict) -> Dict: messages=messages ) + results = [] + if response.status_code == HTTPStatus.OK: result_text = response.output.choices[0].message.content[0]["text"] - logger.debug(f" ID:{item['id']} 原始响应: {result_text[:200]}...") + logger.debug(f" 批量响应: {result_text[:300]}...") try: json_start = result_text.find('{') @@ -142,26 +178,54 @@ def derive_tags_single(item: Dict) -> Dict: if json_start != -1 and json_end > json_start: json_str = result_text[json_start:json_end] result_json = json.loads(json_str) - derived_tags = result_json.get('derived_tags', []) - if not derived_tags: - logger.warning(f" ID:{item['id']} 返回JSON中无derived_tags字段: {json_str[:100]}") - return {"success": True, "item": item, "derived_tags": derived_tags} + batch_results = result_json.get('results', []) + + # 按ID匹配结果 + result_map = {r.get('id'): r.get('derived_tags', []) for r in batch_results} + + for item in items: + derived_tags = result_map.get(item['id'], []) + if derived_tags: + results.append({"success": True, "item": item, "derived_tags": derived_tags}) + else: + # 尝试按顺序匹配 + idx = items.index(item) + if idx < len(batch_results): + derived_tags = batch_results[idx].get('derived_tags', []) + results.append({"success": True, "item": item, "derived_tags": derived_tags}) + else: + results.append({"success": False, "item": item, "error": "未匹配到衍生标签"}) else: - logger.warning(f" ID:{item['id']} 未找到JSON内容: {result_text[:200]}") - return {"success": False, "item": item, "error": "未找到JSON内容"} + logger.warning(f" 批量处理未找到JSON内容: {result_text[:200]}") + for item in items: + results.append({"success": False, "item": item, "error": "未找到JSON内容"}) except json.JSONDecodeError as e: - logger.error(f" ID:{item['id']} JSON解析失败: {e}, 内容: {result_text[:200]}") - - return {"success": False, "item": item, "error": "JSON解析失败"} + logger.error(f" 批量处理JSON解析失败: {e}, 内容: {result_text[:200]}") + for item in items: + results.append({"success": False, "item": item, "error": "JSON解析失败"}) else: error_msg = f"{response.code}-{response.message}" - logger.error(f" ID:{item['id']} API调用失败: {error_msg}") + logger.error(f" 批量处理API调用失败: {error_msg}") + + # 检测内容审核失败(不重试,直接标记) + if 'DataInspectionFailed' in error_msg or 'inappropriate content' in error_msg: + for item in items: + results.append({ + "success": False, + "item": item, + "error": "内容审核不通过", + "is_content_review_failed": True + }) + return results + raise Exception(error_msg) # 抛出异常触发重试 + + return results def process_batch(items: List[Dict], tags_dao: TagsDAO, concurrency: int = None) -> List[Dict]: """ - 并发处理一批图片 + 处理一批图片(每10张一个请求,多请求并发执行) Args: items: 要处理的图片列表 @@ -171,43 +235,60 @@ def process_batch(items: List[Dict], tags_dao: TagsDAO, concurrency: int = None) if concurrency is None: concurrency = settings.tag_derive.concurrency - logger.info(f"[处理批次] {len(items)} 张图片,并发数: {concurrency}") + logger.info(f"[处理批次] {len(items)} 张图片,每{BATCH_IMAGE_COUNT}张一个请求,并发数: {concurrency}") db = get_db() process_results = [] api_results = [] - # 1. 并发调用大模型(按并发数限制) + # 1. 按 BATCH_IMAGE_COUNT 分组 + sub_batches = [items[i:i+BATCH_IMAGE_COUNT] for i in range(0, len(items), BATCH_IMAGE_COUNT)] + logger.info(f" 分成 {len(sub_batches)} 个子批次") + + # 2. 并发调用大模型(按并发数限制) with ThreadPoolExecutor(max_workers=concurrency) as executor: - # 提交所有任务 - future_to_item = { - executor.submit(derive_tags_single, item): item - for item in items + # 提交所有批量任务 + future_to_batch = { + executor.submit(derive_tags_batch, sub_batch): sub_batch + for sub_batch in sub_batches } # 收集结果 - for future in as_completed(future_to_item): - item = future_to_item[future] + for future in as_completed(future_to_batch): + sub_batch = future_to_batch[future] try: - result = future.result() - api_results.append(result) + batch_results = future.result() + api_results.extend(batch_results) except Exception as e: - logger.error(f" ID:{item['id']} 处理异常: {e}") - api_results.append({ - "success": False, - "item": item, - "error": str(e) - }) + logger.error(f" 子批次处理异常: {e}") + for item in sub_batch: + api_results.append({ + "success": False, + "item": item, + "error": str(e) + }) - # 2. 逐个处理结果并更新数据库 + # 3. 逐个处理结果并更新数据库 for result in api_results: item = result.get('item', {}) if not result.get('success'): + error_msg = result.get('error', '未知错误') + + # 内容审核失败:更新状态和原因 + if result.get('is_content_review_failed'): + try: + sql = "UPDATE ai_image_tags SET status = 'automated_review_failed', automated_review_failed_reason = %s WHERE id = %s" + db.execute_update(sql, (error_msg[:64], item['id'])) + logger.warning(f" ✗ ID:{item['id']} -> status: automated_review_failed") + except Exception as e: + logger.error(f" 更新审核失败状态异常: {e}") + process_results.append({ "success": False, "image_id": item.get('id'), - "error": result.get('error', '未知错误') + "error": error_msg, + "is_content_review_failed": result.get('is_content_review_failed', False) }) continue @@ -221,15 +302,17 @@ def process_batch(items: List[Dict], tags_dao: TagsDAO, concurrency: int = None) }) continue - logger.info(f" [{item['tag_name']}] 衍生: {derived_tags}") + logger.info(f" [{item['default_tag_name']}] 衍生: {derived_tags}") - # 合并标签(限制总标签数量) + # 只用衍生标签(不包含原始标签) max_total = getattr(settings.tag_derive, 'max_total_tags', None) - merged_tag_name = merge_tags(item['tag_name'], derived_tags, max_total_tags=max_total) + if max_total: + derived_tags = derived_tags[:max_total] + derived_tag_name = ''.join([f'#{t}#' for t in derived_tags if t]) # 插入ai_tags try: - new_tag_id = tags_dao.get_or_create(merged_tag_name, None, item.get('department_name', '')) + new_tag_id = tags_dao.get_or_create(derived_tag_name, None, item.get('department_name', '')) except Exception as e: process_results.append({"success": False, "image_id": item['id'], "error": str(e)}) continue @@ -237,13 +320,13 @@ def process_batch(items: List[Dict], tags_dao: TagsDAO, concurrency: int = None) # 更新ai_image_tags(包括 tag_id, tag_name, status) try: sql = "UPDATE ai_image_tags SET tag_id = %s, tag_name = %s, status = 'manual_review' WHERE id = %s" - db.execute_update(sql, (new_tag_id, merged_tag_name, item['id'])) + db.execute_update(sql, (new_tag_id, derived_tag_name, item['id'])) process_results.append({ "success": True, "image_id": item['id'], - "original_tag": item['tag_name'], + "original_tag": item['default_tag_name'], "derived_tags": derived_tags, - "merged_tag": merged_tag_name, + "derived_tag_name": derived_tag_name, "new_tag_id": new_tag_id }) logger.info(f" ✓ ID:{item['id']} -> tag_id:{new_tag_id}, status -> manual_review") @@ -253,7 +336,7 @@ def process_batch(items: List[Dict], tags_dao: TagsDAO, concurrency: int = None) return process_results -def batch_derive_tags(batch_size: int = None, concurrency: int = None, start_id: int = None, end_id: int = None, ids: List[int] = None) -> List[Dict]: +def batch_derive_tags(batch_size: int = None, concurrency: int = None, start_id: int = None, end_id: int = None, ids: List[int] = None, limit: int = None) -> List[Dict]: """ 分批处理图片标签衍生 @@ -263,6 +346,7 @@ def batch_derive_tags(batch_size: int = None, concurrency: int = None, start_id: start_id: 起始ID,从该ID开始处理(用于断点续传) end_id: 结束ID,处理到该ID为止 ids: 指定ID列表,只处理这些ID + limit: 限制处理的总数量(用于测试) """ if batch_size is None: batch_size = settings.tag_derive.batch_size @@ -277,10 +361,10 @@ def batch_derive_tags(batch_size: int = None, concurrency: int = None, start_id: # 按指定ID查询(查询 status='tag_extension' 的记录) placeholders = ','.join(['%s'] * len(ids)) sql = f""" - SELECT it.id, it.image_thumb_url, it.tag_id, it.tag_name, it.department_name + SELECT it.id, it.image_url as db_image_url, it.tag_id, it.tag_name, it.default_tag_name, it.keywords_name, it.department_name FROM ai_image_tags it WHERE it.id IN ({placeholders}) - AND it.image_thumb_url != '' AND it.tag_name != '' + AND it.image_url != '' AND it.default_tag_name != '' AND it.status = 'tag_extension' ORDER BY it.id """ @@ -288,9 +372,9 @@ def batch_derive_tags(batch_size: int = None, concurrency: int = None, start_id: else: # 按条件查询 status='tag_extension' 的记录 sql = """ - SELECT it.id, it.image_thumb_url, it.tag_id, it.tag_name, it.department_name + SELECT it.id, it.image_url as db_image_url, it.tag_id, it.tag_name, it.default_tag_name, it.keywords_name, it.department_name FROM ai_image_tags it - WHERE it.image_thumb_url != '' AND it.tag_name != '' + WHERE it.image_url != '' AND it.default_tag_name != '' AND it.status = 'tag_extension' """ @@ -304,6 +388,10 @@ def batch_derive_tags(batch_size: int = None, concurrency: int = None, start_id: sql += " ORDER BY it.id" + # 添加 LIMIT 限制 + if limit: + sql += f" LIMIT {int(limit)}" + items = db.execute_query(sql, params) if params else db.execute_query(sql) if not items: @@ -312,8 +400,8 @@ def batch_derive_tags(batch_size: int = None, concurrency: int = None, start_id: # 拼接完整图片URL for item in items: - if item.get('image_thumb_url'): - item['image_url'] = settings.tag_derive.image_cdn_base + item['image_thumb_url'] + if item.get('db_image_url'): + item['image_url'] = settings.tag_derive.image_cdn_base + item['db_image_url'] else: item['image_url'] = '' @@ -353,7 +441,7 @@ def print_summary(results: List[Dict]): logger.info("详细结果:") for r in results: if r.get('success'): - logger.info(f" [ID:{r['image_id']}] {r['original_tag']} -> {r['merged_tag'][:40]}...") + logger.info(f" [ID:{r['image_id']}] {r['original_tag']} -> {r['derived_tag_name'][:40]}...") else: logger.warning(f" [ID:{r.get('image_id')}] 失败: {r.get('error')}") @@ -367,6 +455,7 @@ def main(): parser.add_argument('--batch-size', type=int, default=None, help='每批次从数据库读取的图片数量') parser.add_argument('--concurrency', type=int, default=None, help='并发请求数(同时发出的API请求数)') parser.add_argument('--id', type=int, nargs='+', default=None, help='指定ID,只处理这些ID(可指定多个)') + parser.add_argument('--limit', type=int, default=None, help='限制处理的总数量(用于测试,如 --limit 10)') args = parser.parse_args() batch_size = args.batch_size or settings.tag_derive.batch_size @@ -375,6 +464,8 @@ def main(): logger.info("=" * 60) logger.info("千问视觉大模型 - 图片标签衍生生成器") logger.info(f"模式: 每批 {batch_size} 张,并发 {concurrency} 个请求") + if args.limit: + logger.info(f"测试模式: 只处理 {args.limit} 条") if args.id: logger.info(f"指定ID: {args.id}") elif args.start_id or args.end_id: @@ -387,7 +478,8 @@ def main(): concurrency=args.concurrency, start_id=args.start_id, end_id=args.end_id, - ids=args.id + ids=args.id, + limit=args.limit ) if results: diff --git a/logger.py b/logger.py index f6e67f1..7740ae0 100644 --- a/logger.py +++ b/logger.py @@ -11,7 +11,7 @@ from datetime import datetime from typing import Optional # 日志目录 -LOG_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "logs") +LOG_DIR = os.path.join(os.path.dirname(__file__), "logs") def setup_logger( diff --git a/restore_database.py b/restore_database.py index 5a88aae..be7f75f 100644 --- a/restore_database.py +++ b/restore_database.py @@ -152,6 +152,42 @@ def restore_data(): image_tags = data.get("ai_image_tags", []) print(f"恢复 ai_image_tags 表 ({len(image_tags)} 条)...") + # 先插入一条URL错误的测试数据(id=1,第一条) + error_url_data = { + "id": 1, + "image_id": 99999, + "image_name": "error_test_image.png", + "image_url": "https://invalid-url-test/not-exist/error_image.png", + "image_thumb_url": "https://invalid-url-test/not-exist/error_image_thumb.png", + "tag_id": 12679, + "tag_name": "#测试错误URL#", + "default_tag_id": 0, + "default_tag_name": "", + "keywords_id": 186, + "keywords_name": "测试", + "department_id": 11, + "department_name": "妇科", + "image_source": 1, + "created_user_id": 0, + "created_at": "2025-01-01T00:00:00", + "updated_at": "2025-01-01T00:00:00", + "image_attached_article_count": 0, + "status": "draft", + "blocking_reason": "", + "similarity": "draft", + "similarity_image_tags_id": 0, + "similarity score": 0.0 + } + columns = ", ".join(f"`{k}`" for k in error_url_data.keys()) + placeholders = ", ".join(["%s"] * len(error_url_data)) + sql = f"INSERT INTO ai_image_tags ({columns}) VALUES ({placeholders})" + try: + cursor.execute(sql, list(error_url_data.values())) + conn.commit() + print(f" ✓ 已插入URL错误的测试数据 (id=1)") + except mysql.connector.Error as e: + print(f" 插入错误URL测试数据失败: {e}") + if image_tags: success = 0 for item in image_tags: diff --git a/test_db_select.py b/test_db_select.py new file mode 100644 index 0000000..bdb8295 --- /dev/null +++ b/test_db_select.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +""" +数据库 SELECT 测试脚本 +测试数据库连接和 sleep 是否生效 +""" + +import time +from database_config import get_db, DB_SLEEP + + +def test_select(): + """测试 SELECT 命中""" + print("=" * 50) + print("数据库 SELECT 测试") + print(f"DB_SLEEP = {DB_SLEEP}s") + print("=" * 50) + + db = get_db() + + # 测试1: 简单连接 + print("\n[1] 测试连接...") + result = db.execute_one("SELECT 1 as test") + print(f"✓ 连接成功: {result}") + + # 测试2: 多次查询,验证 sleep + print(f"\n[2] 执行5次 SELECT,验证 sleep...") + start_time = time.time() + + for i in range(5): + result = db.execute_one("SELECT COUNT(*) as cnt FROM ai_image_tags") + print(f" 第{i+1}次: {result['cnt']} 条记录") + + elapsed = time.time() - start_time + expected_sleep = DB_SLEEP * 5 + print(f"\n 总耗时: {elapsed:.3f}s") + print(f" 预期 sleep: {expected_sleep:.3f}s") + print("✓ SELECT 测试通过") + + # 测试3: 查询 ai_image_tags + print("\n[3] 查询 ai_image_tags 前3条...") + results = db.execute_query( + "SELECT id, tag_name, department_name FROM ai_image_tags ORDER BY id DESC LIMIT 3" + ) + for row in results: + print(f" ID: {row['id']}, 科室: {row['department_name']}, 标签: {row['tag_name'][:30]}...") + + # 测试4: 查询 ai_tags + print("\n[4] 查询 ai_tags 表...") + result = db.execute_one("SELECT COUNT(*) as cnt FROM ai_tags") + print(f" ai_tags 总记录数: {result['cnt']}") + + results = db.execute_query( + "SELECT id, tag_name, tag_category, department, status FROM ai_tags ORDER BY id DESC LIMIT 3" + ) + print(" 前3条记录:") + for row in results: + tag_name = row['tag_name'][:30] if row['tag_name'] else '' + print(f" ID: {row['id']}, 分类: {row['tag_category']}, 科室: {row['department']}, 标签: {tag_name}...") + + # 测试5: ai_tags 状态统计 + print("\n[5] ai_tags 状态统计...") + results = db.execute_query( + "SELECT status, COUNT(*) as cnt FROM ai_tags GROUP BY status" + ) + for row in results: + print(f" {row['status']}: {row['cnt']} 条") + + print("\n" + "=" * 50) + print("测试完成!") + + +if __name__ == "__main__": + test_select()