From b9f65e2f9b1a3cc00bafdf0965ba08bbb5a8cc99 Mon Sep 17 00:00:00 2001 From: shengyudong Date: Mon, 2 Feb 2026 17:40:02 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=96=87=E7=AB=A0=E5=9B=BE?= =?UTF-8?q?=E7=89=87=E6=99=BA=E8=83=BD=E5=8C=B9=E9=85=8D=E8=84=9A=E6=9C=AC?= =?UTF-8?q?article=5Fauto=5Fimage=5Fmatching.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- article_auto_image_matching.py | 541 +++++++++++++++++++++++++++++++++ 1 file changed, 541 insertions(+) create mode 100644 article_auto_image_matching.py diff --git a/article_auto_image_matching.py b/article_auto_image_matching.py new file mode 100644 index 0000000..643c6ec --- /dev/null +++ b/article_auto_image_matching.py @@ -0,0 +1,541 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +文章图片智能匹配脚本 +基于通义千问大模型,智能匹配文章与图片,将结果存储到ai_article_images表 +""" + +import os +import sys +import time +import json +import logging +import requests +import pymysql +import traceback +import threading +from datetime import datetime +from typing import Dict, List, Optional, Tuple +from concurrent.futures import ThreadPoolExecutor, as_completed + +# 添加项目根目录到Python路径 +sys.path.append(os.path.dirname(os.path.abspath(__file__))) + +from database_config import db_manager +from log_config import setup_logger + +# 配置日志记录器 +logger = setup_logger( + name='article_image_matching', + log_file='logs/article_image_matching.log', + error_log_file='logs/article_image_matching_error.log', + level=logging.INFO, + console_output=True +) + +# 配置常量 +QWEN_API_KEY = "sk-e6a38204022a4b538b8954f0584712af" +QWEN_API_URL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation" +WORKER_COUNT = 4 # 并行处理worker数量 +BATCH_SIZE = 50 # 每批处理的文章数量 +MATCH_THRESHOLD = 0.6 # 匹配分数阈值(0-1) + +class ArticleImageMatcher: + def __init__(self): + # 使用统一的数据库管理器 + self.db_manager = db_manager + + # 并行处理相关 + self.processing_lock = threading.Lock() + self.processed_articles = set() + + logger.info("ArticleImageMatcher 初始化完成") + self.log_to_database('INFO', 'ArticleImageMatcher 初始化完成') + + def log_to_database(self, level: str, message: str, details: Optional[str] = None): + """记录日志到数据库ai_logs表""" + try: + connection = self.db_manager.get_connection() + try: + with connection.cursor(pymysql.cursors.DictCursor) as cursor: + status_map = { + 'INFO': 'success', + 'WARNING': 'warning', + 'ERROR': 'error' + } + status = status_map.get(level, 'success') + + sql = """ + INSERT INTO ai_logs (user_id, action, description, status, error_message, created_at) + VALUES (%s, %s, %s, %s, %s, NOW()) + """ + cursor.execute(sql, (None, 'article_image_matching', message, status, details)) + connection.commit() + logger.info(f"日志已记录到数据库: {level} - {message}") + finally: + connection.close() + except Exception as e: + logger.error(f"记录日志到数据库失败: {e}") + + def get_articles_with_tags(self, limit: int = BATCH_SIZE) -> List[Dict]: + """ + 从ai_article_tags表获取需要匹配图片的文章 + + Returns: + 包含文章ID、标签等信息的列表 + """ + try: + connection = self.db_manager.get_connection() + try: + with connection.cursor(pymysql.cursors.DictCursor) as cursor: + # 查询有标签但未匹配图片的文章 + sql = """ + SELECT + at.id, + at.article_id, + at.coze_tag, + a.title, + a.content + FROM ai_article_tags at + INNER JOIN ai_articles a ON at.article_id = a.id + WHERE at.coze_tag IS NOT NULL + AND at.coze_tag != '' + AND NOT EXISTS ( + SELECT 1 FROM ai_article_images ai + WHERE ai.article_id = at.article_id + ) + AND a.status = 'approved' + ORDER BY at.id DESC + LIMIT %s + """ + cursor.execute(sql, (limit,)) + results = cursor.fetchall() + + if results: + logger.info(f"查询到 {len(results)} 篇需要匹配图片的文章") + self.log_to_database('INFO', f"查询到需要匹配图片的文章", f"数量: {len(results)}") + else: + logger.info("未查询到需要匹配图片的文章") + + return results + finally: + connection.close() + except Exception as e: + error_msg = f"查询文章标签异常: {e}" + logger.error(error_msg) + self.log_to_database('ERROR', error_msg, traceback.format_exc()) + return [] + + def get_available_images_with_tags(self) -> List[Dict]: + """ + 从ai_image_tags表获取可用的图片及其标签 + + Returns: + 包含图片ID、标签等信息的列表 + """ + try: + connection = self.db_manager.get_connection() + try: + with connection.cursor(pymysql.cursors.DictCursor) as cursor: + # 查询附加文章数量小于5的图片 + sql = """ + SELECT + id, + image_id, + image_name, + image_url, + image_thumb_url, + tag_id, + tag_name, + keywords_id, + keywords_name, + department_id, + department_name, + image_attached_article_count + FROM ai_image_tags + WHERE image_attached_article_count < 5 + ORDER BY image_attached_article_count ASC, id DESC + """ + cursor.execute(sql) + results = cursor.fetchall() + + if results: + logger.info(f"查询到 {len(results)} 张可用图片") + self.log_to_database('INFO', f"查询到可用图片", f"数量: {len(results)}") + else: + logger.info("未查询到可用图片") + + return results + finally: + connection.close() + except Exception as e: + error_msg = f"查询图片标签异常: {e}" + logger.error(error_msg) + self.log_to_database('ERROR', error_msg, traceback.format_exc()) + return [] + + def parse_article_tags(self, coze_tag: str) -> List[str]: + """ + 解析文章标签(支持JSON格式和逗号分隔格式) + + Args: + coze_tag: 标签字符串 + + Returns: + 标签列表 + """ + try: + if not coze_tag: + return [] + + # 尝试解析JSON格式 + try: + tags_data = json.loads(coze_tag) + if isinstance(tags_data, list): + return tags_data + elif isinstance(tags_data, dict): + return list(tags_data.values()) + except json.JSONDecodeError: + pass + + # 按逗号分隔 + tags = [tag.strip() for tag in str(coze_tag).split(',') if tag.strip()] + return tags + except Exception as e: + logger.error(f"解析文章标签异常: {e}") + return [] + + def call_qwen_for_matching(self, article_title: str, article_tags: List[str], + image_tags: List[str], image_keywords: str) -> Tuple[bool, float]: + """ + 调用通义千问API评估文章与图片的匹配度 + + Args: + article_title: 文章标题 + article_tags: 文章标签列表 + image_tags: 图片标签列表 + image_keywords: 图片关键词 + + Returns: + (是否匹配, 匹配分数) + """ + try: + # 构建提示词 + prompt = f"""请评估以下文章与图片的匹配度: + +文章标题:{article_title} +文章标签:{', '.join(article_tags)} + +图片标签:{', '.join(image_tags)} +图片关键词:{image_keywords} + +请根据标签的语义相关性,给出0-1之间的匹配分数,并说明是否适合匹配。 +输出格式:{{"match": true/false, "score": 0.0-1.0, "reason": "匹配原因"}} +只输出JSON格式,不要其他内容。""" + + headers = { + 'Authorization': f'Bearer {QWEN_API_KEY}', + 'Content-Type': 'application/json' + } + + payload = { + "model": "qwen-max", + "input": { + "messages": [ + { + "role": "user", + "content": prompt + } + ] + }, + "parameters": { + "result_format": "message" + } + } + + response = requests.post( + QWEN_API_URL, + json=payload, + headers=headers, + timeout=30 + ) + + if response.status_code == 200: + result = response.json() + if result.get('output') and result['output'].get('choices'): + message = result['output']['choices'][0].get('message', {}) + content = message.get('content', '') + + # 解析JSON响应 + try: + match_result = json.loads(content) + is_match = match_result.get('match', False) + score = match_result.get('score', 0.0) + reason = match_result.get('reason', '') + + logger.info(f"通义千问评估结果 - 匹配: {is_match}, 分数: {score}, 原因: {reason}") + return is_match, score + except json.JSONDecodeError: + logger.error(f"解析通义千问响应失败: {content}") + return False, 0.0 + else: + logger.error(f"通义千问API返回格式异常: {result}") + return False, 0.0 + else: + logger.error(f"通义千问API请求失败,状态码: {response.status_code}") + return False, 0.0 + + except requests.exceptions.Timeout: + logger.error("通义千问API请求超时") + return False, 0.0 + except Exception as e: + error_msg = f"调用通义千问API异常: {e}" + logger.error(error_msg) + self.log_to_database('ERROR', error_msg, traceback.format_exc()) + return False, 0.0 + + def insert_article_image_relation(self, article_id: int, image_data: Dict, + match_score: float) -> bool: + """ + 将匹配结果插入ai_article_images表 + + Args: + article_id: 文章ID + image_data: 图片数据 + match_score: 匹配分数 + + Returns: + 是否插入成功 + """ + try: + connection = self.db_manager.get_connection() + try: + with connection.cursor(pymysql.cursors.DictCursor) as cursor: + # 查询当前文章下已有图片的最大sort_order + query_max_sort = """ + SELECT COALESCE(MAX(sort_order), 0) as max_sort_order + FROM ai_article_images + WHERE article_id = %s + """ + cursor.execute(query_max_sort, (article_id,)) + result = cursor.fetchone() + max_sort_order = result.get('max_sort_order', 0) + new_sort_order = max_sort_order + 1 + + # 插入关联记录 + insert_sql = """ + INSERT INTO ai_article_images + (article_id, image_id, image_url, image_thumb_url, image_tag_id, + sort_order, keywords_id, keywords_name, department_id, department_name, + image_source, created_at) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()) + """ + cursor.execute(insert_sql, ( + article_id, + image_data['image_id'], + image_data['image_url'], + image_data['image_thumb_url'], + image_data['id'], # image_tag_id + new_sort_order, + image_data['keywords_id'], + image_data['keywords_name'], + image_data['department_id'], + image_data['department_name'], + 1 # image_source: 1表示tag匹配 + )) + + # 更新图片附加文章计数 + update_sql = """ + UPDATE ai_image_tags + SET image_attached_article_count = image_attached_article_count + 1 + WHERE id = %s + """ + cursor.execute(update_sql, (image_data['id'],)) + + connection.commit() + logger.info(f"成功插入文章图片关联 - 文章ID: {article_id}, 图片ID: {image_data['image_id']}, 分数: {match_score}") + return True + finally: + connection.close() + + except Exception as e: + error_msg = f"插入文章图片关联异常: {e}" + logger.error(error_msg) + self.log_to_database('ERROR', error_msg, traceback.format_exc()) + return False + + def match_article_with_images(self, article_data: Dict, available_images: List[Dict]) -> bool: + """ + 为单篇文章匹配图片 + + Args: + article_data: 文章数据 + available_images: 可用图片列表 + + Returns: + 是否匹配成功 + """ + article_id = article_data['article_id'] + article_title = article_data.get('title', '') + coze_tag = article_data.get('coze_tag', '') + + try: + # 解析文章标签 + article_tags = self.parse_article_tags(coze_tag) + if not article_tags: + logger.warning(f"文章 {article_id} 没有有效标签,跳过") + return False + + logger.info(f"开始为文章 {article_id} 匹配图片 - 标题: {article_title}, 标签: {article_tags}") + + best_match = None + best_score = 0.0 + + # 遍历可用图片,找到最佳匹配 + for image_data in available_images: + image_tags = [image_data['tag_name']] + image_keywords = image_data.get('keywords_name', '') + + # 调用通义千问评估匹配度 + is_match, score = self.call_qwen_for_matching( + article_title, article_tags, image_tags, image_keywords + ) + + # 记录评估结果 + logger.info(f"文章 {article_id} vs 图片 {image_data['image_id']} - 匹配: {is_match}, 分数: {score}") + + # 更新最佳匹配 + if is_match and score > best_score and score >= MATCH_THRESHOLD: + best_score = score + best_match = image_data + + # 如果找到匹配的图片,插入关联记录 + if best_match: + if self.insert_article_image_relation(article_id, best_match, best_score): + logger.info(f"文章 {article_id} 成功匹配图片 {best_match['image_id']}, 分数: {best_score}") + self.log_to_database('INFO', f"文章匹配成功", + f"文章ID: {article_id}, 图片ID: {best_match['image_id']}, 分数: {best_score}") + return True + else: + return False + else: + logger.info(f"文章 {article_id} 未找到合适的匹配图片") + self.log_to_database('WARNING', f"文章未找到匹配图片", f"文章ID: {article_id}") + return False + + except Exception as e: + error_msg = f"匹配文章图片异常 - 文章ID: {article_id}, 错误: {e}" + logger.error(error_msg) + self.log_to_database('ERROR', error_msg, traceback.format_exc()) + return False + + def get_next_available_article(self, pending_articles: List[Dict]) -> Optional[Dict]: + """线程安全地获取下一篇待处理文章""" + with self.processing_lock: + for article_data in pending_articles: + article_id = article_data['article_id'] + if article_id not in self.processed_articles: + self.processed_articles.add(article_id) + return article_data + return None + + def worker_process_articles(self, pending_articles: List[Dict], + available_images: List[Dict], worker_id: int) -> int: + """Worker线程处理文章匹配""" + processed_count = 0 + thread_name = f"Worker-{worker_id}" + threading.current_thread().name = thread_name + + logger.info(f"[{thread_name}] 启动,准备处理文章匹配") + + while True: + # 线程安全地获取下一篇待处理文章 + article_data = self.get_next_available_article(pending_articles) + if not article_data: + logger.info(f"[{thread_name}] 没有更多待处理文章,退出") + break + + # 匹配文章与图片 + if self.match_article_with_images(article_data, available_images): + processed_count += 1 + logger.info(f"[{thread_name}] 成功处理文章: {article_data['article_id']}") + else: + logger.warning(f"[{thread_name}] 文章处理失败: {article_data['article_id']}") + + logger.info(f"[{thread_name}] 完成,共处理 {processed_count} 篇文章") + return processed_count + + def run_matching(self): + """运行文章图片匹配流程""" + logger.info("开始文章图片智能匹配...") + self.log_to_database('INFO', '启动文章图片智能匹配服务', f'worker数量: {WORKER_COUNT}') + + try: + # 获取需要匹配的文章 + pending_articles = self.get_articles_with_tags() + if not pending_articles: + logger.info("没有需要匹配的文章") + return + + # 获取可用图片 + available_images = self.get_available_images_with_tags() + if not available_images: + logger.warning("没有可用图片,无法进行匹配") + self.log_to_database('WARNING', '没有可用图片') + return + + logger.info(f"开始匹配 {len(pending_articles)} 篇文章与 {len(available_images)} 张图片") + self.log_to_database('INFO', '开始批量匹配', + f'文章数: {len(pending_articles)}, 图片数: {len(available_images)}') + + # 清空已处理记录集合 + with self.processing_lock: + self.processed_articles.clear() + + # 使用线程池并行处理 + with ThreadPoolExecutor(max_workers=WORKER_COUNT, thread_name_prefix="MatchWorker") as executor: + # 提交worker任务 + future_to_worker = {} + for worker_id in range(1, WORKER_COUNT + 1): + future = executor.submit( + self.worker_process_articles, + pending_articles, + available_images, + worker_id + ) + future_to_worker[future] = worker_id + + # 等待所有worker完成 + total_processed = 0 + for future in as_completed(future_to_worker): + worker_id = future_to_worker[future] + try: + processed_count = future.result() + total_processed += processed_count + logger.info(f"Worker-{worker_id} 完成,处理了 {processed_count} 篇文章") + except Exception as e: + logger.error(f"Worker-{worker_id} 执行异常: {e}") + self.log_to_database('ERROR', f'Worker-{worker_id} 执行异常', str(e)) + + logger.info(f"匹配完成,共处理 {total_processed} 篇文章") + self.log_to_database('INFO', '匹配任务完成', f'共处理 {total_processed} 篇文章') + + except Exception as e: + error_msg = f"匹配流程异常: {e}" + logger.error(error_msg) + self.log_to_database('ERROR', error_msg, traceback.format_exc()) + +def main(): + """主函数""" + matcher = ArticleImageMatcher() + + try: + # 运行匹配 + matcher.run_matching() + + except Exception as e: + logger.error(f"程序运行异常: {e}") + matcher.log_to_database('ERROR', f'程序运行异常: {e}', traceback.format_exc()) + +if __name__ == "__main__": + main()