#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 文章图片智能匹配脚本 基于通义千问大模型,智能匹配文章与图片,将结果存储到ai_article_images表 """ import os import sys import time import json import logging import requests import pymysql import traceback import threading from datetime import datetime from typing import Dict, List, Optional, Tuple from concurrent.futures import ThreadPoolExecutor, as_completed # 添加项目根目录到Python路径 sys.path.append(os.path.dirname(os.path.abspath(__file__))) from database_config import db_manager from log_config import setup_logger # 配置日志记录器 logger = setup_logger( name='article_image_matching', log_file='logs/article_image_matching.log', error_log_file='logs/article_image_matching_error.log', level=logging.INFO, console_output=True ) # 配置常量 QWEN_API_KEY = "sk-e6a38204022a4b538b8954f0584712af" QWEN_API_URL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation" WORKER_COUNT = 4 # 并行处理worker数量 BATCH_SIZE = 50 # 每批处理的文章数量 MATCH_THRESHOLD = 0.6 # 匹配分数阈值(0-1) class ArticleImageMatcher: def __init__(self): # 使用统一的数据库管理器 self.db_manager = db_manager # 并行处理相关 self.processing_lock = threading.Lock() self.processed_articles = set() logger.info("ArticleImageMatcher 初始化完成") self.log_to_database('INFO', 'ArticleImageMatcher 初始化完成') def log_to_database(self, level: str, message: str, details: Optional[str] = None): """记录日志到数据库ai_logs表""" try: connection = self.db_manager.get_connection() try: with connection.cursor(pymysql.cursors.DictCursor) as cursor: status_map = { 'INFO': 'success', 'WARNING': 'warning', 'ERROR': 'error' } status = status_map.get(level, 'success') sql = """ INSERT INTO ai_logs (user_id, action, description, status, error_message, created_at) VALUES (%s, %s, %s, %s, %s, NOW()) """ cursor.execute(sql, (None, 'article_image_matching', message, status, details)) connection.commit() logger.info(f"日志已记录到数据库: {level} - {message}") finally: connection.close() except Exception as e: logger.error(f"记录日志到数据库失败: {e}") def get_articles_with_tags(self, limit: int = BATCH_SIZE) -> List[Dict]: """ 从ai_article_tags表获取需要匹配图片的文章 Returns: 包含文章ID、标签等信息的列表 """ try: connection = self.db_manager.get_connection() try: with connection.cursor(pymysql.cursors.DictCursor) as cursor: # 查询有标签但未匹配图片的文章 sql = """ SELECT at.id, at.article_id, at.coze_tag, a.title, a.content FROM ai_article_tags at INNER JOIN ai_articles a ON at.article_id = a.id WHERE at.coze_tag IS NOT NULL AND at.coze_tag != '' AND NOT EXISTS ( SELECT 1 FROM ai_article_images ai WHERE ai.article_id = at.article_id ) AND a.status = 'approved' ORDER BY at.id DESC LIMIT %s """ cursor.execute(sql, (limit,)) results = cursor.fetchall() if results: logger.info(f"查询到 {len(results)} 篇需要匹配图片的文章") self.log_to_database('INFO', f"查询到需要匹配图片的文章", f"数量: {len(results)}") else: logger.info("未查询到需要匹配图片的文章") return results finally: connection.close() except Exception as e: error_msg = f"查询文章标签异常: {e}" logger.error(error_msg) self.log_to_database('ERROR', error_msg, traceback.format_exc()) return [] def get_available_images_with_tags(self) -> List[Dict]: """ 从ai_image_tags表获取可用的图片及其标签 Returns: 包含图片ID、标签等信息的列表 """ try: connection = self.db_manager.get_connection() try: with connection.cursor(pymysql.cursors.DictCursor) as cursor: # 查询附加文章数量小于5的图片 sql = """ SELECT id, image_id, image_name, image_url, image_thumb_url, tag_id, tag_name, keywords_id, keywords_name, department_id, department_name, image_attached_article_count FROM ai_image_tags WHERE image_attached_article_count < 5 ORDER BY image_attached_article_count ASC, id DESC """ cursor.execute(sql) results = cursor.fetchall() if results: logger.info(f"查询到 {len(results)} 张可用图片") self.log_to_database('INFO', f"查询到可用图片", f"数量: {len(results)}") else: logger.info("未查询到可用图片") return results finally: connection.close() except Exception as e: error_msg = f"查询图片标签异常: {e}" logger.error(error_msg) self.log_to_database('ERROR', error_msg, traceback.format_exc()) return [] def parse_article_tags(self, coze_tag: str) -> List[str]: """ 解析文章标签(支持JSON格式和逗号分隔格式) Args: coze_tag: 标签字符串 Returns: 标签列表 """ try: if not coze_tag: return [] # 尝试解析JSON格式 try: tags_data = json.loads(coze_tag) if isinstance(tags_data, list): return tags_data elif isinstance(tags_data, dict): return list(tags_data.values()) except json.JSONDecodeError: pass # 按逗号分隔 tags = [tag.strip() for tag in str(coze_tag).split(',') if tag.strip()] return tags except Exception as e: logger.error(f"解析文章标签异常: {e}") return [] def call_qwen_for_matching(self, article_title: str, article_tags: List[str], image_tags: List[str], image_keywords: str) -> Tuple[bool, float]: """ 调用通义千问API评估文章与图片的匹配度 Args: article_title: 文章标题 article_tags: 文章标签列表 image_tags: 图片标签列表 image_keywords: 图片关键词 Returns: (是否匹配, 匹配分数) """ try: # 构建提示词 prompt = f"""请评估以下文章与图片的匹配度: 文章标题:{article_title} 文章标签:{', '.join(article_tags)} 图片标签:{', '.join(image_tags)} 图片关键词:{image_keywords} 请根据标签的语义相关性,给出0-1之间的匹配分数,并说明是否适合匹配。 输出格式:{{"match": true/false, "score": 0.0-1.0, "reason": "匹配原因"}} 只输出JSON格式,不要其他内容。""" headers = { 'Authorization': f'Bearer {QWEN_API_KEY}', 'Content-Type': 'application/json' } payload = { "model": "qwen-max", "input": { "messages": [ { "role": "user", "content": prompt } ] }, "parameters": { "result_format": "message" } } response = requests.post( QWEN_API_URL, json=payload, headers=headers, timeout=30 ) if response.status_code == 200: result = response.json() if result.get('output') and result['output'].get('choices'): message = result['output']['choices'][0].get('message', {}) content = message.get('content', '') # 解析JSON响应 try: match_result = json.loads(content) is_match = match_result.get('match', False) score = match_result.get('score', 0.0) reason = match_result.get('reason', '') logger.info(f"通义千问评估结果 - 匹配: {is_match}, 分数: {score}, 原因: {reason}") return is_match, score except json.JSONDecodeError: logger.error(f"解析通义千问响应失败: {content}") return False, 0.0 else: logger.error(f"通义千问API返回格式异常: {result}") return False, 0.0 else: logger.error(f"通义千问API请求失败,状态码: {response.status_code}") return False, 0.0 except requests.exceptions.Timeout: logger.error("通义千问API请求超时") return False, 0.0 except Exception as e: error_msg = f"调用通义千问API异常: {e}" logger.error(error_msg) self.log_to_database('ERROR', error_msg, traceback.format_exc()) return False, 0.0 def insert_article_image_relation(self, article_id: int, image_data: Dict, match_score: float) -> bool: """ 将匹配结果插入ai_article_images表 Args: article_id: 文章ID image_data: 图片数据 match_score: 匹配分数 Returns: 是否插入成功 """ try: connection = self.db_manager.get_connection() try: with connection.cursor(pymysql.cursors.DictCursor) as cursor: # 查询当前文章下已有图片的最大sort_order query_max_sort = """ SELECT COALESCE(MAX(sort_order), 0) as max_sort_order FROM ai_article_images WHERE article_id = %s """ cursor.execute(query_max_sort, (article_id,)) result = cursor.fetchone() max_sort_order = result.get('max_sort_order', 0) new_sort_order = max_sort_order + 1 # 插入关联记录 insert_sql = """ INSERT INTO ai_article_images (article_id, image_id, image_url, image_thumb_url, image_tag_id, sort_order, keywords_id, keywords_name, department_id, department_name, image_source, created_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()) """ cursor.execute(insert_sql, ( article_id, image_data['image_id'], image_data['image_url'], image_data['image_thumb_url'], image_data['id'], # image_tag_id new_sort_order, image_data['keywords_id'], image_data['keywords_name'], image_data['department_id'], image_data['department_name'], 1 # image_source: 1表示tag匹配 )) # 更新图片附加文章计数 update_sql = """ UPDATE ai_image_tags SET image_attached_article_count = image_attached_article_count + 1 WHERE id = %s """ cursor.execute(update_sql, (image_data['id'],)) connection.commit() logger.info(f"成功插入文章图片关联 - 文章ID: {article_id}, 图片ID: {image_data['image_id']}, 分数: {match_score}") return True finally: connection.close() except Exception as e: error_msg = f"插入文章图片关联异常: {e}" logger.error(error_msg) self.log_to_database('ERROR', error_msg, traceback.format_exc()) return False def match_article_with_images(self, article_data: Dict, available_images: List[Dict]) -> bool: """ 为单篇文章匹配图片 Args: article_data: 文章数据 available_images: 可用图片列表 Returns: 是否匹配成功 """ article_id = article_data['article_id'] article_title = article_data.get('title', '') coze_tag = article_data.get('coze_tag', '') try: # 解析文章标签 article_tags = self.parse_article_tags(coze_tag) if not article_tags: logger.warning(f"文章 {article_id} 没有有效标签,跳过") return False logger.info(f"开始为文章 {article_id} 匹配图片 - 标题: {article_title}, 标签: {article_tags}") best_match = None best_score = 0.0 # 遍历可用图片,找到最佳匹配 for image_data in available_images: image_tags = [image_data['tag_name']] image_keywords = image_data.get('keywords_name', '') # 调用通义千问评估匹配度 is_match, score = self.call_qwen_for_matching( article_title, article_tags, image_tags, image_keywords ) # 记录评估结果 logger.info(f"文章 {article_id} vs 图片 {image_data['image_id']} - 匹配: {is_match}, 分数: {score}") # 更新最佳匹配 if is_match and score > best_score and score >= MATCH_THRESHOLD: best_score = score best_match = image_data # 如果找到匹配的图片,插入关联记录 if best_match: if self.insert_article_image_relation(article_id, best_match, best_score): logger.info(f"文章 {article_id} 成功匹配图片 {best_match['image_id']}, 分数: {best_score}") self.log_to_database('INFO', f"文章匹配成功", f"文章ID: {article_id}, 图片ID: {best_match['image_id']}, 分数: {best_score}") return True else: return False else: logger.info(f"文章 {article_id} 未找到合适的匹配图片") self.log_to_database('WARNING', f"文章未找到匹配图片", f"文章ID: {article_id}") return False except Exception as e: error_msg = f"匹配文章图片异常 - 文章ID: {article_id}, 错误: {e}" logger.error(error_msg) self.log_to_database('ERROR', error_msg, traceback.format_exc()) return False def get_next_available_article(self, pending_articles: List[Dict]) -> Optional[Dict]: """线程安全地获取下一篇待处理文章""" with self.processing_lock: for article_data in pending_articles: article_id = article_data['article_id'] if article_id not in self.processed_articles: self.processed_articles.add(article_id) return article_data return None def worker_process_articles(self, pending_articles: List[Dict], available_images: List[Dict], worker_id: int) -> int: """Worker线程处理文章匹配""" processed_count = 0 thread_name = f"Worker-{worker_id}" threading.current_thread().name = thread_name logger.info(f"[{thread_name}] 启动,准备处理文章匹配") while True: # 线程安全地获取下一篇待处理文章 article_data = self.get_next_available_article(pending_articles) if not article_data: logger.info(f"[{thread_name}] 没有更多待处理文章,退出") break # 匹配文章与图片 if self.match_article_with_images(article_data, available_images): processed_count += 1 logger.info(f"[{thread_name}] 成功处理文章: {article_data['article_id']}") else: logger.warning(f"[{thread_name}] 文章处理失败: {article_data['article_id']}") logger.info(f"[{thread_name}] 完成,共处理 {processed_count} 篇文章") return processed_count def run_matching(self): """运行文章图片匹配流程""" logger.info("开始文章图片智能匹配...") self.log_to_database('INFO', '启动文章图片智能匹配服务', f'worker数量: {WORKER_COUNT}') try: # 获取需要匹配的文章 pending_articles = self.get_articles_with_tags() if not pending_articles: logger.info("没有需要匹配的文章") return # 获取可用图片 available_images = self.get_available_images_with_tags() if not available_images: logger.warning("没有可用图片,无法进行匹配") self.log_to_database('WARNING', '没有可用图片') return logger.info(f"开始匹配 {len(pending_articles)} 篇文章与 {len(available_images)} 张图片") self.log_to_database('INFO', '开始批量匹配', f'文章数: {len(pending_articles)}, 图片数: {len(available_images)}') # 清空已处理记录集合 with self.processing_lock: self.processed_articles.clear() # 使用线程池并行处理 with ThreadPoolExecutor(max_workers=WORKER_COUNT, thread_name_prefix="MatchWorker") as executor: # 提交worker任务 future_to_worker = {} for worker_id in range(1, WORKER_COUNT + 1): future = executor.submit( self.worker_process_articles, pending_articles, available_images, worker_id ) future_to_worker[future] = worker_id # 等待所有worker完成 total_processed = 0 for future in as_completed(future_to_worker): worker_id = future_to_worker[future] try: processed_count = future.result() total_processed += processed_count logger.info(f"Worker-{worker_id} 完成,处理了 {processed_count} 篇文章") except Exception as e: logger.error(f"Worker-{worker_id} 执行异常: {e}") self.log_to_database('ERROR', f'Worker-{worker_id} 执行异常', str(e)) logger.info(f"匹配完成,共处理 {total_processed} 篇文章") self.log_to_database('INFO', '匹配任务完成', f'共处理 {total_processed} 篇文章') except Exception as e: error_msg = f"匹配流程异常: {e}" logger.error(error_msg) self.log_to_database('ERROR', error_msg, traceback.format_exc()) def main(): """主函数""" matcher = ArticleImageMatcher() try: # 运行匹配 matcher.run_matching() except Exception as e: logger.error(f"程序运行异常: {e}") matcher.log_to_database('ERROR', f'程序运行异常: {e}', traceback.format_exc()) if __name__ == "__main__": main()