#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Intelligent article/image matching script.

Uses the Tongyi Qianwen (Qwen) large language model to score how well each
candidate image fits an article and stores the chosen pairing in the
``ai_article_images`` table.  When no stored image scores high enough, an
image is generated through a Gemini-compatible endpoint, uploaded, and
attached instead.
"""

import os
import sys
import time
import json
import logging
import requests
import pymysql
import traceback
import threading
from contextlib import contextmanager
from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed

# Add the directory containing this script to the Python path so the
# project-local modules below can be imported.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from database_config import db_manager
from log_config import setup_logger

# Configure the logger
logger = setup_logger(
    name='article_image_matching',
    log_file='logs/article_image_matching.log',
    error_log_file='logs/article_image_matching_error.log',
    level=logging.INFO,
    console_output=True
)

# Configuration constants
# NOTE(review): credentials are hard-coded in this file (the key below, the
# Gemini proxy key in generate_image_with_gemini and the login in
# login_and_get_jwt_token).  They should be moved to configuration/secret
# storage and rotated; the values are left untouched here to keep behaviour.
QWEN_API_KEY = "sk-e6a38204022a4b538b8954f0584712af"
QWEN_API_URL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
WORKER_COUNT = 4  # number of parallel worker threads
BATCH_SIZE = 50  # number of articles fetched per run
MATCH_THRESHOLD = 0.6  # minimum acceptable match score (0-1)
HTTP_TIMEOUT_SECONDS = 30  # timeout for JSON API calls (Qwen, login)
UPLOAD_TIMEOUT_SECONDS = 120  # timeout for the multipart image upload


def _parse_match_response(content: Any) -> Tuple[bool, float, str]:
    """Extract ``(match, score, reason)`` from the model's reply text.

    The model is asked to answer with a bare JSON object, but replies are
    sometimes wrapped in Markdown code fences or surrounded by prose, so the
    outermost ``{...}`` span is parsed instead of the raw text.

    Raises:
        ValueError: if ``content`` is not a string or holds no parseable JSON
            object (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    if not isinstance(content, str):
        raise ValueError("model response content is not a string")

    start = content.find('{')
    end = content.rfind('}')
    if start == -1 or end < start:
        raise ValueError("no JSON object found in model response")
    payload = json.loads(content[start:end + 1])

    # The model may emit "true"/"false" as strings; bool("false") would be True.
    raw_match = payload.get('match', False)
    if isinstance(raw_match, str):
        is_match = raw_match.strip().lower() in ('true', 'yes', '1')
    else:
        is_match = bool(raw_match)

    # Coerce the score so that callers can always compare it numerically.
    try:
        score = float(payload.get('score', 0.0))
    except (TypeError, ValueError):
        score = 0.0

    reason = payload.get('reason', '')
    return is_match, score, reason


class ArticleImageMatcher:
    """Match pending articles with tagged images and persist the pairings."""

    def __init__(self):
        # Use the shared database manager
        self.db_manager = db_manager

        # State used for parallel processing: the lock guards the set of
        # article ids that have already been handed to a worker.
        self.processing_lock = threading.Lock()
        self.processed_articles = set()

        logger.info("ArticleImageMatcher 初始化完成")
        self.log_to_database('INFO', 'ArticleImageMatcher 初始化完成')

    @contextmanager
    def _cursor(self, commit: bool = False) -> Iterator[Any]:
        """Yield a ``DictCursor`` on a fresh connection and always close it.

        Args:
            commit: when True, commit after the ``with`` body succeeds and
                roll back if it (or the commit) raises.
        """
        connection = self.db_manager.get_connection()
        try:
            with connection.cursor(pymysql.cursors.DictCursor) as cursor:
                yield cursor
            if commit:
                connection.commit()
        except Exception:
            if commit:
                # Best effort: never let a failing rollback hide the real error.
                try:
                    connection.rollback()
                except Exception as rollback_error:
                    logger.warning(f"事务回滚失败: {rollback_error}")
            raise
        finally:
            connection.close()

    @staticmethod
    def _next_sort_order(cursor: Any, article_id: int) -> int:
        """Return the next free ``sort_order`` for the article's images."""
        cursor.execute(
            """
            SELECT COALESCE(MAX(sort_order), 0) as max_sort_order
            FROM ai_article_images
            WHERE article_id = %s
            """,
            (article_id,)
        )
        row = cursor.fetchone()
        return (row['max_sort_order'] if row else 0) + 1

    @staticmethod
    def _mark_attached(cursor: Any, article_id: int, image_id: int, tag_image_id: int) -> None:
        """Apply the bookkeeping updates that accompany a new relation row.

        All three statements run on the caller's cursor so they commit or
        roll back together with the relation insert.
        """
        # Increment the image's attached-article counter
        cursor.execute(
            """
            UPDATE ai_image_tags
            SET image_attached_article_count = image_attached_article_count + 1
            WHERE id = %s
            """,
            (tag_image_id,)
        )
        # Mark the image as published
        cursor.execute(
            """
            UPDATE ai_images
            SET status = 'published'
            WHERE id = %s
            """,
            (image_id,)
        )
        # Mark the article as published_review
        cursor.execute(
            """
            UPDATE ai_articles
            SET status = %s
            WHERE id = %s
            """,
            ('published_review', article_id)
        )

    def log_to_database(self, level: str, message: str, details: Optional[str] = None):
        """Write a log entry to the ``ai_logs`` table.

        Failures are logged locally and swallowed so that logging can never
        break the matching flow.

        Args:
            level: 'INFO', 'WARNING' or 'ERROR'; anything else is stored with
                status 'success'.
            message: stored in the ``description`` column.
            details: stored in the ``error_message`` column.
        """
        status_map = {
            'INFO': 'success',
            'WARNING': 'warning',
            'ERROR': 'error'
        }
        sql = """
            INSERT INTO ai_logs (user_id, action, description, status, error_message, created_at)
            VALUES (%s, %s, %s, %s, %s, NOW())
        """
        try:
            with self._cursor(commit=True) as cursor:
                status = status_map.get(level, 'success')
                cursor.execute(sql, (None, 'article_image_matching', message, status, details))
            logger.info(f"日志已记录到数据库: {level} - {message}")
        except Exception as e:
            logger.error(f"记录日志到数据库失败: {e}")

    def get_articles_with_tags(self, limit: int = BATCH_SIZE) -> List[Dict]:
        """Fetch articles from ``ai_articles`` that still need an image.

        Selects articles in status 'pending_review' assigned to review user
        152 that have no row in ``ai_article_images``, newest first.

        NOTE(review): despite the name, the query does not filter on
        ``coze_tag``; untagged articles are returned and skipped later, so
        they can occupy the batch on every run - confirm whether a tag filter
        should be added to the SQL.

        Args:
            limit: maximum number of articles to return.

        Returns:
            List of rows with article_id, title, content, coze_tag and
            department; empty list on error.
        """
        sql = """
            SELECT a.id as article_id, a.title, a.content, a.coze_tag, a.department
            FROM ai_articles a
            WHERE NOT EXISTS (
                SELECT 1 FROM ai_article_images ai
                WHERE ai.article_id = a.id
            )
            AND a.status = 'pending_review'
            AND a.review_user_id = 152
            ORDER BY a.id DESC
            LIMIT %s
        """
        try:
            with self._cursor() as cursor:
                cursor.execute(sql, (limit,))
                results = cursor.fetchall()

            if results:
                logger.info(f"查询到 {len(results)} 篇需要匹配图片的文章")
                self.log_to_database('INFO', "查询到需要匹配图片的文章", f"数量: {len(results)}")
            else:
                logger.info("未查询到需要匹配图片的文章")
            return results
        except Exception as e:
            error_msg = f"查询文章标签异常: {e}"
            logger.error(error_msg)
            self.log_to_database('ERROR', error_msg, traceback.format_exc())
            return []

    def get_available_images_with_tags(self) -> List[Dict]:
        """Fetch usable images and their tags from ``ai_image_tags``.

        An image is usable when its ``ai_images`` row has status 'generate'
        and it is attached to fewer than 5 articles.  ``image_source`` is
        included so callers can tell real photos from template images.

        Returns:
            List of tag rows ordered by attachment count (ascending) then id
            (descending); empty list on error.
        """
        # A sub-query is used instead of a JOIN.
        sql = """
            SELECT
                it.id,
                it.image_id,
                it.image_name,
                it.image_url,
                it.image_thumb_url,
                it.tag_id,
                it.tag_name,
                it.keywords_id,
                it.keywords_name,
                it.department_id,
                it.department_name,
                it.image_attached_article_count,
                it.image_source
            FROM ai_image_tags it
            WHERE it.image_attached_article_count < 5
            AND it.image_id IN (
                SELECT id FROM ai_images WHERE status = 'generate'
            )
            ORDER BY it.image_attached_article_count ASC, it.id DESC
        """
        try:
            with self._cursor() as cursor:
                cursor.execute(sql)
                results = cursor.fetchall()

            if results:
                logger.info(f"查询到 {len(results)} 张可用图片")
                self.log_to_database('INFO', "查询到可用图片", f"数量: {len(results)}")
            else:
                logger.info("未查询到可用图片")
            return results
        except Exception as e:
            error_msg = f"查询图片标签异常: {e}"
            logger.error(error_msg)
            self.log_to_database('ERROR', error_msg, traceback.format_exc())
            return []

    def parse_article_tags(self, coze_tag: str) -> List[str]:
        """Parse an article's tag string into a list of tag names.

        Accepts a JSON list, a JSON object (its values are used) or a
        comma-separated string.  Every tag is converted to ``str`` and
        stripped; empty or null entries are dropped so the result can be
        joined safely.

        Args:
            coze_tag: raw tag string from ``ai_articles.coze_tag``.

        Returns:
            List of non-empty tag strings; empty list when nothing usable is
            found or parsing fails.
        """
        try:
            if not coze_tag:
                return []

            # Try JSON first; anything that is not a list/object falls back
            # to comma splitting.
            try:
                tags_data = json.loads(coze_tag)
            except (json.JSONDecodeError, TypeError):
                tags_data = None

            if isinstance(tags_data, list):
                candidates = tags_data
            elif isinstance(tags_data, dict):
                candidates = list(tags_data.values())
            else:
                candidates = str(coze_tag).split(',')

            return [
                str(tag).strip()
                for tag in candidates
                if tag is not None and str(tag).strip()
            ]
        except Exception as e:
            logger.error(f"解析文章标签异常: {e}")
            return []

    def call_qwen_for_matching(self, article_title: str, article_tags: List[str],
                               image_tags: List[str], image_keywords: str) -> Tuple[bool, float]:
        """Ask the Qwen API how well an image fits an article.

        Args:
            article_title: article title.
            article_tags: article tag list.
            image_tags: image tag list (null entries are ignored).
            image_keywords: image keywords.

        Returns:
            ``(is_match, score)``; ``(False, 0.0)`` on any request, HTTP or
            parsing failure.  ``score`` is always a float.
        """
        try:
            # Null tags would make str.join raise, so they are dropped here.
            article_tag_text = ', '.join(str(tag) for tag in article_tags if tag is not None)
            image_tag_text = ', '.join(str(tag) for tag in image_tags if tag is not None)

            # Build the prompt (one field per line)
            prompt = (
                "请评估以下文章与图片的匹配度:\n"
                f"文章标题:{article_title}\n"
                f"文章标签:{article_tag_text}\n"
                f"图片标签:{image_tag_text}\n"
                f"图片关键词:{image_keywords or ''}\n"
                "请根据标签的语义相关性,给出0-1之间的匹配分数,并说明是否适合匹配。\n"
                '输出格式:{"match": true/false, "score": 0.0-1.0, "reason": "匹配原因"}\n'
                "只输出JSON格式,不要其他内容。"
            )

            headers = {
                'Authorization': f'Bearer {QWEN_API_KEY}',
                'Content-Type': 'application/json'
            }
            payload = {
                "model": "qwen-max",
                "input": {
                    "messages": [
                        {
                            "role": "user",
                            "content": prompt
                        }
                    ]
                },
                "parameters": {
                    "result_format": "message"
                }
            }

            response = requests.post(
                QWEN_API_URL,
                json=payload,
                headers=headers,
                timeout=HTTP_TIMEOUT_SECONDS
            )

            if response.status_code != 200:
                logger.error(f"通义千问API请求失败,状态码: {response.status_code}")
                return False, 0.0

            result = response.json()
            if not (result.get('output') and result['output'].get('choices')):
                logger.error(f"通义千问API返回格式异常: {result}")
                return False, 0.0

            message = result['output']['choices'][0].get('message', {})
            content = message.get('content', '')

            # Parse the JSON answer
            try:
                is_match, score, reason = _parse_match_response(content)
            except ValueError:
                logger.error(f"解析通义千问响应失败: {content}")
                return False, 0.0

            logger.info(f"通义千问评估结果 - 匹配: {is_match}, 分数: {score}, 原因: {reason}")
            return is_match, score

        except requests.exceptions.Timeout:
            logger.error("通义千问API请求超时")
            return False, 0.0
        except Exception as e:
            error_msg = f"调用通义千问API异常: {e}"
            logger.error(error_msg)
            self.log_to_database('ERROR', error_msg, traceback.format_exc())
            return False, 0.0

    def update_article_status(self, article_id: int, new_status: str) -> bool:
        """Set ``ai_articles.status`` for one article on its own connection.

        Args:
            article_id: article id.
            new_status: status value to store.

        Returns:
            True when the update was committed, False on error.
        """
        try:
            with self._cursor(commit=True) as cursor:
                cursor.execute(
                    """
                    UPDATE ai_articles
                    SET status = %s
                    WHERE id = %s
                    """,
                    (new_status, article_id)
                )
            logger.info(f"成功更新文章 {article_id} 状态为 {new_status}")
            return True
        except Exception as e:
            error_msg = f"更新文章状态异常: {e}"
            logger.error(error_msg)
            self.log_to_database('ERROR', error_msg, traceback.format_exc())
            return False

    def insert_article_image_relation(self, article_id: int, image_data: Dict,
                                      match_score: float) -> bool:
        """Attach a matched image to an article in ``ai_article_images``.

        The relation insert, the image counter/status updates and the article
        status change to 'published_review' run in ONE transaction, so a
        failure leaves none of them applied.

        Args:
            article_id: article id.
            image_data: a row as returned by get_available_images_with_tags.
            match_score: score of the match (only logged).

        Returns:
            True when everything was committed, False on error.
        """
        insert_sql = """
            INSERT INTO ai_article_images
            (article_id, image_id, image_url, image_thumb_url, image_tag_id, sort_order,
             keywords_id, keywords_name, department_id, department_name, image_source, created_at)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
        """
        try:
            with self._cursor(commit=True) as cursor:
                new_sort_order = self._next_sort_order(cursor, article_id)

                # Insert the relation row
                cursor.execute(insert_sql, (
                    article_id,
                    image_data['image_id'],
                    image_data['image_url'],
                    image_data['image_thumb_url'],
                    image_data['id'],  # image_tag_id
                    new_sort_order,
                    image_data['keywords_id'],
                    image_data['keywords_name'],
                    image_data['department_id'],
                    image_data['department_name'],
                    1  # image_source: 1 means matched by tag
                ))

                self._mark_attached(cursor, article_id, image_data['image_id'], image_data['id'])

            logger.info(f"成功更新文章 {article_id} 状态为 published_review")
            logger.info(f"成功插入文章图片关联 - 文章ID: {article_id}, 图片ID: {image_data['image_id']}, 分数: {match_score}")
            return True
        except Exception as e:
            error_msg = f"插入文章图片关联异常: {e}"
            logger.error(error_msg)
            self.log_to_database('ERROR', error_msg, traceback.format_exc())
            return False

    def generate_image_with_gemini(self, prompt: str, article_tags: List[str],
                                   article_id: int) -> Optional[str]:
        """Generate an image with Gemini, upload it and attach it to the article.

        Args:
            prompt: image generation prompt.
            article_tags: article tags, used to look up department/keywords.
            article_id: article the image is attached to.

        Returns:
            URL of the uploaded image, or None on failure.
        """
        try:
            # Imported lazily so the rest of the script works without the SDK.
            from google import genai
            from google.genai.client import HttpOptions

            client = genai.Client(
                http_options=HttpOptions(base_url="https://work.poloapi.com"),
                api_key="sk-V4tPnDgzFPa7nxWrvKnNJsW8ZcBXXPuGmjfgvPVRnwpHoeob"
            )

            logger.info(f"正在调用Gemini API生成图片,提示词: {prompt[:50]}...")

            # Generate content
            response = client.models.generate_content(
                model="gemini-3-pro-image-preview",
                contents=[prompt],
            )

            # Make sure there is at least one candidate
            if not response.candidates:
                raise Exception("Gemini API未返回任何候选答案")

            candidate = response.candidates[0]
            if not candidate.content or not candidate.content.parts:
                raise Exception("Gemini API返回的候选答案中没有内容部分")

            # Use the first part that actually carries image bytes.
            for part in candidate.content.parts:
                inline_data = getattr(part, 'inline_data', None)
                if inline_data is None or inline_data.data is None:
                    continue
                return self._store_generated_image(inline_data.data, article_tags, article_id)

            raise Exception("Gemini API未返回有效的图片数据")

        except ImportError:
            logger.error("错误:未安装google-genai库,请运行 'pip install google-genai' 进行安装")
            return None
        except Exception as e:
            error_msg = f"Gemini生成图片异常: {e}"
            logger.error(error_msg)
            self.log_to_database('ERROR', error_msg, traceback.format_exc())
            return None

    def _store_generated_image(self, image_bytes: bytes, article_tags: List[str],
                               article_id: int) -> str:
        """Persist, upload and attach freshly generated image bytes.

        The bytes are written to a temporary file in the working directory,
        which is removed again whether or not the later steps succeed.

        NOTE(review): if the upload fails after the database insert, the new
        ``ai_images`` / ``ai_image_tags`` rows are left behind.

        Returns:
            URL of the uploaded image.

        Raises:
            Exception: if the database insert or the upload fails.
        """
        # Unique file name based on the millisecond timestamp
        timestamp_ms = int(time.time() * 1000)
        image_filename = f"{timestamp_ms}.png"
        today_date = datetime.now().strftime("%Y%m%d")
        image_url_path = f"{today_date}/{image_filename}"
        temp_filename = f"temp_generated_image_{timestamp_ms}.png"

        try:
            # Save the image bytes to a temporary file
            with open(temp_filename, 'wb') as f:
                f.write(image_bytes)
            logger.info(f"Gemini生成图片成功: {temp_filename}")

            # Register the image in the database first
            image_info = self.insert_generated_image_to_db(image_filename, image_url_path, article_tags)
            if not image_info:
                raise Exception("插入图片信息到数据库失败")
            logger.info(f"图片信息已插入数据库,tag_image_id: {image_info['tag_image_id']}, image_id: {image_info['image_id']}")

            # Upload the file, keyed by tag_image_id
            uploaded_url = self.upload_image_to_server(temp_filename, image_info['tag_image_id'])

            # Attach the image to the article
            article_image_id = self.insert_article_image_relation_for_generated(
                article_id=article_id,
                image_id=image_info['image_id'],
                image_url=image_info['image_url'],
                image_thumb_url=image_info['image_thumb_url'],
                tag_image_id=image_info['tag_image_id'],
                keywords_id=image_info['keywords_id'],
                keywords_name=image_info['keywords_name'],
                department_id=image_info['department_id'],
                department_name=image_info['department_name'],
                image_source=0  # default value
            )
            if article_image_id:
                logger.info(f"文章图片关联信息已创建,ai_article_images.id: {article_image_id}")
        finally:
            # Always delete the temporary file, including on failure.
            try:
                if os.path.exists(temp_filename):
                    os.remove(temp_filename)
            except OSError as cleanup_error:
                logger.warning(f"删除临时文件失败: {cleanup_error}")

        logger.info(f"图片已上传到服务器: {uploaded_url}")
        return uploaded_url

    def insert_generated_image_to_db(self, image_name: str, image_url: str,
                                     article_tags: List[str]) -> Optional[Dict]:
        """Register a generated image in ``ai_images`` and ``ai_image_tags``.

        Department/keyword/tag ids are copied from an existing
        ``ai_image_tags`` row whose ``tag_name`` equals the first article
        tag; when there is none, fixed placeholder values are used.

        Args:
            image_name: image file name.
            image_url: image URL path.
            article_tags: article tag list.

        Returns:
            Dict with tag_image_id, image_id, image_url, image_thumb_url and
            the keyword/department fields, or None on failure.
        """
        try:
            with self._cursor(commit=True) as cursor:
                # Look up classification data by the first article tag
                tag_info = None
                if article_tags:
                    cursor.execute(
                        """
                        SELECT department_name, keywords_name, department_id, keywords_id, tag_id
                        FROM ai_image_tags
                        WHERE tag_name = %s
                        LIMIT 1
                        """,
                        (article_tags[0],)
                    )
                    tag_info = cursor.fetchone()

                if tag_info:
                    department = tag_info['department_name']
                    keywords = tag_info['keywords_name']
                    department_id = tag_info['department_id']
                    keywords_id = tag_info['keywords_id']
                    tag_id = tag_info['tag_id']
                else:
                    department = "AI生成"
                    keywords = "AI图片"
                    department_id = 1
                    keywords_id = 1
                    tag_id = 1
                tag_name = article_tags[0] if article_tags else "AI生成"

                # Insert into ai_images
                cursor.execute(
                    """
                    INSERT INTO ai_images
                    (image_name, image_url, image_thumb_url, department, keywords, image_type, upload_user_id, status)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                    """,
                    (image_name, image_url, '', department, keywords, 'medical', 1, 'active')
                )
                image_id = cursor.lastrowid
                logger.info(f"图片信息已插入ai_images表,image_id: {image_id}")

                # Insert into ai_image_tags
                cursor.execute(
                    """
                    INSERT INTO ai_image_tags
                    (image_id, image_name, image_url, image_thumb_url, tag_id, tag_name,
                     keywords_id, keywords_name, department_id, department_name,
                     image_source, created_user_id, image_attached_article_count)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    """,
                    (
                        image_id, image_name, image_url, '', tag_id, tag_name,
                        keywords_id, keywords, department_id, department,
                        3, 1, 0  # image_source: 3 means AI generated
                    )
                )
                tag_image_id = cursor.lastrowid
                logger.info(f"图片标签信息已插入ai_image_tags表,tag_image_id: {tag_image_id}")

            return {
                'tag_image_id': tag_image_id,
                'image_id': image_id,
                'image_url': image_url,
                'image_thumb_url': '',
                'keywords_id': keywords_id,
                'keywords_name': keywords,
                'department_id': department_id,
                'department_name': department
            }
        except Exception as e:
            logger.error(f"插入图片信息到数据库失败: {e}")
            return None

    def upload_image_to_server(self, image_path: str, tag_image_id: int) -> str:
        """Upload a local image file to the image server.

        Args:
            image_path: local image path.
            tag_image_id: ``ai_image_tags`` id sent along with the file.

        Returns:
            The image URL reported by the server.

        Raises:
            Exception: if login fails or the server rejects the upload.
            requests.exceptions.RequestException: on network errors/timeouts.
        """
        base_url = "http://47.99.184.230:8324"
        jwt_token = self.login_and_get_jwt_token(base_url)
        if not jwt_token:
            raise Exception("获取JWT token失败,无法上传图片")

        upload_url = f"{base_url}/api/images/upload"
        headers = {'Authorization': f'Bearer {jwt_token}'}

        with open(image_path, 'rb') as image_file:
            files = {'file': image_file}
            data = {'tag_image_id': tag_image_id}
            # A timeout keeps a stalled server from blocking the worker forever.
            response = requests.post(
                upload_url,
                headers=headers,
                files=files,
                data=data,
                timeout=UPLOAD_TIMEOUT_SECONDS
            )

        logger.info(f"图片上传响应状态码: {response.status_code}")

        if response.status_code != 200:
            raise Exception(f"图片上传请求失败,状态码: {response.status_code}")

        result = response.json()
        if result.get('code') != 200:
            raise Exception(f"图片上传失败: {result.get('message', '未知错误')}")
        return result['data']['http_image_url']

    def login_and_get_jwt_token(self, base_url: str) -> Optional[str]:
        """Log in to the image server and return a JWT token, or None on failure."""
        login_url = f"{base_url}/api/auth/login"
        login_data = {"username": "user010", "password": "@5^2W6R7"}

        try:
            response = requests.post(
                login_url,
                json=login_data,
                headers={'Content-Type': 'application/json'},
                timeout=HTTP_TIMEOUT_SECONDS
            )
            if response.status_code == 200:
                result = response.json()
                if result.get('code') == 200:
                    logger.info("JWT token获取成功")
                    return result['data']['token']

            logger.error(f"登录失败: {response.status_code}")
            return None
        except Exception as e:
            logger.error(f"登录异常: {e}")
            return None

    def insert_article_image_relation_for_generated(self, article_id: int, image_id: int,
                                                    image_url: str, image_thumb_url: str,
                                                    tag_image_id: int, keywords_id: int,
                                                    keywords_name: str, department_id: int,
                                                    department_name: str,
                                                    image_source: int = 0) -> Optional[int]:
        """Attach a generated image to an article in ``ai_article_images``.

        As in insert_article_image_relation, the relation insert and all
        status/counter updates share one transaction.

        Returns:
            The id of the new ``ai_article_images`` row, or None on failure.
        """
        insert_query = """
            INSERT INTO ai_article_images
            (article_id, image_id, image_url, image_thumb_url, image_tag_id, sort_order,
             keywords_id, keywords_name, department_id, department_name, image_source)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        try:
            with self._cursor(commit=True) as cursor:
                new_sort_order = self._next_sort_order(cursor, article_id)

                # Insert into ai_article_images
                cursor.execute(insert_query, (
                    article_id, image_id, image_url, image_thumb_url, tag_image_id,
                    new_sort_order, keywords_id, keywords_name, department_id,
                    department_name, image_source
                ))
                article_image_id = cursor.lastrowid
                logger.info(f"文章图片关联信息已插入ai_article_images表,id: {article_image_id}")

                self._mark_attached(cursor, article_id, image_id, tag_image_id)

            logger.info(f"成功更新文章 {article_id} 状态为 published_review")
            return article_image_id
        except Exception as e:
            logger.error(f"插入文章图片关联信息失败: {e}")
            return None

    def match_article_with_images(self, article_data: Dict, available_images: List[Dict]) -> bool:
        """Find the best image for one article; generate one if none fits.

        Candidates are narrowed to the article's department when possible,
        real photos are tried before template images, and within each group
        less-used images come first.  Every candidate is scored by Qwen; the
        highest score at or above MATCH_THRESHOLD wins.

        NOTE(review): ``available_images`` is a snapshot shared by all
        workers, so the attachment counts used here are not refreshed while a
        batch runs.

        Args:
            article_data: a row as returned by get_articles_with_tags.
            available_images: rows from get_available_images_with_tags.

        Returns:
            True when an image was attached (matched or generated).
        """
        article_id = article_data['article_id']
        article_title = article_data.get('title', '')
        coze_tag = article_data.get('coze_tag', '')
        article_department = article_data.get('department', '')

        try:
            # Parse the article tags
            if not coze_tag:
                logger.warning(f"文章 {article_id} 没有标签信息,跳过")
                return False

            article_tags = self.parse_article_tags(coze_tag)
            if not article_tags:
                logger.warning(f"文章 {article_id} 没有有效标签,跳过")
                return False

            logger.info(f"开始为文章 {article_id} 匹配图片 - 标题: {article_title}, 标签: {article_tags}, 科室: {article_department}")

            # Prefer images from the same department.  NULL department names
            # arrive as None, hence the "or ''" before lower().
            wanted_department = (article_department or '').lower()
            department_filtered_images = [
                img for img in available_images
                if wanted_department
                and (img.get('department_name') or '').lower() == wanted_department
            ]
            # Fall back to every image when the department has none
            if not department_filtered_images:
                department_filtered_images = available_images

            # Split by image_source: 2 = real photo (Flower_character),
            # anything else (default 1 = clean_images) = template image.
            actual_photos = [
                img for img in department_filtered_images
                if img.get('image_source', 1) == 2
            ]
            template_photos = [
                img for img in department_filtered_images
                if img.get('image_source', 1) != 2
            ]

            # Least-attached images first
            actual_photos.sort(key=lambda x: x['image_attached_article_count'])
            template_photos.sort(key=lambda x: x['image_attached_article_count'])

            # Real photos take priority over template images
            filtered_images = actual_photos + template_photos

            best_match = None
            best_score = 0.0

            # Score every candidate and remember the best one
            for image_data in filtered_images:
                image_tags = [image_data['tag_name']]
                image_keywords = image_data.get('keywords_name', '')
                image_department = image_data.get('department_name', '')

                is_match, score = self.call_qwen_for_matching(
                    article_title, article_tags, image_tags, image_keywords
                )

                logger.info(f"文章 {article_id} vs 图片 {image_data['image_id']} - 匹配: {is_match}, 分数: {score}, 科室: {image_department}")

                if is_match and score > best_score and score >= MATCH_THRESHOLD:
                    best_score = score
                    best_match = image_data

            # A match was found: store the relation
            if best_match:
                if not self.insert_article_image_relation(article_id, best_match, best_score):
                    return False
                logger.info(f"文章 {article_id} 成功匹配图片 {best_match['image_id']}, 分数: {best_score}")
                self.log_to_database('INFO', "文章匹配成功",
                                     f"文章ID: {article_id}, 图片ID: {best_match['image_id']}, 分数: {best_score}")
                return True

            # No suitable image: generate one with Gemini
            logger.info(f"文章 {article_id} 未找到合适的匹配图片,调用Gemini生成图片")
            self.log_to_database('WARNING', "文章未找到匹配图片,尝试生成图片", f"文章ID: {article_id}")

            # Build the generation prompt
            prompt = f"与'{article_title}'相关的插图,标签: {', '.join(article_tags)}"
            generated_image_url = self.generate_image_with_gemini(prompt, article_tags, article_id)

            if generated_image_url:
                logger.info(f"文章 {article_id} 成功生成图片: {generated_image_url}")
                self.log_to_database('INFO', "文章生成图片成功",
                                     f"文章ID: {article_id}, 图片URL: {generated_image_url}")
                return True

            logger.error(f"文章 {article_id} 生成图片失败")
            self.log_to_database('ERROR', "文章生成图片失败", f"文章ID: {article_id}")
            return False

        except Exception as e:
            error_msg = f"匹配文章图片异常 - 文章ID: {article_id}, 错误: {e}"
            logger.error(error_msg)
            self.log_to_database('ERROR', error_msg, traceback.format_exc())
            return False

    def get_next_available_article(self, pending_articles: List[Dict]) -> Optional[Dict]:
        """Return the next unclaimed article, or None when all are claimed.

        The lookup and the claim happen under ``processing_lock`` so that no
        two workers receive the same article.
        """
        with self.processing_lock:
            for article_data in pending_articles:
                article_id = article_data['article_id']
                if article_id not in self.processed_articles:
                    self.processed_articles.add(article_id)
                    return article_data
        return None

    def worker_process_articles(self, pending_articles: List[Dict],
                                available_images: List[Dict], worker_id: int) -> int:
        """Worker loop: claim articles one at a time until none are left.

        Returns:
            Number of articles this worker matched successfully.
        """
        processed_count = 0
        thread_name = f"Worker-{worker_id}"
        threading.current_thread().name = thread_name

        logger.info(f"[{thread_name}] 启动,准备处理文章匹配")

        while True:
            # Claim the next pending article
            article_data = self.get_next_available_article(pending_articles)
            if not article_data:
                logger.info(f"[{thread_name}] 没有更多待处理文章,退出")
                break

            # Match the article with an image
            if self.match_article_with_images(article_data, available_images):
                processed_count += 1
                logger.info(f"[{thread_name}] 成功处理文章: {article_data['article_id']}")
            else:
                logger.warning(f"[{thread_name}] 文章处理失败: {article_data['article_id']}")

        logger.info(f"[{thread_name}] 完成,共处理 {processed_count} 篇文章")
        return processed_count

    def run_matching(self):
        """Run one matching pass: load articles and images, then fan out to workers."""
        logger.info("开始文章图片智能匹配...")
        self.log_to_database('INFO', '启动文章图片智能匹配服务', f'worker数量: {WORKER_COUNT}')

        try:
            # Articles that need an image
            pending_articles = self.get_articles_with_tags()
            if not pending_articles:
                logger.info("没有需要匹配的文章")
                return

            # Candidate images
            available_images = self.get_available_images_with_tags()
            if not available_images:
                logger.warning("没有可用图片,无法进行匹配")
                self.log_to_database('WARNING', '没有可用图片')
                return

            logger.info(f"开始匹配 {len(pending_articles)} 篇文章与 {len(available_images)} 张图片")
            self.log_to_database('INFO', '开始批量匹配',
                                 f'文章数: {len(pending_articles)}, 图片数: {len(available_images)}')

            # Reset the claimed-article set
            with self.processing_lock:
                self.processed_articles.clear()

            total_processed = 0

            # Process in parallel with a thread pool
            with ThreadPoolExecutor(max_workers=WORKER_COUNT, thread_name_prefix="MatchWorker") as executor:
                # Submit one long-running task per worker
                future_to_worker = {}
                for worker_id in range(1, WORKER_COUNT + 1):
                    future = executor.submit(
                        self.worker_process_articles,
                        pending_articles,
                        available_images,
                        worker_id
                    )
                    future_to_worker[future] = worker_id

                # Wait for every worker to finish
                for future in as_completed(future_to_worker):
                    worker_id = future_to_worker[future]
                    try:
                        processed_count = future.result()
                        total_processed += processed_count
                        logger.info(f"Worker-{worker_id} 完成,处理了 {processed_count} 篇文章")
                    except Exception as e:
                        logger.error(f"Worker-{worker_id} 执行异常: {e}")
                        self.log_to_database('ERROR', f'Worker-{worker_id} 执行异常', str(e))

            logger.info(f"匹配完成,共处理 {total_processed} 篇文章")
            self.log_to_database('INFO', '匹配任务完成', f'共处理 {total_processed} 篇文章')

        except Exception as e:
            error_msg = f"匹配流程异常: {e}"
            logger.error(error_msg)
            self.log_to_database('ERROR', error_msg, traceback.format_exc())


def main():
    """Entry point: build the matcher and run one matching pass."""
    matcher = ArticleImageMatcher()

    try:
        # Run the matching
        matcher.run_matching()
    except Exception as e:
        logger.error(f"程序运行异常: {e}")
        matcher.log_to_database('ERROR', f'程序运行异常: {e}', traceback.format_exc())


if __name__ == "__main__":
    main()