feat: 批量处理模式(10张/请求) + 内容审核失败处理

This commit is contained in:
2026-02-05 18:58:59 +08:00
parent a6f203a3e3
commit 6f66e7c10f
8 changed files with 276 additions and 274 deletions

View File

@@ -241,7 +241,8 @@ CREATE TABLE `ai_image_tags` (
`blocking_reason` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '审核不通过原因',
`similarity` enum('draft','yes','calc','recalc') CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'draft' COMMENT 'yes=是相似|calc=已计算|recalc=需要重新计算',
`similarity_image_tags_id` int NOT NULL DEFAULT 0 COMMENT 'yes=是相似|把image_tags_id写入',
`similarity score` float NOT NULL DEFAULT 0 COMMENT '相似时候,计算相似度值',
`similarity_score` float NOT NULL DEFAULT 0 COMMENT '相似时候,计算相似度值',
`automated_review_failed_reason` varchar(64) NOT NULL DEFAULT '' COMMENT '千问大模型审核图不符合原因',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `uk_image_tag`(`image_id` ASC, `tag_id` ASC) USING BTREE,
INDEX `tag_id`(`tag_id` ASC) USING BTREE,
@@ -932,7 +933,7 @@ CREATE TABLE `baidu_keyword` (
`similarity` enum('draft','yes','calc','recalc') CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'draft' COMMENT 'yes=是相似|calc=已计算|recalc=需要重新计算',
`similarity_query` int NOT NULL DEFAULT 0 COMMENT 'yes=是相似|把query_id写入',
`similarity_query_keyword` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT 'yes=是相似|把query写入',
`similarity score` float NOT NULL DEFAULT 0 COMMENT '相似时候,计算相似度值',
`similarity_score` float NOT NULL DEFAULT 0 COMMENT '相似时候,计算相似度值',
`reviewed_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '审核日期',
`fast_track` tinyint(1) NOT NULL DEFAULT 0 COMMENT '加急|0=否|1=是',
`automated_review_failed_reason` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '千问大模型审核query不符合原因',

View File

@@ -36,7 +36,7 @@ class QwenConfig:
class TagDeriveConfig:
"""标签衍生配置"""
batch_size: int = 50 # 每批次从数据库读取的图片数量
concurrency: int = 10 # 并发请求数同时发出的API请求数
concurrency: int = 2 # 并发请求数同时发出的API请求数
min_derived_tags: int = 5 # 最少衍生标签数
max_derived_tags: int = 10 # 最多衍生标签数
max_tag_length: int = 10 # 单个标签最大长度

View File

@@ -9,10 +9,14 @@ import mysql.connector
from mysql.connector import pooling
from contextlib import contextmanager
from typing import List, Dict, Any, Optional
import time
# 导入统一配置
from config.settings import settings
# 数据库操作间隔(s)防止hang住
DB_SLEEP = 0.01
class DatabaseManager:
"""数据库管理器 - 单例模式"""
@@ -67,6 +71,7 @@ class DatabaseManager:
finally:
cursor.close()
conn.close()
time.sleep(DB_SLEEP) # 防止数据库hang住
def execute_query(self, sql: str, params: tuple = None) -> List[Dict[str, Any]]:
"""执行查询SQL返回结果列表"""

View File

@@ -1,206 +1 @@
[
{
"success": true,
"image_id": 16495,
"original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#",
"derived_tags": [
"阴道炎",
"白带异常",
"妇科感染",
"盆腔炎",
"宫颈炎",
"分泌物增多",
"抗炎治疗",
"妇科检查"
],
"merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##阴道炎##白带异常##妇科感染##盆腔炎#",
"new_tag_id": 13434
},
{
"success": true,
"image_id": 16508,
"original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#",
"derived_tags": [
"阴道炎",
"宫颈炎",
"盆腔炎",
"白带异常",
"外阴瘙痒",
"妇科检查",
"抗炎治疗",
"女性健康"
],
"merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##阴道炎##宫颈炎##盆腔炎##白带异常#",
"new_tag_id": 13435
},
{
"success": true,
"image_id": 16506,
"original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#",
"derived_tags": [
"阴道炎",
"盆腔炎",
"白带异常",
"妇科检查",
"抗生素治疗",
"私处护理",
"月经不调",
"感染预防"
],
"merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##阴道炎##盆腔炎##白带异常##妇科检查#",
"new_tag_id": 13436
},
{
"success": true,
"image_id": 16503,
"original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#",
"derived_tags": [
"阴道炎",
"宫颈炎",
"盆腔炎",
"白带异常",
"外阴瘙痒",
"妇科感染",
"炎症治疗",
"妇科疾病"
],
"merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##阴道炎##宫颈炎##盆腔炎##白带异常#",
"new_tag_id": 13435
},
{
"success": true,
"image_id": 16515,
"original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#",
"derived_tags": [
"妇科疾病",
"阴道炎",
"盆腔炎",
"白带异常",
"月经不调",
"抗炎治疗",
"个人卫生",
"免疫力"
],
"merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##妇科疾病##阴道炎##盆腔炎##白带异常#",
"new_tag_id": 13437
},
{
"success": true,
"image_id": 16512,
"original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#",
"derived_tags": [
"阴道炎",
"盆腔炎",
"白带异常",
"月经不调",
"抗生素治疗",
"妇科检查",
"免疫力下降",
"激素变化"
],
"merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##阴道炎##盆腔炎##白带异常##月经不调#",
"new_tag_id": 13438
},
{
"success": true,
"image_id": 16514,
"original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#",
"derived_tags": [
"妇科疾病",
"阴道炎",
"盆腔炎",
"白带异常",
"抗生素治疗",
"个人卫生",
"免疫力下降",
"月经不调"
],
"merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##妇科疾病##阴道炎##盆腔炎##白带异常#",
"new_tag_id": 13437
},
{
"success": true,
"image_id": 16513,
"original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#",
"derived_tags": [
"阴道炎",
"宫颈炎",
"盆腔炎",
"白带异常",
"瘙痒",
"抗生素治疗",
"个人卫生",
"妇科检查"
],
"merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##阴道炎##宫颈炎##盆腔炎##白带异常#",
"new_tag_id": 13435
},
{
"success": true,
"image_id": 16496,
"original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#",
"derived_tags": [
"阴道炎",
"宫颈炎",
"盆腔炎",
"白带异常",
"抗生素治疗",
"妇科检查",
"免疫力下降",
"性传播疾病"
],
"merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##阴道炎##宫颈炎##盆腔炎##白带异常#",
"new_tag_id": 13435
},
{
"success": true,
"image_id": 16500,
"original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#",
"derived_tags": [
"妇科疾病",
"阴道炎",
"盆腔炎",
"白带异常",
"私处护理",
"抗生素治疗",
"免疫力提升",
"月经不调"
],
"merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##妇科疾病##阴道炎##盆腔炎##白带异常#",
"new_tag_id": 13437
},
{
"success": true,
"image_id": 16516,
"original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#",
"derived_tags": [
"更年期症状",
"激素变化",
"月经紊乱",
"潮热出汗",
"骨质疏松",
"情绪波动",
"妇科保健",
"内分泌失调"
],
"merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##更年期症状##激素变化##月经紊乱##潮热出汗#",
"new_tag_id": 13439
},
{
"success": true,
"image_id": 16518,
"original_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办#",
"derived_tags": [
"阴道炎",
"盆腔炎",
"白带异常",
"抗生素治疗",
"个人卫生",
"妇科检查",
"免疫力下降",
"性传播疾病"
],
"merged_tag": "#妇科炎症##妇科炎症原因##妇科炎症治疗##妇科炎症怎么办##阴道炎##盆腔炎##白带异常##抗生素治疗#",
"new_tag_id": 13440
}
]
[]

View File

@@ -30,27 +30,38 @@ dashscope.api_key = settings.qwen.api_key
# ============== Prompt模板 ==============
# 单张图片的Prompt
SINGLE_DERIVE_PROMPT = """你是一个专业的医疗健康内容标签分析专家。
# 批量图片的Prompt10张一批
BATCH_DERIVE_PROMPT = """你是一个专业的医疗健康内容标签分析专家。
## 任务
我提供了一张医疗健康相关图片,原始标签为「{original_tag}」。请分析图片内容,生成衍生标签。
我提供了{count}张医疗健康相关图片,每张图片都有对应的序号、原始标签和关键字。请逐一分析每张图片,为每张图片生成衍生标签。
## 图片信息
{image_info}
## 要求
1. 分图片内容,结合原始标签
2. 生成 5-8 个衍生标签
1. 分别分析每张图片内容,结合原始标签和关键字
2. 每张图片生成 5-8 个衍生标签
3. 衍生标签包括:同义词、上位概念、下位概念、相关症状/治疗等
4. 标签简洁每个不超过10个字
## 输出格式
请严格以JSON格式输出
请严格以JSON格式输出,包含每张图片的衍生结果
```json
{{"derived_tags": ["衍生1", "衍生2", "衍生3", "衍生4", "衍生5"]}}
{{
"results": [
{{"id": 图片ID, "derived_tags": ["衍生1", "衍生2", "衍生3"]}},
{{"id": 图片ID, "derived_tags": ["衍生1", "衍生2", "衍生3"]}}
]
}}
```
注意只输出JSON不要输出其他内容。
注意只输出JSON不要输出其他内容。results数组的顺序和数量必须与输入图片一致。
"""
# 每批图片数量
BATCH_IMAGE_COUNT = 10
class TagsDAO:
def __init__(self):
@@ -110,20 +121,43 @@ def merge_tags(original_tag: str, derived_tags: List[str], max_total_tags: int =
@retry(max_retries=settings.qwen.max_retries, delay=settings.qwen.retry_delay, backoff=2.0)
def derive_tags_single(item: Dict) -> Dict:
def derive_tags_batch(items: List[Dict]) -> List[Dict]:
"""
单张图片调用千问大模型获取衍生标签
item: {"id": 1, "image_url": "...", "tag_name": "高血压", ...}
返回: {"success": True/False, "item": item, "derived_tags": [...], "error": "..."}
批量调用千问大模型获取衍生标签10张图片一个请求
items: [{"id": 1, "image_url": "...", "default_tag_name": "高血压", ...}, ...]
返回: [{"success": True/False, "item": item, "derived_tags": [...], "error": "..."}, ...]
"""
logger.debug(f" 处理 ID:{item['id']} - {item['tag_name']}")
if not items:
return []
prompt = SINGLE_DERIVE_PROMPT.format(original_tag=item['tag_name'])
ids = [item['id'] for item in items]
logger.info(f" 批量处理 {len(items)} 张图片, IDs: {ids}")
content = [
{"image": item['image_url']},
{"text": prompt}
]
# 构建图片信息文本
image_info_lines = []
for i, item in enumerate(items, 1):
keywords = item.get('keywords_name', '')
# 如果关键字为"废止",提示大模型忽略
if keywords == '废止':
keywords_desc = "(关键字已废止,请忽略)"
else:
keywords_desc = f"关键字「{keywords}」" if keywords else "关键字「无」"
image_info_lines.append(
f"图片{i}: ID={item['id']}, 原始标签「{item['default_tag_name']}」, {keywords_desc}"
)
image_info = "\n".join(image_info_lines)
prompt = BATCH_DERIVE_PROMPT.format(
count=len(items),
image_info=image_info
)
# 构建多图片内容
content = []
for item in items:
content.append({"image": item['image_url']})
content.append({"text": prompt})
messages = [{"role": "user", "content": content}]
@@ -132,9 +166,11 @@ def derive_tags_single(item: Dict) -> Dict:
messages=messages
)
results = []
if response.status_code == HTTPStatus.OK:
result_text = response.output.choices[0].message.content[0]["text"]
logger.debug(f" ID:{item['id']} 原始响应: {result_text[:200]}...")
logger.debug(f" 批量响应: {result_text[:300]}...")
try:
json_start = result_text.find('{')
@@ -142,26 +178,54 @@ def derive_tags_single(item: Dict) -> Dict:
if json_start != -1 and json_end > json_start:
json_str = result_text[json_start:json_end]
result_json = json.loads(json_str)
derived_tags = result_json.get('derived_tags', [])
if not derived_tags:
logger.warning(f" ID:{item['id']} 返回JSON中无derived_tags字段: {json_str[:100]}")
return {"success": True, "item": item, "derived_tags": derived_tags}
batch_results = result_json.get('results', [])
# 按ID匹配结果
result_map = {r.get('id'): r.get('derived_tags', []) for r in batch_results}
for item in items:
derived_tags = result_map.get(item['id'], [])
if derived_tags:
results.append({"success": True, "item": item, "derived_tags": derived_tags})
else:
# 尝试按顺序匹配
idx = items.index(item)
if idx < len(batch_results):
derived_tags = batch_results[idx].get('derived_tags', [])
results.append({"success": True, "item": item, "derived_tags": derived_tags})
else:
results.append({"success": False, "item": item, "error": "未匹配到衍生标签"})
else:
logger.warning(f" ID:{item['id']} 未找到JSON内容: {result_text[:200]}")
return {"success": False, "item": item, "error": "未找到JSON内容"}
logger.warning(f" 批量处理未找到JSON内容: {result_text[:200]}")
for item in items:
results.append({"success": False, "item": item, "error": "未找到JSON内容"})
except json.JSONDecodeError as e:
logger.error(f" ID:{item['id']} JSON解析失败: {e}, 内容: {result_text[:200]}")
return {"success": False, "item": item, "error": "JSON解析失败"}
logger.error(f" 批量处理JSON解析失败: {e}, 内容: {result_text[:200]}")
for item in items:
results.append({"success": False, "item": item, "error": "JSON解析失败"})
else:
error_msg = f"{response.code}-{response.message}"
logger.error(f" ID:{item['id']} API调用失败: {error_msg}")
logger.error(f" 批量处理API调用失败: {error_msg}")
# 检测内容审核失败(不重试,直接标记)
if 'DataInspectionFailed' in error_msg or 'inappropriate content' in error_msg:
for item in items:
results.append({
"success": False,
"item": item,
"error": "内容审核不通过",
"is_content_review_failed": True
})
return results
raise Exception(error_msg) # 抛出异常触发重试
return results
def process_batch(items: List[Dict], tags_dao: TagsDAO, concurrency: int = None) -> List[Dict]:
"""
并发处理一批图片
处理一批图片每10张一个请求多请求并发执行
Args:
items: 要处理的图片列表
@@ -171,43 +235,60 @@ def process_batch(items: List[Dict], tags_dao: TagsDAO, concurrency: int = None)
if concurrency is None:
concurrency = settings.tag_derive.concurrency
logger.info(f"[处理批次] {len(items)} 张图片,并发数: {concurrency}")
logger.info(f"[处理批次] {len(items)} 张图片,{BATCH_IMAGE_COUNT}张一个请求,并发数: {concurrency}")
db = get_db()
process_results = []
api_results = []
# 1. 并发调用大模型(按并发数限制)
# 1. 按 BATCH_IMAGE_COUNT 分组
sub_batches = [items[i:i+BATCH_IMAGE_COUNT] for i in range(0, len(items), BATCH_IMAGE_COUNT)]
logger.info(f" 分成 {len(sub_batches)} 个子批次")
# 2. 并发调用大模型(按并发数限制)
with ThreadPoolExecutor(max_workers=concurrency) as executor:
# 提交所有任务
future_to_item = {
executor.submit(derive_tags_single, item): item
for item in items
# 提交所有批量任务
future_to_batch = {
executor.submit(derive_tags_batch, sub_batch): sub_batch
for sub_batch in sub_batches
}
# 收集结果
for future in as_completed(future_to_item):
item = future_to_item[future]
for future in as_completed(future_to_batch):
sub_batch = future_to_batch[future]
try:
result = future.result()
api_results.append(result)
batch_results = future.result()
api_results.extend(batch_results)
except Exception as e:
logger.error(f" ID:{item['id']} 处理异常: {e}")
api_results.append({
"success": False,
"item": item,
"error": str(e)
})
logger.error(f" 子批次处理异常: {e}")
for item in sub_batch:
api_results.append({
"success": False,
"item": item,
"error": str(e)
})
# 2. 逐个处理结果并更新数据库
# 3. 逐个处理结果并更新数据库
for result in api_results:
item = result.get('item', {})
if not result.get('success'):
error_msg = result.get('error', '未知错误')
# 内容审核失败:更新状态和原因
if result.get('is_content_review_failed'):
try:
sql = "UPDATE ai_image_tags SET status = 'automated_review_failed', automated_review_failed_reason = %s WHERE id = %s"
db.execute_update(sql, (error_msg[:64], item['id']))
logger.warning(f" ✗ ID:{item['id']} -> status: automated_review_failed")
except Exception as e:
logger.error(f" 更新审核失败状态异常: {e}")
process_results.append({
"success": False,
"image_id": item.get('id'),
"error": result.get('error', '未知错误')
"error": error_msg,
"is_content_review_failed": result.get('is_content_review_failed', False)
})
continue
@@ -221,15 +302,17 @@ def process_batch(items: List[Dict], tags_dao: TagsDAO, concurrency: int = None)
})
continue
logger.info(f" [{item['tag_name']}] 衍生: {derived_tags}")
logger.info(f" [{item['default_tag_name']}] 衍生: {derived_tags}")
# 合并标签(限制总标签数量
# 只用衍生标签(不包含原始标签
max_total = getattr(settings.tag_derive, 'max_total_tags', None)
merged_tag_name = merge_tags(item['tag_name'], derived_tags, max_total_tags=max_total)
if max_total:
derived_tags = derived_tags[:max_total]
derived_tag_name = ''.join([f'#{t}#' for t in derived_tags if t])
# 插入ai_tags
try:
new_tag_id = tags_dao.get_or_create(merged_tag_name, None, item.get('department_name', ''))
new_tag_id = tags_dao.get_or_create(derived_tag_name, None, item.get('department_name', ''))
except Exception as e:
process_results.append({"success": False, "image_id": item['id'], "error": str(e)})
continue
@@ -237,13 +320,13 @@ def process_batch(items: List[Dict], tags_dao: TagsDAO, concurrency: int = None)
# 更新ai_image_tags包括 tag_id, tag_name, status
try:
sql = "UPDATE ai_image_tags SET tag_id = %s, tag_name = %s, status = 'manual_review' WHERE id = %s"
db.execute_update(sql, (new_tag_id, merged_tag_name, item['id']))
db.execute_update(sql, (new_tag_id, derived_tag_name, item['id']))
process_results.append({
"success": True,
"image_id": item['id'],
"original_tag": item['tag_name'],
"original_tag": item['default_tag_name'],
"derived_tags": derived_tags,
"merged_tag": merged_tag_name,
"derived_tag_name": derived_tag_name,
"new_tag_id": new_tag_id
})
logger.info(f" ✓ ID:{item['id']} -> tag_id:{new_tag_id}, status -> manual_review")
@@ -253,7 +336,7 @@ def process_batch(items: List[Dict], tags_dao: TagsDAO, concurrency: int = None)
return process_results
def batch_derive_tags(batch_size: int = None, concurrency: int = None, start_id: int = None, end_id: int = None, ids: List[int] = None) -> List[Dict]:
def batch_derive_tags(batch_size: int = None, concurrency: int = None, start_id: int = None, end_id: int = None, ids: List[int] = None, limit: int = None) -> List[Dict]:
"""
分批处理图片标签衍生
@@ -263,6 +346,7 @@ def batch_derive_tags(batch_size: int = None, concurrency: int = None, start_id:
start_id: 起始ID从该ID开始处理用于断点续传
end_id: 结束ID处理到该ID为止
ids: 指定ID列表只处理这些ID
limit: 限制处理的总数量(用于测试)
"""
if batch_size is None:
batch_size = settings.tag_derive.batch_size
@@ -277,10 +361,10 @@ def batch_derive_tags(batch_size: int = None, concurrency: int = None, start_id:
# 按指定ID查询查询 status='tag_extension' 的记录)
placeholders = ','.join(['%s'] * len(ids))
sql = f"""
SELECT it.id, it.image_thumb_url, it.tag_id, it.tag_name, it.department_name
SELECT it.id, it.image_url as db_image_url, it.tag_id, it.tag_name, it.default_tag_name, it.keywords_name, it.department_name
FROM ai_image_tags it
WHERE it.id IN ({placeholders})
AND it.image_thumb_url != '' AND it.tag_name != ''
AND it.image_url != '' AND it.default_tag_name != ''
AND it.status = 'tag_extension'
ORDER BY it.id
"""
@@ -288,9 +372,9 @@ def batch_derive_tags(batch_size: int = None, concurrency: int = None, start_id:
else:
# 按条件查询 status='tag_extension' 的记录
sql = """
SELECT it.id, it.image_thumb_url, it.tag_id, it.tag_name, it.department_name
SELECT it.id, it.image_url as db_image_url, it.tag_id, it.tag_name, it.default_tag_name, it.keywords_name, it.department_name
FROM ai_image_tags it
WHERE it.image_thumb_url != '' AND it.tag_name != ''
WHERE it.image_url != '' AND it.default_tag_name != ''
AND it.status = 'tag_extension'
"""
@@ -304,6 +388,10 @@ def batch_derive_tags(batch_size: int = None, concurrency: int = None, start_id:
sql += " ORDER BY it.id"
# 添加 LIMIT 限制
if limit:
sql += f" LIMIT {int(limit)}"
items = db.execute_query(sql, params) if params else db.execute_query(sql)
if not items:
@@ -312,8 +400,8 @@ def batch_derive_tags(batch_size: int = None, concurrency: int = None, start_id:
# 拼接完整图片URL
for item in items:
if item.get('image_thumb_url'):
item['image_url'] = settings.tag_derive.image_cdn_base + item['image_thumb_url']
if item.get('db_image_url'):
item['image_url'] = settings.tag_derive.image_cdn_base + item['db_image_url']
else:
item['image_url'] = ''
@@ -353,7 +441,7 @@ def print_summary(results: List[Dict]):
logger.info("详细结果:")
for r in results:
if r.get('success'):
logger.info(f" [ID:{r['image_id']}] {r['original_tag']} -> {r['merged_tag'][:40]}...")
logger.info(f" [ID:{r['image_id']}] {r['original_tag']} -> {r['derived_tag_name'][:40]}...")
else:
logger.warning(f" [ID:{r.get('image_id')}] 失败: {r.get('error')}")
@@ -367,6 +455,7 @@ def main():
parser.add_argument('--batch-size', type=int, default=None, help='每批次从数据库读取的图片数量')
parser.add_argument('--concurrency', type=int, default=None, help='并发请求数同时发出的API请求数')
parser.add_argument('--id', type=int, nargs='+', default=None, help='指定ID只处理这些ID可指定多个')
parser.add_argument('--limit', type=int, default=None, help='限制处理的总数量(用于测试,如 --limit 10')
args = parser.parse_args()
batch_size = args.batch_size or settings.tag_derive.batch_size
@@ -375,6 +464,8 @@ def main():
logger.info("=" * 60)
logger.info("千问视觉大模型 - 图片标签衍生生成器")
logger.info(f"模式: 每批 {batch_size} 张,并发 {concurrency} 个请求")
if args.limit:
logger.info(f"测试模式: 只处理 {args.limit}")
if args.id:
logger.info(f"指定ID: {args.id}")
elif args.start_id or args.end_id:
@@ -387,7 +478,8 @@ def main():
concurrency=args.concurrency,
start_id=args.start_id,
end_id=args.end_id,
ids=args.id
ids=args.id,
limit=args.limit
)
if results:

View File

@@ -11,7 +11,7 @@ from datetime import datetime
from typing import Optional
# 日志目录
LOG_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "logs")
LOG_DIR = os.path.join(os.path.dirname(__file__), "logs")
def setup_logger(

View File

@@ -152,6 +152,42 @@ def restore_data():
image_tags = data.get("ai_image_tags", [])
print(f"恢复 ai_image_tags 表 ({len(image_tags)} 条)...")
# 先插入一条URL错误的测试数据id=1第一条
error_url_data = {
"id": 1,
"image_id": 99999,
"image_name": "error_test_image.png",
"image_url": "https://invalid-url-test/not-exist/error_image.png",
"image_thumb_url": "https://invalid-url-test/not-exist/error_image_thumb.png",
"tag_id": 12679,
"tag_name": "#测试错误URL#",
"default_tag_id": 0,
"default_tag_name": "",
"keywords_id": 186,
"keywords_name": "测试",
"department_id": 11,
"department_name": "妇科",
"image_source": 1,
"created_user_id": 0,
"created_at": "2025-01-01T00:00:00",
"updated_at": "2025-01-01T00:00:00",
"image_attached_article_count": 0,
"status": "draft",
"blocking_reason": "",
"similarity": "draft",
"similarity_image_tags_id": 0,
"similarity_score": 0.0
}
columns = ", ".join(f"`{k}`" for k in error_url_data.keys())
placeholders = ", ".join(["%s"] * len(error_url_data))
sql = f"INSERT INTO ai_image_tags ({columns}) VALUES ({placeholders})"
try:
cursor.execute(sql, list(error_url_data.values()))
conn.commit()
print(f" ✓ 已插入URL错误的测试数据 (id=1)")
except mysql.connector.Error as e:
print(f" 插入错误URL测试数据失败: {e}")
if image_tags:
success = 0
for item in image_tags:

73
test_db_select.py Normal file
View File

@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
"""Database SELECT smoke test.

Verifies that the database connection works and that the per-operation
DB_SLEEP throttle defined in database_config actually adds a delay
between operations.
"""
import time

from database_config import get_db, DB_SLEEP


def test_select():
    """Run a sequence of SELECT checks against the database.

    Steps:
      1. Connectivity check (SELECT 1).
      2. Five COUNT queries, timed, to confirm DB_SLEEP is applied once
         per operation (the original only printed the numbers and always
         claimed success; now the elapsed time is actually checked).
      3. Sample the latest rows of ai_image_tags.
      4. Sample the latest rows of ai_tags.
      5. Status breakdown of ai_tags.
    """
    print("=" * 50)
    print("数据库 SELECT 测试")
    print(f"DB_SLEEP = {DB_SLEEP}s")
    print("=" * 50)

    db = get_db()

    # 1. Simple connectivity check.
    print("\n[1] 测试连接...")
    result = db.execute_one("SELECT 1 as test")
    print(f"✓ 连接成功: {result}")

    # 2. Repeated queries: with one sleep per operation, 5 queries must
    # take at least 5 * DB_SLEEP in total.  Use a monotonic clock so the
    # measurement is immune to wall-clock adjustments.
    print(f"\n[2] 执行5次 SELECT验证 sleep...")
    start_time = time.monotonic()
    for i in range(5):
        result = db.execute_one("SELECT COUNT(*) as cnt FROM ai_image_tags")
        print(f"第{i+1}次: {result['cnt']} 条记录")
    elapsed = time.monotonic() - start_time
    expected_sleep = DB_SLEEP * 5
    print(f"\n 总耗时: {elapsed:.3f}s")
    print(f" 预期 sleep: {expected_sleep:.3f}s")
    # Fix: actually verify the throttle instead of unconditionally passing.
    if elapsed >= expected_sleep:
        print("✓ SELECT 测试通过")
    else:
        print(f"✗ sleep 未生效: 耗时 {elapsed:.3f}s < 预期 {expected_sleep:.3f}s")

    # 3. Sample ai_image_tags.  tag_name may be empty/NULL, so guard the
    # slice — consistent with the guard already used in step 4.
    print("\n[3] 查询 ai_image_tags 前3条...")
    results = db.execute_query(
        "SELECT id, tag_name, department_name FROM ai_image_tags ORDER BY id DESC LIMIT 3"
    )
    for row in results:
        tag_name = row['tag_name'][:30] if row['tag_name'] else ''
        print(f" ID: {row['id']}, 科室: {row['department_name']}, 标签: {tag_name}...")

    # 4. Sample ai_tags.
    print("\n[4] 查询 ai_tags 表...")
    result = db.execute_one("SELECT COUNT(*) as cnt FROM ai_tags")
    print(f" ai_tags 总记录数: {result['cnt']}")
    results = db.execute_query(
        "SELECT id, tag_name, tag_category, department, status FROM ai_tags ORDER BY id DESC LIMIT 3"
    )
    print(" 前3条记录:")
    for row in results:
        tag_name = row['tag_name'][:30] if row['tag_name'] else ''
        print(f" ID: {row['id']}, 分类: {row['tag_category']}, 科室: {row['department']}, 标签: {tag_name}...")

    # 5. Status distribution of ai_tags.
    print("\n[5] ai_tags 状态统计...")
    results = db.execute_query(
        "SELECT status, COUNT(*) as cnt FROM ai_tags GROUP BY status"
    )
    for row in results:
        print(f" {row['status']}: {row['cnt']} 条")

    print("\n" + "=" * 50)
    print("测试完成!")


if __name__ == "__main__":
    test_select()