961 lines
42 KiB
Python
961 lines
42 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
文章图片智能匹配脚本
|
||
基于通义千问大模型,智能匹配文章与图片,将结果存储到ai_article_images表
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import time
|
||
import json
|
||
import logging
|
||
import requests
|
||
import pymysql
|
||
import traceback
|
||
import threading
|
||
from datetime import datetime
|
||
from typing import Dict, List, Optional, Tuple
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
|
||
# 添加项目根目录到Python路径
|
||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
from database_config import db_manager
|
||
from log_config import setup_logger
|
||
|
||
# 配置日志记录器
|
||
logger = setup_logger(
|
||
name='article_image_matching',
|
||
log_file='logs/article_image_matching.log',
|
||
error_log_file='logs/article_image_matching_error.log',
|
||
level=logging.INFO,
|
||
console_output=True
|
||
)
|
||
|
||
# 配置常量
|
||
QWEN_API_KEY = "sk-e6a38204022a4b538b8954f0584712af"
|
||
QWEN_API_URL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
|
||
WORKER_COUNT = 4 # 并行处理worker数量
|
||
BATCH_SIZE = 50 # 每批处理的文章数量
|
||
MATCH_THRESHOLD = 0.6 # 匹配分数阈值(0-1)
|
||
|
||
class ArticleImageMatcher:
|
||
def __init__(self):
|
||
# 使用统一的数据库管理器
|
||
self.db_manager = db_manager
|
||
|
||
# 并行处理相关
|
||
self.processing_lock = threading.Lock()
|
||
self.processed_articles = set()
|
||
|
||
logger.info("ArticleImageMatcher 初始化完成")
|
||
self.log_to_database('INFO', 'ArticleImageMatcher 初始化完成')
|
||
|
||
def log_to_database(self, level: str, message: str, details: Optional[str] = None):
|
||
"""记录日志到数据库ai_logs表"""
|
||
try:
|
||
connection = self.db_manager.get_connection()
|
||
try:
|
||
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
|
||
status_map = {
|
||
'INFO': 'success',
|
||
'WARNING': 'warning',
|
||
'ERROR': 'error'
|
||
}
|
||
status = status_map.get(level, 'success')
|
||
|
||
sql = """
|
||
INSERT INTO ai_logs (user_id, action, description, status, error_message, created_at)
|
||
VALUES (%s, %s, %s, %s, %s, NOW())
|
||
"""
|
||
cursor.execute(sql, (None, 'article_image_matching', message, status, details))
|
||
connection.commit()
|
||
logger.info(f"日志已记录到数据库: {level} - {message}")
|
||
finally:
|
||
connection.close()
|
||
except Exception as e:
|
||
logger.error(f"记录日志到数据库失败: {e}")
|
||
|
||
def get_articles_with_tags(self, limit: int = BATCH_SIZE) -> List[Dict]:
|
||
"""
|
||
从ai_articles表获取需要匹配图片的文章
|
||
|
||
Returns:
|
||
包含文章ID、标签等信息的列表
|
||
"""
|
||
try:
|
||
connection = self.db_manager.get_connection()
|
||
try:
|
||
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
|
||
# 查询有标签但未匹配图片的文章
|
||
sql = """
|
||
SELECT
|
||
a.id as article_id,
|
||
a.title,
|
||
a.content,
|
||
a.coze_tag,
|
||
a.department
|
||
FROM ai_articles a
|
||
WHERE NOT EXISTS (
|
||
SELECT 1 FROM ai_article_images ai
|
||
WHERE ai.article_id = a.id
|
||
)
|
||
AND a.status = 'pending_review'
|
||
AND a.review_user_id = 152
|
||
ORDER BY a.id DESC
|
||
LIMIT %s
|
||
"""
|
||
cursor.execute(sql, (limit,))
|
||
results = cursor.fetchall()
|
||
|
||
if results:
|
||
logger.info(f"查询到 {len(results)} 篇需要匹配图片的文章")
|
||
self.log_to_database('INFO', f"查询到需要匹配图片的文章", f"数量: {len(results)}")
|
||
else:
|
||
logger.info("未查询到需要匹配图片的文章")
|
||
|
||
return results
|
||
finally:
|
||
connection.close()
|
||
except Exception as e:
|
||
error_msg = f"查询文章标签异常: {e}"
|
||
logger.error(error_msg)
|
||
self.log_to_database('ERROR', error_msg, traceback.format_exc())
|
||
return []
|
||
|
||
def get_available_images_with_tags(self) -> List[Dict]:
|
||
"""
|
||
从ai_image_tags表获取可用的图片及其标签(状态为generate且挂载次数<5)
|
||
|
||
Returns:
|
||
包含图片ID、标签等信息的列表
|
||
"""
|
||
try:
|
||
connection = self.db_manager.get_connection()
|
||
try:
|
||
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
|
||
# 查询状态为generate且附加文章数量小于5的图片(不使用JOIN)
|
||
# 包含image_source字段用于区分实拍图和模板图
|
||
sql = """
|
||
SELECT
|
||
it.id,
|
||
it.image_id,
|
||
it.image_name,
|
||
it.image_url,
|
||
it.image_thumb_url,
|
||
it.tag_id,
|
||
it.tag_name,
|
||
it.keywords_id,
|
||
it.keywords_name,
|
||
it.department_id,
|
||
it.department_name,
|
||
it.image_attached_article_count,
|
||
it.image_source
|
||
FROM ai_image_tags it
|
||
WHERE it.image_attached_article_count < 5
|
||
AND EXISTS (
|
||
SELECT 1 FROM ai_images i
|
||
WHERE i.id = it.image_id
|
||
AND i.status = 'generate'
|
||
)
|
||
ORDER BY it.image_attached_article_count ASC, it.id DESC
|
||
"""
|
||
cursor.execute(sql)
|
||
cursor.execute(sql)
|
||
results = cursor.fetchall()
|
||
|
||
if results:
|
||
logger.info(f"查询到 {len(results)} 张可用图片")
|
||
self.log_to_database('INFO', f"查询到可用图片", f"数量: {len(results)}")
|
||
else:
|
||
logger.info("未查询到可用图片")
|
||
|
||
return results
|
||
finally:
|
||
connection.close()
|
||
except Exception as e:
|
||
error_msg = f"查询图片标签异常: {e}"
|
||
logger.error(error_msg)
|
||
self.log_to_database('ERROR', error_msg, traceback.format_exc())
|
||
return []
|
||
|
||
def parse_article_tags(self, coze_tag: str) -> List[str]:
|
||
"""
|
||
解析文章标签(支持JSON格式和逗号分隔格式)
|
||
|
||
Args:
|
||
coze_tag: 标签字符串
|
||
|
||
Returns:
|
||
标签列表
|
||
"""
|
||
try:
|
||
if not coze_tag:
|
||
return []
|
||
|
||
# 尝试解析JSON格式
|
||
try:
|
||
tags_data = json.loads(coze_tag)
|
||
if isinstance(tags_data, list):
|
||
return tags_data
|
||
elif isinstance(tags_data, dict):
|
||
return list(tags_data.values())
|
||
except json.JSONDecodeError:
|
||
pass
|
||
|
||
# 按逗号分隔
|
||
tags = [tag.strip() for tag in str(coze_tag).split(',') if tag.strip()]
|
||
return tags
|
||
except Exception as e:
|
||
logger.error(f"解析文章标签异常: {e}")
|
||
return []
|
||
|
||
def call_qwen_for_matching(self, article_title: str, article_tags: List[str],
|
||
image_tags: List[str], image_keywords: str) -> Tuple[bool, float]:
|
||
"""
|
||
调用通义千问API评估文章与图片的匹配度
|
||
|
||
Args:
|
||
article_title: 文章标题
|
||
article_tags: 文章标签列表
|
||
image_tags: 图片标签列表
|
||
image_keywords: 图片关键词
|
||
|
||
Returns:
|
||
(是否匹配, 匹配分数)
|
||
"""
|
||
try:
|
||
# 构建提示词
|
||
prompt = f"""请评估以下文章与图片的匹配度:
|
||
|
||
文章标题:{article_title}
|
||
文章标签:{', '.join(article_tags)}
|
||
|
||
图片标签:{', '.join(image_tags)}
|
||
图片关键词:{image_keywords}
|
||
|
||
请根据标签的语义相关性,给出0-1之间的匹配分数,并说明是否适合匹配。
|
||
输出格式:{{"match": true/false, "score": 0.0-1.0, "reason": "匹配原因"}}
|
||
只输出JSON格式,不要其他内容。"""
|
||
|
||
headers = {
|
||
'Authorization': f'Bearer {QWEN_API_KEY}',
|
||
'Content-Type': 'application/json'
|
||
}
|
||
|
||
payload = {
|
||
"model": "qwen-max",
|
||
"input": {
|
||
"messages": [
|
||
{
|
||
"role": "user",
|
||
"content": prompt
|
||
}
|
||
]
|
||
},
|
||
"parameters": {
|
||
"result_format": "message"
|
||
}
|
||
}
|
||
|
||
response = requests.post(
|
||
QWEN_API_URL,
|
||
json=payload,
|
||
headers=headers,
|
||
timeout=30
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
if result.get('output') and result['output'].get('choices'):
|
||
message = result['output']['choices'][0].get('message', {})
|
||
content = message.get('content', '')
|
||
|
||
# 解析JSON响应
|
||
try:
|
||
match_result = json.loads(content)
|
||
is_match = match_result.get('match', False)
|
||
score = match_result.get('score', 0.0)
|
||
reason = match_result.get('reason', '')
|
||
|
||
logger.info(f"通义千问评估结果 - 匹配: {is_match}, 分数: {score}, 原因: {reason}")
|
||
return is_match, score
|
||
except json.JSONDecodeError:
|
||
logger.error(f"解析通义千问响应失败: {content}")
|
||
return False, 0.0
|
||
else:
|
||
logger.error(f"通义千问API返回格式异常: {result}")
|
||
return False, 0.0
|
||
else:
|
||
logger.error(f"通义千问API请求失败,状态码: {response.status_code}")
|
||
return False, 0.0
|
||
|
||
except requests.exceptions.Timeout:
|
||
logger.error("通义千问API请求超时")
|
||
return False, 0.0
|
||
except Exception as e:
|
||
error_msg = f"调用通义千问API异常: {e}"
|
||
logger.error(error_msg)
|
||
self.log_to_database('ERROR', error_msg, traceback.format_exc())
|
||
return False, 0.0
|
||
|
||
def update_article_status(self, article_id: int, new_status: str) -> bool:
|
||
"""
|
||
更新文章状态
|
||
|
||
Args:
|
||
article_id: 文章ID
|
||
new_status: 新状态
|
||
|
||
Returns:
|
||
是否更新成功
|
||
"""
|
||
try:
|
||
connection = self.db_manager.get_connection()
|
||
try:
|
||
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
|
||
# 更新文章状态
|
||
update_sql = """
|
||
UPDATE ai_articles
|
||
SET status = %s
|
||
WHERE id = %s
|
||
"""
|
||
cursor.execute(update_sql, (new_status, article_id))
|
||
|
||
connection.commit()
|
||
logger.info(f"成功更新文章 {article_id} 状态为 {new_status}")
|
||
return True
|
||
finally:
|
||
connection.close()
|
||
except Exception as e:
|
||
error_msg = f"更新文章状态异常: {e}"
|
||
logger.error(error_msg)
|
||
self.log_to_database('ERROR', error_msg, traceback.format_exc())
|
||
return False
|
||
|
||
def insert_article_image_relation(self, article_id: int, image_data: Dict,
|
||
match_score: float) -> bool:
|
||
"""
|
||
将匹配结果插入ai_article_images表
|
||
|
||
Args:
|
||
article_id: 文章ID
|
||
image_data: 图片数据
|
||
match_score: 匹配分数
|
||
|
||
Returns:
|
||
是否插入成功
|
||
"""
|
||
try:
|
||
connection = self.db_manager.get_connection()
|
||
try:
|
||
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
|
||
# 查询当前文章下已有图片的最大sort_order
|
||
query_max_sort = """
|
||
SELECT COALESCE(MAX(sort_order), 0) as max_sort_order
|
||
FROM ai_article_images
|
||
WHERE article_id = %s
|
||
"""
|
||
cursor.execute(query_max_sort, (article_id,))
|
||
result = cursor.fetchone()
|
||
max_sort_order = result.get('max_sort_order', 0)
|
||
new_sort_order = max_sort_order + 1
|
||
|
||
# 插入关联记录
|
||
insert_sql = """
|
||
INSERT INTO ai_article_images
|
||
(article_id, image_id, image_url, image_thumb_url, image_tag_id,
|
||
sort_order, keywords_id, keywords_name, department_id, department_name,
|
||
image_source, created_at)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
|
||
"""
|
||
cursor.execute(insert_sql, (
|
||
article_id,
|
||
image_data['image_id'],
|
||
image_data['image_url'],
|
||
image_data['image_thumb_url'],
|
||
image_data['id'], # image_tag_id
|
||
new_sort_order,
|
||
image_data['keywords_id'],
|
||
image_data['keywords_name'],
|
||
image_data['department_id'],
|
||
image_data['department_name'],
|
||
1 # image_source: 1表示tag匹配
|
||
))
|
||
|
||
# 更新图片附加文章计数
|
||
update_sql = """
|
||
UPDATE ai_image_tags
|
||
SET image_attached_article_count = image_attached_article_count + 1
|
||
WHERE id = %s
|
||
"""
|
||
cursor.execute(update_sql, (image_data['id'],))
|
||
|
||
# 更新图片状态为published
|
||
update_image_status_sql = """
|
||
UPDATE ai_images
|
||
SET status = 'published'
|
||
WHERE id = %s
|
||
"""
|
||
cursor.execute(update_image_status_sql, (image_data['image_id'],))
|
||
|
||
# 更新文章状态为published_review
|
||
self.update_article_status(article_id, 'published_review')
|
||
|
||
connection.commit()
|
||
logger.info(f"成功插入文章图片关联 - 文章ID: {article_id}, 图片ID: {image_data['image_id']}, 分数: {match_score}")
|
||
return True
|
||
finally:
|
||
connection.close()
|
||
|
||
except Exception as e:
|
||
error_msg = f"插入文章图片关联异常: {e}"
|
||
logger.error(error_msg)
|
||
self.log_to_database('ERROR', error_msg, traceback.format_exc())
|
||
return False
|
||
|
||
def generate_image_with_gemini(self, prompt: str, article_tags: List[str], article_id: int) -> Optional[str]:
|
||
"""
|
||
使用Gemini生成图片并上传到服务器
|
||
|
||
Args:
|
||
prompt: 图片生成提示词
|
||
article_tags: 文章标签列表,用于查询department和keywords
|
||
article_id: 文章ID,用于关联图片
|
||
|
||
Returns:
|
||
上传后的图片URL,失败返回None
|
||
"""
|
||
try:
|
||
# 导入必要的库
|
||
from google import genai
|
||
from google.genai.client import HttpOptions
|
||
|
||
client = genai.Client(
|
||
http_options=HttpOptions(base_url="https://work.poloapi.com"),
|
||
api_key="sk-V4tPnDgzFPa7nxWrvKnNJsW8ZcBXXPuGmjfgvPVRnwpHoeob"
|
||
)
|
||
|
||
logger.info(f"正在调用Gemini API生成图片,提示词: {prompt[:50]}...")
|
||
|
||
# 生成内容
|
||
response = client.models.generate_content(
|
||
model="gemini-3-pro-image-preview",
|
||
contents=[prompt],
|
||
)
|
||
|
||
# 检查是否有候选答案
|
||
if not response.candidates:
|
||
raise Exception("Gemini API未返回任何候选答案")
|
||
|
||
# 处理响应
|
||
candidate = response.candidates[0]
|
||
if not candidate.content or not candidate.content.parts:
|
||
raise Exception("Gemini API返回的候选答案中没有内容部分")
|
||
|
||
for part in candidate.content.parts:
|
||
if hasattr(part, 'inline_data') and part.inline_data is not None:
|
||
image_data = part.inline_data
|
||
if image_data.data is not None:
|
||
# 生成唯一的文件名(基于时间戳)
|
||
timestamp_ms = int(time.time() * 1000)
|
||
image_filename = f"{timestamp_ms}.png"
|
||
today_date = datetime.now().strftime("%Y%m%d")
|
||
image_url_path = f"{today_date}/{image_filename}"
|
||
|
||
temp_filename = f"temp_generated_image_{timestamp_ms}.png"
|
||
# 保存图片数据到临时文件
|
||
with open(temp_filename, 'wb') as f:
|
||
f.write(image_data.data)
|
||
logger.info(f"Gemini生成图片成功: {temp_filename}")
|
||
|
||
# 先将图片信息插入数据库
|
||
image_info = self.insert_generated_image_to_db(image_filename, image_url_path, article_tags)
|
||
|
||
if not image_info:
|
||
raise Exception("插入图片信息到数据库失败")
|
||
|
||
logger.info(f"图片信息已插入数据库,tag_image_id: {image_info['tag_image_id']}, image_id: {image_info['image_id']}")
|
||
|
||
# 使用tag_image_id上传图片到服务器
|
||
uploaded_url = self.upload_image_to_server(temp_filename, image_info['tag_image_id'])
|
||
|
||
# 将文章与图片的关联信息插入ai_article_images表
|
||
article_image_id = self.insert_article_image_relation_for_generated(
|
||
article_id=article_id,
|
||
image_id=image_info['image_id'],
|
||
image_url=image_info['image_url'],
|
||
image_thumb_url=image_info['image_thumb_url'],
|
||
tag_image_id=image_info['tag_image_id'],
|
||
keywords_id=image_info['keywords_id'],
|
||
keywords_name=image_info['keywords_name'],
|
||
department_id=image_info['department_id'],
|
||
department_name=image_info['department_name'],
|
||
image_source=0 # 默认值
|
||
)
|
||
|
||
if article_image_id:
|
||
logger.info(f"文章图片关联信息已创建,ai_article_images.id: {article_image_id}")
|
||
|
||
# 删除临时文件
|
||
os.remove(temp_filename)
|
||
|
||
logger.info(f"图片已上传到服务器: {uploaded_url}")
|
||
return uploaded_url
|
||
|
||
raise Exception("Gemini API未返回有效的图片数据")
|
||
|
||
except ImportError:
|
||
logger.error("错误:未安装google-genai库,请运行 'pip install google-genai' 进行安装")
|
||
return None
|
||
except Exception as e:
|
||
error_msg = f"Gemini生成图片异常: {e}"
|
||
logger.error(error_msg)
|
||
self.log_to_database('ERROR', error_msg, traceback.format_exc())
|
||
return None
|
||
|
||
def insert_generated_image_to_db(self, image_name: str, image_url: str, article_tags: List[str]) -> Optional[Dict]:
|
||
"""
|
||
将Gemini生成的图片信息插入数据库
|
||
|
||
Args:
|
||
image_name: 图片文件名
|
||
image_url: 图片URL路径
|
||
article_tags: 文章标签列表
|
||
|
||
Returns:
|
||
包含插入信息的字典
|
||
"""
|
||
try:
|
||
connection = self.db_manager.get_connection()
|
||
try:
|
||
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
|
||
# 根据文章标签查询ai_image_tags表
|
||
if article_tags:
|
||
query = """
|
||
SELECT department_name, keywords_name, department_id, keywords_id, tag_id
|
||
FROM ai_image_tags
|
||
WHERE tag_name = %s
|
||
LIMIT 1
|
||
"""
|
||
cursor.execute(query, (article_tags[0],))
|
||
tag_info = cursor.fetchone()
|
||
|
||
if tag_info:
|
||
department = tag_info['department_name']
|
||
keywords = tag_info['keywords_name']
|
||
department_id = tag_info['department_id']
|
||
keywords_id = tag_info['keywords_id']
|
||
tag_id = tag_info['tag_id']
|
||
tag_name = article_tags[0]
|
||
else:
|
||
department = "AI生成"
|
||
keywords = "AI图片"
|
||
department_id = 1
|
||
keywords_id = 1
|
||
tag_id = 1
|
||
tag_name = article_tags[0] if article_tags else "AI生成"
|
||
else:
|
||
department = "AI生成"
|
||
keywords = "AI图片"
|
||
department_id = 1
|
||
keywords_id = 1
|
||
tag_id = 1
|
||
tag_name = "AI生成"
|
||
|
||
# 插入ai_images表
|
||
insert_image_query = """
|
||
INSERT INTO ai_images
|
||
(image_name, image_url, image_thumb_url, department, keywords, image_type, upload_user_id, status)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
||
"""
|
||
cursor.execute(insert_image_query, (
|
||
image_name, image_url, '', department, keywords, 'medical', 1, 'active'
|
||
))
|
||
image_id = cursor.lastrowid
|
||
logger.info(f"图片信息已插入ai_images表,image_id: {image_id}")
|
||
|
||
# 插入ai_image_tags表
|
||
insert_tag_query = """
|
||
INSERT INTO ai_image_tags
|
||
(image_id, image_name, image_url, image_thumb_url, tag_id, tag_name,
|
||
keywords_id, keywords_name, department_id, department_name,
|
||
image_source, created_user_id, image_attached_article_count)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||
"""
|
||
cursor.execute(insert_tag_query, (
|
||
image_id, image_name, image_url, '', tag_id, tag_name,
|
||
keywords_id, keywords, department_id, department,
|
||
3, 1, 0 # image_source: 3表示AI生成
|
||
))
|
||
tag_image_id = cursor.lastrowid
|
||
logger.info(f"图片标签信息已插入ai_image_tags表,tag_image_id: {tag_image_id}")
|
||
|
||
connection.commit()
|
||
|
||
return {
|
||
'tag_image_id': tag_image_id,
|
||
'image_id': image_id,
|
||
'image_url': image_url,
|
||
'image_thumb_url': '',
|
||
'keywords_id': keywords_id,
|
||
'keywords_name': keywords,
|
||
'department_id': department_id,
|
||
'department_name': department
|
||
}
|
||
finally:
|
||
connection.close()
|
||
except Exception as e:
|
||
logger.error(f"插入图片信息到数据库失败: {e}")
|
||
return None
|
||
|
||
def upload_image_to_server(self, image_path: str, tag_image_id: int) -> str:
|
||
"""
|
||
上传图片到服务器
|
||
|
||
Args:
|
||
image_path: 本地图片路径
|
||
tag_image_id: 图片标签ID
|
||
|
||
Returns:
|
||
服务器上的图片URL
|
||
"""
|
||
base_url = "http://47.99.184.230:8324"
|
||
jwt_token = self.login_and_get_jwt_token(base_url)
|
||
|
||
if not jwt_token:
|
||
raise Exception("获取JWT token失败,无法上传图片")
|
||
|
||
upload_url = f"{base_url}/api/images/upload"
|
||
headers = {'Authorization': f'Bearer {jwt_token}'}
|
||
|
||
with open(image_path, 'rb') as image_file:
|
||
files = {'file': image_file}
|
||
data = {'tag_image_id': tag_image_id}
|
||
|
||
response = requests.post(upload_url, headers=headers, files=files, data=data)
|
||
|
||
logger.info(f"图片上传响应状态码: {response.status_code}")
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
if result.get('code') == 200:
|
||
return result['data']['http_image_url']
|
||
else:
|
||
raise Exception(f"图片上传失败: {result.get('message', '未知错误')}")
|
||
else:
|
||
raise Exception(f"图片上传请求失败,状态码: {response.status_code}")
|
||
|
||
def login_and_get_jwt_token(self, base_url: str) -> Optional[str]:
|
||
"""登录获取JWT token"""
|
||
login_url = f"{base_url}/api/auth/login"
|
||
login_data = {"username": "user010", "password": "@5^2W6R7"}
|
||
|
||
try:
|
||
response = requests.post(login_url, json=login_data, headers={'Content-Type': 'application/json'})
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
if result.get('code') == 200:
|
||
logger.info("JWT token获取成功")
|
||
return result['data']['token']
|
||
|
||
logger.error(f"登录失败: {response.status_code}")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"登录异常: {e}")
|
||
return None
|
||
|
||
def insert_article_image_relation_for_generated(self, article_id: int, image_id: int, image_url: str,
|
||
image_thumb_url: str, tag_image_id: int, keywords_id: int,
|
||
keywords_name: str, department_id: int, department_name: str,
|
||
image_source: int = 0) -> Optional[int]:
|
||
"""
|
||
将文章与生成图片的关联信息插入ai_article_images表
|
||
"""
|
||
try:
|
||
connection = self.db_manager.get_connection()
|
||
try:
|
||
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
|
||
# 查询当前文章下已有图片的最大sort_order
|
||
query_max_sort = """
|
||
SELECT COALESCE(MAX(sort_order), 0) as max_sort_order
|
||
FROM ai_article_images
|
||
WHERE article_id = %s
|
||
"""
|
||
cursor.execute(query_max_sort, (article_id,))
|
||
result = cursor.fetchone()
|
||
max_sort_order = result['max_sort_order'] if result else 0
|
||
new_sort_order = max_sort_order + 1
|
||
|
||
# 插入ai_article_images表
|
||
insert_query = """
|
||
INSERT INTO ai_article_images
|
||
(article_id, image_id, image_url, image_thumb_url, image_tag_id, sort_order,
|
||
keywords_id, keywords_name, department_id, department_name, image_source)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||
"""
|
||
cursor.execute(insert_query, (
|
||
article_id, image_id, image_url, image_thumb_url, tag_image_id, new_sort_order,
|
||
keywords_id, keywords_name, department_id, department_name, image_source
|
||
))
|
||
article_image_id = cursor.lastrowid
|
||
logger.info(f"文章图片关联信息已插入ai_article_images表,id: {article_image_id}")
|
||
|
||
# 更新图片附加文章计数
|
||
update_count_sql = """
|
||
UPDATE ai_image_tags
|
||
SET image_attached_article_count = image_attached_article_count + 1
|
||
WHERE id = %s
|
||
"""
|
||
cursor.execute(update_count_sql, (tag_image_id,))
|
||
|
||
# 更新图片状态为published
|
||
update_image_status_sql = """
|
||
UPDATE ai_images
|
||
SET status = 'published'
|
||
WHERE id = %s
|
||
"""
|
||
cursor.execute(update_image_status_sql, (image_id,))
|
||
|
||
# 更新文章状态为published_review
|
||
self.update_article_status(article_id, 'published_review')
|
||
|
||
connection.commit()
|
||
return article_image_id
|
||
finally:
|
||
connection.close()
|
||
except Exception as e:
|
||
logger.error(f"插入文章图片关联信息失败: {e}")
|
||
return None
|
||
|
||
def match_article_with_images(self, article_data: Dict, available_images: List[Dict]) -> bool:
|
||
"""
|
||
为单篇文章匹配图片,未成功匹配时调用Gemini生图
|
||
|
||
Args:
|
||
article_data: 文章数据
|
||
available_images: 可用图片列表
|
||
|
||
Returns:
|
||
是否匹配成功
|
||
"""
|
||
article_id = article_data['article_id']
|
||
article_title = article_data.get('title', '')
|
||
article_content = article_data.get('content', '')
|
||
coze_tag = article_data.get('coze_tag', '')
|
||
article_department = article_data.get('department', '')
|
||
|
||
try:
|
||
# 解析文章标签
|
||
if not coze_tag:
|
||
logger.warning(f"文章 {article_id} 没有标签信息,跳过")
|
||
return False
|
||
|
||
article_tags = self.parse_article_tags(coze_tag)
|
||
if not article_tags:
|
||
logger.warning(f"文章 {article_id} 没有有效标签,跳过")
|
||
return False
|
||
|
||
logger.info(f"开始为文章 {article_id} 匹配图片 - 标题: {article_title}, 标签: {article_tags}, 科室: {article_department}")
|
||
|
||
# 根据文章科室ID缩小图片范围
|
||
department_filtered_images = []
|
||
article_dept_id = article_data.get('department_id', 0)
|
||
|
||
for img in available_images:
|
||
# 匹配科室ID相同的图片
|
||
if img.get('department_id', 0) == article_dept_id:
|
||
department_filtered_images.append(img)
|
||
|
||
# 如果没有匹配科室的图片,则使用所有图片
|
||
if not department_filtered_images:
|
||
department_filtered_images = available_images
|
||
|
||
# 根据图片类型(实拍图/模板图)进行分类处理
|
||
# 根据image_source字段:1=clean_images(模板图), 2=Flower_character(实拍图)
|
||
actual_photos = [] # 实拍图
|
||
template_photos = [] # 模板图
|
||
|
||
for img in department_filtered_images:
|
||
image_source = img.get('image_source', 1) # 默认为模板图
|
||
if image_source == 2: # 实拍图
|
||
actual_photos.append(img)
|
||
else: # 模板图
|
||
template_photos.append(img)
|
||
|
||
# 按照挂载次数排序,优先选择挂载次数少的图片
|
||
actual_photos.sort(key=lambda x: x['image_attached_article_count'])
|
||
template_photos.sort(key=lambda x: x['image_attached_article_count'])
|
||
|
||
# 合并图片列表,优先使用实拍图,然后是模板图
|
||
filtered_images = actual_photos + template_photos
|
||
|
||
best_match = None
|
||
best_score = 0.0
|
||
|
||
# 遍历筛选后的可用图片,找到最佳匹配
|
||
for image_data in filtered_images:
|
||
image_tags = [image_data['tag_name']]
|
||
image_keywords = image_data.get('keywords_name', '')
|
||
image_department = image_data.get('department_name', '')
|
||
|
||
# 调用通义千问评估匹配度
|
||
is_match, score = self.call_qwen_for_matching(
|
||
article_title, article_tags, image_tags, image_keywords
|
||
)
|
||
|
||
# 记录评估结果
|
||
logger.info(f"文章 {article_id} vs 图片 {image_data['image_id']} - 匹配: {is_match}, 分数: {score}, 科室: {image_department}")
|
||
|
||
# 更新最佳匹配
|
||
if is_match and score > best_score and score >= MATCH_THRESHOLD:
|
||
best_score = score
|
||
best_match = image_data
|
||
|
||
# 如果找到匹配的图片,插入关联记录
|
||
if best_match:
|
||
if self.insert_article_image_relation(article_id, best_match, best_score):
|
||
logger.info(f"文章 {article_id} 成功匹配图片 {best_match['image_id']}, 分数: {best_score}")
|
||
self.log_to_database('INFO', f"文章匹配成功",
|
||
f"文章ID: {article_id}, 图片ID: {best_match['image_id']}, 分数: {best_score}")
|
||
return True
|
||
else:
|
||
return False
|
||
else:
|
||
# 未找到合适的匹配图片,使用Gemini生成图片
|
||
logger.info(f"文章 {article_id} 未找到合适的匹配图片,调用Gemini生成图片")
|
||
self.log_to_database('WARNING', f"文章未找到匹配图片,尝试生成图片", f"文章ID: {article_id}")
|
||
|
||
# 构建生成提示词
|
||
prompt = f"与'{article_title}'相关的插图,标签: {', '.join(article_tags)}"
|
||
generated_image_url = self.generate_image_with_gemini(prompt, article_tags, article_id)
|
||
|
||
if generated_image_url:
|
||
logger.info(f"文章 {article_id} 成功生成图片: {generated_image_url}")
|
||
self.log_to_database('INFO', f"文章生成图片成功",
|
||
f"文章ID: {article_id}, 图片URL: {generated_image_url}")
|
||
return True
|
||
else:
|
||
logger.error(f"文章 {article_id} 生成图片失败")
|
||
self.log_to_database('ERROR', f"文章生成图片失败", f"文章ID: {article_id}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
error_msg = f"匹配文章图片异常 - 文章ID: {article_id}, 错误: {e}"
|
||
logger.error(error_msg)
|
||
self.log_to_database('ERROR', error_msg, traceback.format_exc())
|
||
return False
|
||
|
||
def get_next_available_article(self, pending_articles: List[Dict]) -> Optional[Dict]:
|
||
"""线程安全地获取下一篇待处理文章"""
|
||
with self.processing_lock:
|
||
for article_data in pending_articles:
|
||
article_id = article_data['article_id']
|
||
if article_id not in self.processed_articles:
|
||
self.processed_articles.add(article_id)
|
||
return article_data
|
||
return None
|
||
|
||
def worker_process_articles(self, pending_articles: List[Dict],
|
||
available_images: List[Dict], worker_id: int) -> int:
|
||
"""Worker线程处理文章匹配"""
|
||
processed_count = 0
|
||
thread_name = f"Worker-{worker_id}"
|
||
threading.current_thread().name = thread_name
|
||
|
||
logger.info(f"[{thread_name}] 启动,准备处理文章匹配")
|
||
|
||
while True:
|
||
# 线程安全地获取下一篇待处理文章
|
||
article_data = self.get_next_available_article(pending_articles)
|
||
if not article_data:
|
||
logger.info(f"[{thread_name}] 没有更多待处理文章,退出")
|
||
break
|
||
|
||
# 匹配文章与图片
|
||
if self.match_article_with_images(article_data, available_images):
|
||
processed_count += 1
|
||
logger.info(f"[{thread_name}] 成功处理文章: {article_data['article_id']}")
|
||
else:
|
||
logger.warning(f"[{thread_name}] 文章处理失败: {article_data['article_id']}")
|
||
|
||
logger.info(f"[{thread_name}] 完成,共处理 {processed_count} 篇文章")
|
||
return processed_count
|
||
|
||
def run_matching(self):
|
||
"""运行文章图片匹配流程"""
|
||
logger.info("开始文章图片智能匹配...")
|
||
self.log_to_database('INFO', '启动文章图片智能匹配服务', f'worker数量: {WORKER_COUNT}')
|
||
|
||
try:
|
||
# 获取需要匹配的文章
|
||
pending_articles = self.get_articles_with_tags()
|
||
if not pending_articles:
|
||
logger.info("没有需要匹配的文章")
|
||
return
|
||
|
||
# 获取可用图片
|
||
available_images = self.get_available_images_with_tags()
|
||
if not available_images:
|
||
logger.warning("没有可用图片,无法进行匹配")
|
||
self.log_to_database('WARNING', '没有可用图片')
|
||
return
|
||
|
||
logger.info(f"开始匹配 {len(pending_articles)} 篇文章与 {len(available_images)} 张图片")
|
||
self.log_to_database('INFO', '开始批量匹配',
|
||
f'文章数: {len(pending_articles)}, 图片数: {len(available_images)}')
|
||
|
||
# 清空已处理记录集合
|
||
with self.processing_lock:
|
||
self.processed_articles.clear()
|
||
|
||
# 使用线程池并行处理
|
||
with ThreadPoolExecutor(max_workers=WORKER_COUNT, thread_name_prefix="MatchWorker") as executor:
|
||
# 提交worker任务
|
||
future_to_worker = {}
|
||
for worker_id in range(1, WORKER_COUNT + 1):
|
||
future = executor.submit(
|
||
self.worker_process_articles,
|
||
pending_articles,
|
||
available_images,
|
||
worker_id
|
||
)
|
||
future_to_worker[future] = worker_id
|
||
|
||
# 等待所有worker完成
|
||
total_processed = 0
|
||
for future in as_completed(future_to_worker):
|
||
worker_id = future_to_worker[future]
|
||
try:
|
||
processed_count = future.result()
|
||
total_processed += processed_count
|
||
logger.info(f"Worker-{worker_id} 完成,处理了 {processed_count} 篇文章")
|
||
except Exception as e:
|
||
logger.error(f"Worker-{worker_id} 执行异常: {e}")
|
||
self.log_to_database('ERROR', f'Worker-{worker_id} 执行异常', str(e))
|
||
|
||
logger.info(f"匹配完成,共处理 {total_processed} 篇文章")
|
||
self.log_to_database('INFO', '匹配任务完成', f'共处理 {total_processed} 篇文章')
|
||
|
||
except Exception as e:
|
||
error_msg = f"匹配流程异常: {e}"
|
||
logger.error(error_msg)
|
||
self.log_to_database('ERROR', error_msg, traceback.format_exc())
|
||
|
||
def main():
|
||
"""主函数"""
|
||
matcher = ArticleImageMatcher()
|
||
|
||
try:
|
||
# 运行匹配
|
||
matcher.run_matching()
|
||
|
||
except Exception as e:
|
||
logger.error(f"程序运行异常: {e}")
|
||
matcher.log_to_database('ERROR', f'程序运行异常: {e}', traceback.format_exc())
|
||
|
||
if __name__ == "__main__":
|
||
main()
|