Files
baijiahao_text_to_image/article_auto_image_matching.py

853 lines
37 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文章图片智能匹配脚本
基于通义千问大模型智能匹配文章与图片将结果存储到ai_article_images表
"""
import os
import sys
import time
import json
import logging
import requests
import pymysql
import traceback
import threading
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
# 添加项目根目录到Python路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from database_config import db_manager
from log_config import setup_logger
# 配置日志记录器
logger = setup_logger(
name='article_image_matching',
log_file='logs/article_image_matching.log',
error_log_file='logs/article_image_matching_error.log',
level=logging.INFO,
console_output=True
)
# 配置常量
QWEN_API_KEY = "sk-e6a38204022a4b538b8954f0584712af"
QWEN_API_URL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
WORKER_COUNT = 4 # 并行处理worker数量
BATCH_SIZE = 50 # 每批处理的文章数量
MATCH_THRESHOLD = 0.6 # 匹配分数阈值0-1
class ArticleImageMatcher:
def __init__(self):
# 使用统一的数据库管理器
self.db_manager = db_manager
# 并行处理相关
self.processing_lock = threading.Lock()
self.processed_articles = set()
logger.info("ArticleImageMatcher 初始化完成")
self.log_to_database('INFO', 'ArticleImageMatcher 初始化完成')
def log_to_database(self, level: str, message: str, details: Optional[str] = None):
"""记录日志到数据库ai_logs表"""
try:
connection = self.db_manager.get_connection()
try:
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
status_map = {
'INFO': 'success',
'WARNING': 'warning',
'ERROR': 'error'
}
status = status_map.get(level, 'success')
sql = """
INSERT INTO ai_logs (user_id, action, description, status, error_message, created_at)
VALUES (%s, %s, %s, %s, %s, NOW())
"""
cursor.execute(sql, (None, 'article_image_matching', message, status, details))
connection.commit()
logger.info(f"日志已记录到数据库: {level} - {message}")
finally:
connection.close()
except Exception as e:
logger.error(f"记录日志到数据库失败: {e}")
def get_articles_with_tags(self, limit: int = BATCH_SIZE) -> List[Dict]:
"""
从ai_article_tags表获取需要匹配图片的文章
Returns:
包含文章ID标签等信息的列表
"""
try:
connection = self.db_manager.get_connection()
try:
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
# 查询有标签但未匹配图片的文章
sql = """
SELECT
at.id,
at.article_id,
at.coze_tag,
a.title,
a.content
FROM ai_article_tags at
INNER JOIN ai_articles a ON at.article_id = a.id
WHERE at.coze_tag IS NOT NULL
AND at.coze_tag != ''
AND NOT EXISTS (
SELECT 1 FROM ai_article_images ai
WHERE ai.article_id = at.article_id
)
AND a.status = 'approved'
ORDER BY at.id DESC
LIMIT %s
"""
cursor.execute(sql, (limit,))
results = cursor.fetchall()
if results:
logger.info(f"查询到 {len(results)} 篇需要匹配图片的文章")
self.log_to_database('INFO', f"查询到需要匹配图片的文章", f"数量: {len(results)}")
else:
logger.info("未查询到需要匹配图片的文章")
return results
finally:
connection.close()
except Exception as e:
error_msg = f"查询文章标签异常: {e}"
logger.error(error_msg)
self.log_to_database('ERROR', error_msg, traceback.format_exc())
return []
def get_available_images_with_tags(self) -> List[Dict]:
"""
从ai_image_tags表获取可用的图片及其标签
Returns:
包含图片ID标签等信息的列表
"""
try:
connection = self.db_manager.get_connection()
try:
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
# 查询附加文章数量小于5的图片
sql = """
SELECT
id,
image_id,
image_name,
image_url,
image_thumb_url,
tag_id,
tag_name,
keywords_id,
keywords_name,
department_id,
department_name,
image_attached_article_count
FROM ai_image_tags
WHERE image_attached_article_count < 5
ORDER BY image_attached_article_count ASC, id DESC
"""
cursor.execute(sql)
results = cursor.fetchall()
if results:
logger.info(f"查询到 {len(results)} 张可用图片")
self.log_to_database('INFO', f"查询到可用图片", f"数量: {len(results)}")
else:
logger.info("未查询到可用图片")
return results
finally:
connection.close()
except Exception as e:
error_msg = f"查询图片标签异常: {e}"
logger.error(error_msg)
self.log_to_database('ERROR', error_msg, traceback.format_exc())
return []
def parse_article_tags(self, coze_tag: str) -> List[str]:
"""
解析文章标签支持JSON格式和逗号分隔格式
Args:
coze_tag: 标签字符串
Returns:
标签列表
"""
try:
if not coze_tag:
return []
# 尝试解析JSON格式
try:
tags_data = json.loads(coze_tag)
if isinstance(tags_data, list):
return tags_data
elif isinstance(tags_data, dict):
return list(tags_data.values())
except json.JSONDecodeError:
pass
# 按逗号分隔
tags = [tag.strip() for tag in str(coze_tag).split(',') if tag.strip()]
return tags
except Exception as e:
logger.error(f"解析文章标签异常: {e}")
return []
def call_qwen_for_matching(self, article_title: str, article_tags: List[str],
image_tags: List[str], image_keywords: str) -> Tuple[bool, float]:
"""
调用通义千问API评估文章与图片的匹配度
Args:
article_title: 文章标题
article_tags: 文章标签列表
image_tags: 图片标签列表
image_keywords: 图片关键词
Returns:
(是否匹配, 匹配分数)
"""
try:
# 构建提示词
prompt = f"""请评估以下文章与图片的匹配度:
文章标题{article_title}
文章标签{', '.join(article_tags)}
图片标签{', '.join(image_tags)}
图片关键词{image_keywords}
请根据标签的语义相关性给出0-1之间的匹配分数并说明是否适合匹配
输出格式{{"match": true/false, "score": 0.0-1.0, "reason": "匹配原因"}}
只输出JSON格式不要其他内容"""
headers = {
'Authorization': f'Bearer {QWEN_API_KEY}',
'Content-Type': 'application/json'
}
payload = {
"model": "qwen-max",
"input": {
"messages": [
{
"role": "user",
"content": prompt
}
]
},
"parameters": {
"result_format": "message"
}
}
response = requests.post(
QWEN_API_URL,
json=payload,
headers=headers,
timeout=30
)
if response.status_code == 200:
result = response.json()
if result.get('output') and result['output'].get('choices'):
message = result['output']['choices'][0].get('message', {})
content = message.get('content', '')
# 解析JSON响应
try:
match_result = json.loads(content)
is_match = match_result.get('match', False)
score = match_result.get('score', 0.0)
reason = match_result.get('reason', '')
logger.info(f"通义千问评估结果 - 匹配: {is_match}, 分数: {score}, 原因: {reason}")
return is_match, score
except json.JSONDecodeError:
logger.error(f"解析通义千问响应失败: {content}")
return False, 0.0
else:
logger.error(f"通义千问API返回格式异常: {result}")
return False, 0.0
else:
logger.error(f"通义千问API请求失败状态码: {response.status_code}")
return False, 0.0
except requests.exceptions.Timeout:
logger.error("通义千问API请求超时")
return False, 0.0
except Exception as e:
error_msg = f"调用通义千问API异常: {e}"
logger.error(error_msg)
self.log_to_database('ERROR', error_msg, traceback.format_exc())
return False, 0.0
def insert_article_image_relation(self, article_id: int, image_data: Dict,
match_score: float) -> bool:
"""
将匹配结果插入ai_article_images表
Args:
article_id: 文章ID
image_data: 图片数据
match_score: 匹配分数
Returns:
是否插入成功
"""
try:
connection = self.db_manager.get_connection()
try:
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
# 查询当前文章下已有图片的最大sort_order
query_max_sort = """
SELECT COALESCE(MAX(sort_order), 0) as max_sort_order
FROM ai_article_images
WHERE article_id = %s
"""
cursor.execute(query_max_sort, (article_id,))
result = cursor.fetchone()
max_sort_order = result.get('max_sort_order', 0)
new_sort_order = max_sort_order + 1
# 插入关联记录
insert_sql = """
INSERT INTO ai_article_images
(article_id, image_id, image_url, image_thumb_url, image_tag_id,
sort_order, keywords_id, keywords_name, department_id, department_name,
image_source, created_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
"""
cursor.execute(insert_sql, (
article_id,
image_data['image_id'],
image_data['image_url'],
image_data['image_thumb_url'],
image_data['id'], # image_tag_id
new_sort_order,
image_data['keywords_id'],
image_data['keywords_name'],
image_data['department_id'],
image_data['department_name'],
1 # image_source: 1表示tag匹配
))
# 更新图片附加文章计数
update_sql = """
UPDATE ai_image_tags
SET image_attached_article_count = image_attached_article_count + 1
WHERE id = %s
"""
cursor.execute(update_sql, (image_data['id'],))
connection.commit()
logger.info(f"成功插入文章图片关联 - 文章ID: {article_id}, 图片ID: {image_data['image_id']}, 分数: {match_score}")
return True
finally:
connection.close()
except Exception as e:
error_msg = f"插入文章图片关联异常: {e}"
logger.error(error_msg)
self.log_to_database('ERROR', error_msg, traceback.format_exc())
return False
def generate_image_with_gemini(self, prompt: str, article_tags: List[str], article_id: int) -> Optional[str]:
"""
使用Gemini生成图片并上传到服务器
Args:
prompt: 图片生成提示词
article_tags: 文章标签列表用于查询department和keywords
article_id: 文章ID用于关联图片
Returns:
上传后的图片URL失败返回None
"""
try:
# 导入必要的库
from google import genai
from google.genai.client import HttpOptions
client = genai.Client(
http_options=HttpOptions(base_url="https://work.poloapi.com"),
api_key="sk-V4tPnDgzFPa7nxWrvKnNJsW8ZcBXXPuGmjfgvPVRnwpHoeob"
)
logger.info(f"正在调用Gemini API生成图片提示词: {prompt[:50]}...")
# 生成内容
response = client.models.generate_content(
model="gemini-3-pro-image-preview",
contents=[prompt],
)
# 检查是否有候选答案
if not response.candidates:
raise Exception("Gemini API未返回任何候选答案")
# 处理响应
candidate = response.candidates[0]
if not candidate.content or not candidate.content.parts:
raise Exception("Gemini API返回的候选答案中没有内容部分")
for part in candidate.content.parts:
if hasattr(part, 'inline_data') and part.inline_data is not None:
image_data = part.inline_data
if image_data.data is not None:
# 生成唯一的文件名(基于时间戳)
timestamp_ms = int(time.time() * 1000)
image_filename = f"{timestamp_ms}.png"
today_date = datetime.now().strftime("%Y%m%d")
image_url_path = f"{today_date}/{image_filename}"
temp_filename = f"temp_generated_image_{timestamp_ms}.png"
# 保存图片数据到临时文件
with open(temp_filename, 'wb') as f:
f.write(image_data.data)
logger.info(f"Gemini生成图片成功: {temp_filename}")
# 先将图片信息插入数据库
image_info = self.insert_generated_image_to_db(image_filename, image_url_path, article_tags)
if not image_info:
raise Exception("插入图片信息到数据库失败")
logger.info(f"图片信息已插入数据库tag_image_id: {image_info['tag_image_id']}, image_id: {image_info['image_id']}")
# 使用tag_image_id上传图片到服务器
uploaded_url = self.upload_image_to_server(temp_filename, image_info['tag_image_id'])
# 将文章与图片的关联信息插入ai_article_images表
article_image_id = self.insert_article_image_relation_for_generated(
article_id=article_id,
image_id=image_info['image_id'],
image_url=image_info['image_url'],
image_thumb_url=image_info['image_thumb_url'],
tag_image_id=image_info['tag_image_id'],
keywords_id=image_info['keywords_id'],
keywords_name=image_info['keywords_name'],
department_id=image_info['department_id'],
department_name=image_info['department_name'],
image_source=0 # 默认值
)
if article_image_id:
logger.info(f"文章图片关联信息已创建ai_article_images.id: {article_image_id}")
# 删除临时文件
os.remove(temp_filename)
logger.info(f"图片已上传到服务器: {uploaded_url}")
return uploaded_url
raise Exception("Gemini API未返回有效的图片数据")
except ImportError:
logger.error("错误未安装google-genai库请运行 'pip install google-genai' 进行安装")
return None
except Exception as e:
error_msg = f"Gemini生成图片异常: {e}"
logger.error(error_msg)
self.log_to_database('ERROR', error_msg, traceback.format_exc())
return None
def insert_generated_image_to_db(self, image_name: str, image_url: str, article_tags: List[str]) -> Optional[Dict]:
"""
将Gemini生成的图片信息插入数据库
Args:
image_name: 图片文件名
image_url: 图片URL路径
article_tags: 文章标签列表
Returns:
包含插入信息的字典
"""
try:
connection = self.db_manager.get_connection()
try:
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
# 根据文章标签查询ai_image_tags表
if article_tags:
query = """
SELECT department_name, keywords_name, department_id, keywords_id, tag_id
FROM ai_image_tags
WHERE tag_name = %s
LIMIT 1
"""
cursor.execute(query, (article_tags[0],))
tag_info = cursor.fetchone()
if tag_info:
department = tag_info['department_name']
keywords = tag_info['keywords_name']
department_id = tag_info['department_id']
keywords_id = tag_info['keywords_id']
tag_id = tag_info['tag_id']
tag_name = article_tags[0]
else:
department = "AI生成"
keywords = "AI图片"
department_id = 1
keywords_id = 1
tag_id = 1
tag_name = article_tags[0] if article_tags else "AI生成"
else:
department = "AI生成"
keywords = "AI图片"
department_id = 1
keywords_id = 1
tag_id = 1
tag_name = "AI生成"
# 插入ai_images表
insert_image_query = """
INSERT INTO ai_images
(image_name, image_url, image_thumb_url, department, keywords, image_type, upload_user_id, status)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_image_query, (
image_name, image_url, '', department, keywords, 'medical', 1, 'active'
))
image_id = cursor.lastrowid
logger.info(f"图片信息已插入ai_images表image_id: {image_id}")
# 插入ai_image_tags表
insert_tag_query = """
INSERT INTO ai_image_tags
(image_id, image_name, image_url, image_thumb_url, tag_id, tag_name,
keywords_id, keywords_name, department_id, department_name,
image_source, created_user_id, image_attached_article_count)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_tag_query, (
image_id, image_name, image_url, '', tag_id, tag_name,
keywords_id, keywords, department_id, department,
3, 1, 0 # image_source: 3表示AI生成
))
tag_image_id = cursor.lastrowid
logger.info(f"图片标签信息已插入ai_image_tags表tag_image_id: {tag_image_id}")
connection.commit()
return {
'tag_image_id': tag_image_id,
'image_id': image_id,
'image_url': image_url,
'image_thumb_url': '',
'keywords_id': keywords_id,
'keywords_name': keywords,
'department_id': department_id,
'department_name': department
}
finally:
connection.close()
except Exception as e:
logger.error(f"插入图片信息到数据库失败: {e}")
return None
def upload_image_to_server(self, image_path: str, tag_image_id: int) -> str:
"""
上传图片到服务器
Args:
image_path: 本地图片路径
tag_image_id: 图片标签ID
Returns:
服务器上的图片URL
"""
base_url = "http://47.99.184.230:8324"
jwt_token = self.login_and_get_jwt_token(base_url)
if not jwt_token:
raise Exception("获取JWT token失败无法上传图片")
upload_url = f"{base_url}/api/images/upload"
headers = {'Authorization': f'Bearer {jwt_token}'}
with open(image_path, 'rb') as image_file:
files = {'file': image_file}
data = {'tag_image_id': tag_image_id}
response = requests.post(upload_url, headers=headers, files=files, data=data)
logger.info(f"图片上传响应状态码: {response.status_code}")
if response.status_code == 200:
result = response.json()
if result.get('code') == 200:
return result['data']['http_image_url']
else:
raise Exception(f"图片上传失败: {result.get('message', '未知错误')}")
else:
raise Exception(f"图片上传请求失败,状态码: {response.status_code}")
def login_and_get_jwt_token(self, base_url: str) -> Optional[str]:
"""登录获取JWT token"""
login_url = f"{base_url}/api/auth/login"
login_data = {"username": "user010", "password": "@5^2W6R7"}
try:
response = requests.post(login_url, json=login_data, headers={'Content-Type': 'application/json'})
if response.status_code == 200:
result = response.json()
if result.get('code') == 200:
logger.info("JWT token获取成功")
return result['data']['token']
logger.error(f"登录失败: {response.status_code}")
return None
except Exception as e:
logger.error(f"登录异常: {e}")
return None
def insert_article_image_relation_for_generated(self, article_id: int, image_id: int, image_url: str,
image_thumb_url: str, tag_image_id: int, keywords_id: int,
keywords_name: str, department_id: int, department_name: str,
image_source: int = 0) -> Optional[int]:
"""
将文章与生成图片的关联信息插入ai_article_images表
"""
try:
connection = self.db_manager.get_connection()
try:
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
# 查询当前文章下已有图片的最大sort_order
query_max_sort = """
SELECT COALESCE(MAX(sort_order), 0) as max_sort_order
FROM ai_article_images
WHERE article_id = %s
"""
cursor.execute(query_max_sort, (article_id,))
result = cursor.fetchone()
max_sort_order = result['max_sort_order'] if result else 0
new_sort_order = max_sort_order + 1
# 插入ai_article_images表
insert_query = """
INSERT INTO ai_article_images
(article_id, image_id, image_url, image_thumb_url, image_tag_id, sort_order,
keywords_id, keywords_name, department_id, department_name, image_source)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_query, (
article_id, image_id, image_url, image_thumb_url, tag_image_id, new_sort_order,
keywords_id, keywords_name, department_id, department_name, image_source
))
article_image_id = cursor.lastrowid
logger.info(f"文章图片关联信息已插入ai_article_images表id: {article_image_id}")
connection.commit()
return article_image_id
finally:
connection.close()
except Exception as e:
logger.error(f"插入文章图片关联信息失败: {e}")
return None
def match_article_with_images(self, article_data: Dict, available_images: List[Dict]) -> bool:
"""
为单篇文章匹配图片未成功匹配时调用Gemini生图
Args:
article_data: 文章数据
available_images: 可用图片列表
Returns:
是否匹配成功
"""
article_id = article_data['article_id']
article_title = article_data.get('title', '')
article_content = article_data.get('content', '')
coze_tag = article_data.get('coze_tag', '')
try:
# 解析文章标签
article_tags = self.parse_article_tags(coze_tag)
if not article_tags:
logger.warning(f"文章 {article_id} 没有有效标签,跳过")
return False
logger.info(f"开始为文章 {article_id} 匹配图片 - 标题: {article_title}, 标签: {article_tags}")
best_match = None
best_score = 0.0
# 遍历可用图片,找到最佳匹配
for image_data in available_images:
image_tags = [image_data['tag_name']]
image_keywords = image_data.get('keywords_name', '')
# 调用通义千问评估匹配度
is_match, score = self.call_qwen_for_matching(
article_title, article_tags, image_tags, image_keywords
)
# 记录评估结果
logger.info(f"文章 {article_id} vs 图片 {image_data['image_id']} - 匹配: {is_match}, 分数: {score}")
# 更新最佳匹配
if is_match and score > best_score and score >= MATCH_THRESHOLD:
best_score = score
best_match = image_data
# 如果找到匹配的图片,插入关联记录
if best_match:
if self.insert_article_image_relation(article_id, best_match, best_score):
logger.info(f"文章 {article_id} 成功匹配图片 {best_match['image_id']}, 分数: {best_score}")
self.log_to_database('INFO', f"文章匹配成功",
f"文章ID: {article_id}, 图片ID: {best_match['image_id']}, 分数: {best_score}")
return True
else:
return False
else:
# 未找到合适的匹配图片使用Gemini生成图片
logger.info(f"文章 {article_id} 未找到合适的匹配图片调用Gemini生成图片")
self.log_to_database('WARNING', f"文章未找到匹配图片,尝试生成图片", f"文章ID: {article_id}")
# 构建生成提示词
prompt = f"'{article_title}'相关的插图,标签: {', '.join(article_tags)}"
generated_image_url = self.generate_image_with_gemini(prompt, article_tags, article_id)
if generated_image_url:
logger.info(f"文章 {article_id} 成功生成图片: {generated_image_url}")
self.log_to_database('INFO', f"文章生成图片成功",
f"文章ID: {article_id}, 图片URL: {generated_image_url}")
return True
else:
logger.error(f"文章 {article_id} 生成图片失败")
self.log_to_database('ERROR', f"文章生成图片失败", f"文章ID: {article_id}")
return False
except Exception as e:
error_msg = f"匹配文章图片异常 - 文章ID: {article_id}, 错误: {e}"
logger.error(error_msg)
self.log_to_database('ERROR', error_msg, traceback.format_exc())
return False
def get_next_available_article(self, pending_articles: List[Dict]) -> Optional[Dict]:
"""线程安全地获取下一篇待处理文章"""
with self.processing_lock:
for article_data in pending_articles:
article_id = article_data['article_id']
if article_id not in self.processed_articles:
self.processed_articles.add(article_id)
return article_data
return None
def worker_process_articles(self, pending_articles: List[Dict],
available_images: List[Dict], worker_id: int) -> int:
"""Worker线程处理文章匹配"""
processed_count = 0
thread_name = f"Worker-{worker_id}"
threading.current_thread().name = thread_name
logger.info(f"[{thread_name}] 启动,准备处理文章匹配")
while True:
# 线程安全地获取下一篇待处理文章
article_data = self.get_next_available_article(pending_articles)
if not article_data:
logger.info(f"[{thread_name}] 没有更多待处理文章,退出")
break
# 匹配文章与图片
if self.match_article_with_images(article_data, available_images):
processed_count += 1
logger.info(f"[{thread_name}] 成功处理文章: {article_data['article_id']}")
else:
logger.warning(f"[{thread_name}] 文章处理失败: {article_data['article_id']}")
logger.info(f"[{thread_name}] 完成,共处理 {processed_count} 篇文章")
return processed_count
def run_matching(self):
"""运行文章图片匹配流程"""
logger.info("开始文章图片智能匹配...")
self.log_to_database('INFO', '启动文章图片智能匹配服务', f'worker数量: {WORKER_COUNT}')
try:
# 获取需要匹配的文章
pending_articles = self.get_articles_with_tags()
if not pending_articles:
logger.info("没有需要匹配的文章")
return
# 获取可用图片
available_images = self.get_available_images_with_tags()
if not available_images:
logger.warning("没有可用图片,无法进行匹配")
self.log_to_database('WARNING', '没有可用图片')
return
logger.info(f"开始匹配 {len(pending_articles)} 篇文章与 {len(available_images)} 张图片")
self.log_to_database('INFO', '开始批量匹配',
f'文章数: {len(pending_articles)}, 图片数: {len(available_images)}')
# 清空已处理记录集合
with self.processing_lock:
self.processed_articles.clear()
# 使用线程池并行处理
with ThreadPoolExecutor(max_workers=WORKER_COUNT, thread_name_prefix="MatchWorker") as executor:
# 提交worker任务
future_to_worker = {}
for worker_id in range(1, WORKER_COUNT + 1):
future = executor.submit(
self.worker_process_articles,
pending_articles,
available_images,
worker_id
)
future_to_worker[future] = worker_id
# 等待所有worker完成
total_processed = 0
for future in as_completed(future_to_worker):
worker_id = future_to_worker[future]
try:
processed_count = future.result()
total_processed += processed_count
logger.info(f"Worker-{worker_id} 完成,处理了 {processed_count} 篇文章")
except Exception as e:
logger.error(f"Worker-{worker_id} 执行异常: {e}")
self.log_to_database('ERROR', f'Worker-{worker_id} 执行异常', str(e))
logger.info(f"匹配完成,共处理 {total_processed} 篇文章")
self.log_to_database('INFO', '匹配任务完成', f'共处理 {total_processed} 篇文章')
except Exception as e:
error_msg = f"匹配流程异常: {e}"
logger.error(error_msg)
self.log_to_database('ERROR', error_msg, traceback.format_exc())
def main():
"""主函数"""
matcher = ArticleImageMatcher()
try:
# 运行匹配
matcher.run_matching()
except Exception as e:
logger.error(f"程序运行异常: {e}")
matcher.log_to_database('ERROR', f'程序运行异常: {e}', traceback.format_exc())
if __name__ == "__main__":
main()