From 5a6fbcbf28139b0fdc970faa6e264b0017b0a7b9 Mon Sep 17 00:00:00 2001 From: liangguodong Date: Thu, 5 Feb 2026 19:01:38 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E9=87=8D=E7=AE=97?= =?UTF-8?q?=E8=84=9A=E6=9C=AC=E5=92=8C=E7=BB=9F=E8=AE=A1=E8=84=9A=E6=9C=AC?= =?UTF-8?q?=EF=BC=8C=E6=9B=B4=E6=96=B0README?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 40 +++-- ai_article.sql | 4 +- image_similarity_check.py | 5 +- image_similarity_recalc.py | 293 +++++++++++++++++++++++++++++++++++++ query_status.py | 4 +- reset_data.py | 2 +- stats_similarity.py | 141 ++++++++++++++++++ 7 files changed, 469 insertions(+), 20 deletions(-) create mode 100644 image_similarity_recalc.py create mode 100644 stats_similarity.py diff --git a/README.md b/README.md index 775b905..44cbae3 100644 --- a/README.md +++ b/README.md @@ -49,8 +49,8 @@ vector_dimension = 1024 cdn_base = https://your-cdn.com/ [similarity] -phash_threshold = 10 -vector_threshold = 0.85 +phash_threshold = 5 +vector_threshold = 0.94 [process] batch_size = 100 @@ -62,25 +62,37 @@ log_file = image_similarity.log ## 使用方法 ```bash +# 处理新图片 (status='draft', similarity='draft') python image_similarity_check.py + +# 重新处理失败的图片 (status='draft', similarity='recalc') +python image_similarity_recalc.py + +# 查看统计报告 +python stats_similarity.py ``` ## 项目结构 ``` -├── image_similarity_check.py # 主程序:图片去重审核 -├── query_status.py # 查询处理状态 -├── reset_data.py # 重置数据 -├── reset_vector.py # 重置向量库 -├── basket.py # 测试脚本 -├── requirements.txt # 依赖包 -└── config.ini # 配置文件(不提交) +├── image_similarity_check.py # 主程序:处理新图片 +├── image_similarity_recalc.py # 重算程序:处理失败的图片 +├── stats_similarity.py # 统计脚本:查看处理结果 +├── query_status.py # 查询处理状态 +├── reset_data.py # 重置数据 +├── reset_vector.py # 重置向量库 +├── config.ini # 配置文件 +└── requirements.txt # 依赖包 ``` ## 工作流程 -1. 从数据库获取待处理的图片记录 -2. 调用 DashScope API 获取图片的多模态 Embedding -3. 在 DashVector 中搜索相似图片 -4. 根据相似度阈值判断是否重复 -5. 更新数据库状态(重复/不重复) +1. 从数据库获取待处理图片 (`status='draft'`, `similarity='draft'`) +2. 拼接 CDN URL:`cdn_base + image_url` +3. 调用 DashScope API 获取 1024 维向量 +4. 在 DashVector 中搜索 topk=3 相似图片 +5. 计算相似度:`similarity = 1.0 - score` +6. 判断结果: + - `similarity >= 0.94` → 标记为重复 (`status='similarity'`) + - `similarity < 0.94` → 标记为不重复 (`status='tag_extension'`),向量入库 + - 处理失败 → 标记为待重算 (`similarity='recalc'`) diff --git a/ai_article.sql b/ai_article.sql index 0a40562..5de0de2 100644 --- a/ai_article.sql +++ b/ai_article.sql @@ -241,7 +241,7 @@ CREATE TABLE `ai_image_tags` ( `blocking_reason` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '审核不通过原因', `similarity` enum('draft','yes','calc','recalc') CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'draft' COMMENT 'yes=是相似|calc=已计算|recalc=需要重新计算', `similarity_image_tags_id` int NOT NULL DEFAULT 0 COMMENT 'yes=是相似|把image_tags_id写入', - `similarity score` float NOT NULL DEFAULT 0 COMMENT '相似时候,计算相似度值', + `similarity_score` float NOT NULL DEFAULT 0 COMMENT '相似时候,计算相似度值', PRIMARY KEY (`id`) USING BTREE, UNIQUE INDEX `uk_image_tag`(`image_id` ASC, `tag_id` ASC) USING BTREE, INDEX `tag_id`(`tag_id` ASC) USING BTREE, @@ -932,7 +932,7 @@ CREATE TABLE `baidu_keyword` ( `similarity` enum('draft','yes','calc','recalc') CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT 'draft' COMMENT 'yes=是相似|calc=已计算|recalc=需要重新计算', `similarity_query` int NOT NULL DEFAULT 0 COMMENT 'yes=是相似|把query_id写入', `similarity_query_keyword` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT 'yes=是相似|把query写入', - `similarity score` float NOT NULL DEFAULT 0 COMMENT '相似时候,计算相似度值', + `similarity_score` float NOT NULL DEFAULT 0 COMMENT '相似时候,计算相似度值', `reviewed_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '审核日期', `fast_track` tinyint(1) NOT NULL DEFAULT 0 COMMENT '加急|0=否|1=是', `automated_review_failed_reason` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '千问大模型审核query不符合原因', diff --git a/image_similarity_check.py b/image_similarity_check.py index 52421c5..f2a2b1a 100644 --- a/image_similarity_check.py +++ b/image_similarity_check.py @@ -249,7 +249,7 @@ class ImageSimilarityChecker: SET status = 'similarity', similarity = 'yes', similarity_image_tags_id = %s, - `similarity score` = %s, + similarity_score = %s, updated_at = NOW() WHERE id = %s """ @@ -371,6 +371,9 @@ class ImageSimilarityChecker: total_unique += uniq self.logger.info(f"批次结果: 重复={dup}, 不重复={uniq}, 失败={fail}") + + # 批次间休息,避免数据库连接问题 + time.sleep(1) finally: if self.db_conn: diff --git a/image_similarity_recalc.py b/image_similarity_recalc.py new file mode 100644 index 0000000..92cd03f --- /dev/null +++ b/image_similarity_recalc.py @@ -0,0 +1,293 @@ +# -*- coding: utf-8 -*- +""" +图片去重审核脚本 - 重新计算版 +专门处理 status='draft' AND similarity='recalc' 的数据 +""" + +import configparser +import logging +import time +import dashscope +from dashscope import MultiModalEmbedding +from typing import Optional, Tuple, List, Dict + +import pymysql +from dashvector import Client, Doc + + +class ImageSimilarityRecalc: + """图片相似度重新计算器""" + + def __init__(self, config_path: str = 'config.ini'): + self.config = configparser.ConfigParser() + self.config.read(config_path, encoding='utf-8') + + self._setup_logging() + + # 连接 + self.db_conn = None + self.dashvector_client = None + self.collection = None + + # DashScope API + self.dashscope_api_key = self.config.get('dashscope', 'api_key') + dashscope.api_key = self.dashscope_api_key + + # 配置参数 + self.image_cdn_base = self.config.get('image', 'cdn_base') + self.vector_threshold = self.config.getfloat('similarity', 'vector_threshold') + self.batch_size = self.config.getint('process', 'batch_size') + + def _setup_logging(self): + log_level = self.config.get('process', 'log_level', fallback='INFO') + log_file = self.config.get('process', 'log_file', fallback='image_similarity.log') + + self.logger = logging.getLogger('recalc') + + if not self.logger.handlers: + self.logger.setLevel(getattr(logging, log_level)) + formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') + + fh = logging.FileHandler(log_file, encoding='utf-8') + fh.setFormatter(formatter) + self.logger.addHandler(fh) + + sh = logging.StreamHandler() + sh.setFormatter(formatter) + self.logger.addHandler(sh) + + def connect_db(self): + """连接数据库""" + self.db_conn = pymysql.connect( + host=self.config.get('database', 'host'), + port=self.config.getint('database', 'port'), + user=self.config.get('database', 'user'), + password=self.config.get('database', 'password'), + database=self.config.get('database', 'database'), + charset=self.config.get('database', 'charset'), + cursorclass=pymysql.cursors.DictCursor + ) + self.logger.info("数据库连接成功") + + def connect_dashvector(self): + """连接 DashVector""" + api_key = self.config.get('dashvector', 'api_key') + endpoint = self.config.get('dashvector', 'endpoint') + collection_name = self.config.get('dashvector', 'collection_name') + + self.dashvector_client = Client(api_key=api_key, endpoint=endpoint) + self.collection = self.dashvector_client.get(collection_name) + self.logger.info("DashVector 连接成功") + + def get_image_embedding(self, image_url: str, max_retries: int = 5) -> Optional[List[float]]: + """调用 DashScope 多模态 Embedding SDK 获取图片向量""" + for attempt in range(max_retries): + try: + input_data = [{'image': image_url}] + resp = MultiModalEmbedding.call( + model='multimodal-embedding-v1', + input=input_data + ) + + if resp.status_code == 200: + return resp.output['embeddings'][0]['embedding'] + elif resp.status_code in (429, 403): + wait_time = 3 + attempt * 3 + self.logger.warning(f"API 限流,等待 {wait_time} 秒后重试 ({attempt + 1}/{max_retries})...") + time.sleep(wait_time) + else: + self.logger.warning(f"Embedding API 错误: {resp.status_code} - {resp.message}") + return None + except Exception as e: + self.logger.warning(f"Embedding API 异常: {e}") + time.sleep(2) + + return None + + def get_recalc_images(self) -> List[dict]: + """获取需要重新计算的图片 (status='draft' AND similarity='recalc')""" + with self.db_conn.cursor() as cursor: + sql = """ + SELECT id, image_id, image_url, image_thumb_url, image_name + FROM ai_image_tags + WHERE status = 'draft' AND similarity = 'recalc' + AND image_url != '' AND image_url IS NOT NULL + ORDER BY id ASC + LIMIT %s + """ + cursor.execute(sql, (self.batch_size,)) + return cursor.fetchall() + + def search_similar(self, features: List[float], exclude_id: int) -> Tuple[bool, Optional[int], Optional[float]]: + """在 DashVector 中搜索相似图片""" + try: + results = self.collection.query(features, topk=3) + + if results and results.output: + for doc in results.output: + similar_id = int(doc.id) + if similar_id == exclude_id: + continue + + similarity = 1.0 - doc.score + self.logger.info(f"搜索到: {similar_id}, 距离={doc.score:.4f}, 相似度={similarity:.4f}") + + if similarity >= self.vector_threshold: + return True, similar_id, similarity + + return False, None, None + except Exception as e: + self.logger.warning(f"搜索失败: {e}") + return False, None, None + + def upsert_to_dashvector(self, image_id: int, features: List[float]): + """存入 DashVector""" + try: + doc = Doc(id=str(image_id), vector=features) + result = self.collection.upsert([doc]) + if result.code == 0: + self.logger.info(f"向量入库成功: {image_id}") + else: + self.logger.warning(f"向量入库失败 ID={image_id}: code={result.code}, msg={result.message}") + except Exception as e: + self.logger.warning(f"存入 DashVector 异常 ID={image_id}: {e}") + + def update_as_duplicate(self, image_id: int, similar_id: int, score: float): + """更新为重复图片""" + with self.db_conn.cursor() as cursor: + sql = """ + UPDATE ai_image_tags + SET status = 'similarity', + similarity = 'yes', + similarity_image_tags_id = %s, + similarity_score = %s, + updated_at = NOW() + WHERE id = %s + """ + cursor.execute(sql, (similar_id, score, image_id)) + self.db_conn.commit() + self.logger.info(f"重复: {image_id} -> {similar_id} (分数={score:.4f})") + + def update_as_unique(self, image_id: int): + """更新为不重复图片""" + with self.db_conn.cursor() as cursor: + sql = """ + UPDATE ai_image_tags + SET status = 'tag_extension', + similarity = 'calc', + updated_at = NOW() + WHERE id = %s + """ + cursor.execute(sql, (image_id,)) + self.db_conn.commit() + self.logger.info(f"不重复: {image_id} -> tag_extension") + + def update_as_failed(self, image_id: int, reason: str): + """标记为处理失败(保持 recalc 状态)""" + with self.db_conn.cursor() as cursor: + sql = """ + UPDATE ai_image_tags + SET updated_at = NOW() + WHERE id = %s + """ + cursor.execute(sql, (image_id,)) + self.db_conn.commit() + self.logger.warning(f"处理失败 {image_id}: {reason}") + + def process_batch(self, image_records: List[dict]) -> Tuple[int, int, int]: + """处理一批图片,返回 (重复数, 不重复数, 失败数)""" + if not image_records: + return 0, 0, 0 + + duplicates = 0 + unique = 0 + failed = 0 + + for rec in image_records: + image_id = rec['id'] + + if not rec['image_url'] or rec['image_url'].strip() == '': + self.logger.warning(f"图像URL为空,跳过处理: {image_id}") + self.update_as_failed(image_id, "图像URL为空") + failed += 1 + continue + + full_url = f"{self.image_cdn_base}{rec['image_url']}" + + try: + time.sleep(0.5) + self.logger.info(f"重新计算 Embedding: {image_id} -> {full_url}") + + features = self.get_image_embedding(image_url=full_url) + + if features is None: + self.logger.warning(f"Embedding 获取失败: {image_id}") + self.update_as_failed(image_id, "Embedding API 失败") + failed += 1 + continue + + is_dup, similar_id, score = self.search_similar(features, image_id) + + if is_dup: + self.update_as_duplicate(image_id, similar_id, score) + duplicates += 1 + else: + self.upsert_to_dashvector(image_id, features) + self.update_as_unique(image_id) + unique += 1 + + except Exception as e: + self.logger.error(f"处理失败 {image_id}: {e}") + self.update_as_failed(image_id, str(e)[:200]) + failed += 1 + continue + + return duplicates, unique, failed + + def run(self): + """运行主流程""" + self.logger.info("=" * 60) + self.logger.info("图片去重审核 - 重新计算版 (recalc)") + self.logger.info("=" * 60) + + self.connect_db() + self.connect_dashvector() + + total_duplicates = 0 + total_unique = 0 + total_failed = 0 + batch_num = 0 + + try: + while True: + images = self.get_recalc_images() + + if not images: + self.logger.info("没有需要重新计算的图片") + break + + batch_num += 1 + self.logger.info(f"\n--- 批次 {batch_num}: {len(images)} 张 (recalc) ---") + + dup, uniq, fail = self.process_batch(images) + total_duplicates += dup + total_unique += uniq + total_failed += fail + + self.logger.info(f"批次结果: 重复={dup}, 不重复={uniq}, 失败={fail}") + + # 批次间休息,避免数据库连接问题 + time.sleep(1) + + finally: + if self.db_conn: + self.db_conn.close() + + self.logger.info("=" * 60) + self.logger.info(f"完成! 总重复: {total_duplicates}, 总不重复: {total_unique}, 总失败: {total_failed}") + self.logger.info("=" * 60) + + +if __name__ == '__main__': + recalc = ImageSimilarityRecalc('config.ini') + recalc.run() diff --git a/query_status.py b/query_status.py index 0a633fa..fa33350 100644 --- a/query_status.py +++ b/query_status.py @@ -24,7 +24,7 @@ def main(): with db_conn.cursor() as cursor: sql = """ SELECT id, image_name, status, similarity, - similarity_image_tags_id, `similarity score`, blocking_reason + similarity_image_tags_id, similarity_score, blocking_reason FROM ai_image_tags ORDER BY id """ @@ -45,7 +45,7 @@ def main(): print("=" * 100) for r in rows: - score = f"{r['similarity score']:.4f}" if r['similarity score'] else "-" + score = f"{r['similarity_score']:.4f}" if r['similarity_score'] else "-" similar_id = r['similarity_image_tags_id'] if r['similarity_image_tags_id'] else "-" reason = r['blocking_reason'][:20] if r['blocking_reason'] else "-" print(f"{r['id']:<8} {r['image_name'][:28]:<30} {r['status']:<15} {r['similarity']:<8} {similar_id:<8} {score:<8} {reason}") diff --git a/reset_data.py b/reset_data.py index 5a0b588..218bbe5 100644 --- a/reset_data.py +++ b/reset_data.py @@ -31,7 +31,7 @@ def main(): SET status = 'draft', similarity = 'draft', similarity_image_tags_id = 0, - `similarity score` = 0 + similarity_score = 0 WHERE status != 'draft' OR similarity != 'draft' """ affected = cursor.execute(sql) diff --git a/stats_similarity.py b/stats_similarity.py new file mode 100644 index 0000000..b9e3d5f --- /dev/null +++ b/stats_similarity.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +""" +图片相似度统计脚本 +统计各状态的图片数量和重复率 +""" + +import configparser +import pymysql +from datetime import datetime + + +def main(): + # 读取配置 + config = configparser.ConfigParser() + config.read('config.ini', encoding='utf-8') + + # 连接数据库 + db_conn = pymysql.connect( + host=config.get('database', 'host'), + port=config.getint('database', 'port'), + user=config.get('database', 'user'), + password=config.get('database', 'password'), + database=config.get('database', 'database'), + charset=config.get('database', 'charset'), + cursorclass=pymysql.cursors.DictCursor + ) + + print("=" * 70) + print(f"图片相似度统计报告 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print("=" * 70) + + with db_conn.cursor() as cursor: + # 1. 总体统计 + print("\n【一、总体统计】\n") + + stats = [ + ("待处理 (draft/draft)", "status = 'draft' AND similarity = 'draft'"), + ("重复图片 (similarity/yes)", "status = 'similarity' AND similarity = 'yes'"), + ("不重复图片 (tag_extension/calc)", "status = 'tag_extension' AND similarity = 'calc'"), + ("待重算 (draft/recalc)", "status = 'draft' AND similarity = 'recalc'"), + ] + + results = {} + for name, condition in stats: + cursor.execute(f"SELECT COUNT(*) as cnt FROM ai_image_tags WHERE {condition}") + count = cursor.fetchone()['cnt'] + results[name] = count + print(f" {name:<35} : {count:>10,} 张") + + # 总计 + cursor.execute("SELECT COUNT(*) as cnt FROM ai_image_tags") + total = cursor.fetchone()['cnt'] + print(f" {'─' * 50}") + print(f" {'总计':<35} : {total:>10,} 张") + + # 2. 重复率计算 + print("\n【二、重复率分析】\n") + + duplicate_count = results["重复图片 (similarity/yes)"] + unique_count = results["不重复图片 (tag_extension/calc)"] + processed = duplicate_count + unique_count + + if processed > 0: + duplicate_rate = (duplicate_count / processed) * 100 + print(f" 已处理图片数 : {processed:>10,} 张") + print(f" 重复图片数 : {duplicate_count:>10,} 张") + print(f" 不重复图片数 : {unique_count:>10,} 张") + print(f" 重复率 : {duplicate_rate:>10.2f} %") + else: + print(" 暂无已处理的图片数据") + + # 3. 处理进度 + print("\n【三、处理进度】\n") + + pending = results["待处理 (draft/draft)"] + recalc = results["待重算 (draft/recalc)"] + + if total > 0: + progress = (processed / total) * 100 + print(f" 总进度 : {progress:>10.2f} %") + print(f" 待处理 : {pending:>10,} 张") + print(f" 待重算 : {recalc:>10,} 张") + print(f" 已完成 : {processed:>10,} 张") + + # 4. 相似度分数分布(仅重复图片) + print("\n【四、相似度分数分布】\n") + + cursor.execute(""" + SELECT + CASE + WHEN similarity_score >= 0.99 THEN '0.99-1.00 (几乎相同)' + WHEN similarity_score >= 0.97 THEN '0.97-0.99 (非常相似)' + WHEN similarity_score >= 0.95 THEN '0.95-0.97 (高度相似)' + WHEN similarity_score >= 0.94 THEN '0.94-0.95 (相似)' + ELSE '< 0.94 (其他)' + END as score_range, + COUNT(*) as cnt + FROM ai_image_tags + WHERE status = 'similarity' AND similarity = 'yes' + GROUP BY score_range + ORDER BY score_range DESC + """) + + score_stats = cursor.fetchall() + if score_stats: + for row in score_stats: + print(f" {row['score_range']:<25} : {row['cnt']:>10,} 张") + else: + print(" 暂无重复图片数据") + + # 5. 最近处理记录 + print("\n【五、最近 10 条重复记录】\n") + + cursor.execute(""" + SELECT id, image_name, similarity_image_tags_id, similarity_score, updated_at + FROM ai_image_tags + WHERE status = 'similarity' AND similarity = 'yes' + ORDER BY updated_at DESC + LIMIT 10 + """) + + recent = cursor.fetchall() + if recent: + print(f" {'ID':<10} {'相似ID':<10} {'分数':<10} {'更新时间':<20}") + print(f" {'-' * 55}") + for row in recent: + score = f"{row['similarity_score']:.4f}" if row['similarity_score'] else "-" + updated = row['updated_at'].strftime('%Y-%m-%d %H:%M') if row['updated_at'] else "-" + print(f" {row['id']:<10} {row['similarity_image_tags_id']:<10} {score:<10} {updated:<20}") + else: + print(" 暂无重复记录") + + db_conn.close() + + print("\n" + "=" * 70) + print("统计完成") + print("=" * 70) + + +if __name__ == '__main__': + main()