Files
baijiahao_text_to_image/article_auto_image_matching.py

1524 lines
69 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
文章图片智能匹配脚本
基于通义千问大模型智能匹配文章与图片，将结果存储到ai_article_images表
"""
import os
import sys
import time
import json
import logging
import requests
import pymysql
import traceback
import threading
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
# 添加项目根目录到Python路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from database_config import db_manager
from log_config import setup_logger
# Module-wide logger, built by the project's log_config.setup_logger helper:
# console output enabled, plus the two log files under logs/ named below.
logger = setup_logger(
    level=logging.INFO,
    console_output=True,
    log_file='logs/article_image_matching.log',
    error_log_file='logs/article_image_matching_error.log',
    name='article_image_matching',
)
# ---------------------------------------------------------------------------
# Configuration constants
# ---------------------------------------------------------------------------
# DashScope (Tongyi Qianwen) API key. The QWEN_API_KEY environment variable
# takes precedence; the literal fallback keeps existing deployments working.
# NOTE(review): this key is committed to source control. Rotate it and drop
# the fallback once every deployment sets QWEN_API_KEY.
QWEN_API_KEY = os.environ.get("QWEN_API_KEY") or "sk-e6a38204022a4b538b8954f0584712af"
QWEN_API_URL = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
WORKER_COUNT = 4  # number of parallel workers
BATCH_SIZE = 50  # number of articles fetched per batch
MATCH_THRESHOLD = 0.6  # minimum Qwen score (0-1) for an image to count as a match
class ArticleImageMatcher:
def __init__(self):
    """Wire up the shared database manager and parallel-processing state, then log startup."""
    # All database access goes through the project-wide manager instance.
    self.db_manager = db_manager
    # Parallel-processing state. Neither attribute is used in the methods
    # visible here; presumably the batch driver uses them (TODO confirm).
    self.processed_articles = set()
    self.processing_lock = threading.Lock()
    ready_msg = "ArticleImageMatcher 初始化完成"
    logger.info(ready_msg)
    self.log_to_database('INFO', ready_msg)
def log_to_database(self, level: str, message: str, details: Optional[str] = None):
    """
    Write one audit row to the ``ai_logs`` table (best effort).

    ``level`` is translated into the ``status`` column:
    'INFO' -> 'success', 'WARNING' -> 'warning', 'ERROR' -> 'error'.
    Any other level is stored as 'success'.
    ``message`` is stored in ``description`` and ``details`` in ``error_message``.
    ``user_id`` is NULL and ``action`` is always 'article_image_matching'.

    Any failure is reported through the module logger and otherwise ignored,
    so audit logging never breaks the caller.
    """
    insert_sql = """
        INSERT INTO ai_logs (user_id, action, description, status, error_message, created_at)
        VALUES (%s, %s, %s, %s, %s, NOW())
    """
    try:
        conn = self.db_manager.get_connection()
        try:
            with conn.cursor(pymysql.cursors.DictCursor) as cur:
                status = {
                    'INFO': 'success',
                    'WARNING': 'warning',
                    'ERROR': 'error',
                }.get(level, 'success')
                row = (None, 'article_image_matching', message, status, details)
                cur.execute(insert_sql, row)
                conn.commit()
                logger.info(f"日志已记录到数据库: {level} - {message}")
        finally:
            conn.close()
    except Exception as e:
        logger.error(f"记录日志到数据库失败: {e}")
def get_articles_with_tags(self, limit: int = BATCH_SIZE) -> List[Dict]:
    """
    Fetch the next batch of articles that still need images.

    The selected ``ai_articles`` rows:
      * have no row in ``ai_article_images``;
      * have status 'pending_review';
      * have ``review_user_id = 152``.

    They are ordered newest ``id`` first.

    Note: despite the name, the query does not filter on ``coze_tag``.
    Untagged articles are returned too; the matching step skips them.
    NOTE(review): the reviewer ID 152 is hard-coded.

    Args:
        limit: maximum number of rows to return.

    Returns:
        Dict rows with keys article_id, title, content, coze_tag, department
        and department_id. [] when nothing is pending or the query fails; the
        failure is logged locally and to ``ai_logs``.
    """
    pending_sql = """
        SELECT
            a.id as article_id,
            a.title,
            a.content,
            a.coze_tag,
            a.department,
            a.department_id
        FROM ai_articles a
        WHERE NOT EXISTS (
            SELECT 1 FROM ai_article_images ai
            WHERE ai.article_id = a.id
        )
        AND a.status = 'pending_review'
        AND a.review_user_id = 152
        ORDER BY a.id DESC
        LIMIT %s
    """
    try:
        conn = self.db_manager.get_connection()
        try:
            with conn.cursor(pymysql.cursors.DictCursor) as cur:
                cur.execute(pending_sql, (limit,))
                rows = cur.fetchall()
                if not rows:
                    logger.info("未查询到需要匹配图片的文章")
                    return rows
                logger.info(f"查询到 {len(rows)} 篇需要匹配图片的文章")
                self.log_to_database('INFO', "查询到需要匹配图片的文章", f"数量: {len(rows)}")
                return rows
        finally:
            conn.close()
    except Exception as e:
        failure = f"查询文章标签异常: {e}"
        logger.error(failure)
        self.log_to_database('ERROR', failure, traceback.format_exc())
        return []
def get_available_images_with_tags(self, article_department_id: int = 0) -> List[Dict]:
    """
    Fetch candidate images from ``ai_image_tags`` for matching.

    Only rows with ``status = 'generate'`` and
    ``image_attached_article_count < 5`` are returned. They are ordered
    least-attached first, then newest ``id``.

    Department filtering:
      * a positive department ID limits results to that department;
      * otherwise (0, None, or a non-numeric value) images from all
        departments are returned.

    Args:
        article_department_id: department ID of the article being matched.
            ``None`` (a NULL ``ai_articles.department_id``) means
            "no department". It previously raised TypeError on ``None > 0``
            and was reported as a query failure.

    Returns:
        Dict rows that include ``image_source``, which the caller uses to
        tell real photos from templates. [] when no image is available or
        the query fails; the failure is logged locally and to ``ai_logs``.
    """
    try:
        department_id = int(article_department_id or 0)
    except (TypeError, ValueError):
        department_id = 0

    # Only fixed SQL fragments go into `conditions`; values are bound via params.
    conditions = [
        "it.image_attached_article_count < 5",
        "it.status = 'generate'",
    ]
    params = None
    if department_id > 0:
        conditions.append("it.department_id = %s")
        params = (department_id,)
    where_clause = " AND ".join(conditions)
    sql = (
        "SELECT it.id, it.image_id, it.image_name, it.image_url, it.image_thumb_url, "
        "it.tag_id, it.tag_name, it.keywords_id, it.keywords_name, "
        "it.department_id, it.department_name, "
        "it.image_attached_article_count, it.image_source "
        "FROM ai_image_tags it "
        f"WHERE {where_clause} "
        "ORDER BY it.image_attached_article_count ASC, it.id DESC"
    )

    try:
        connection = self.db_manager.get_connection()
        try:
            with connection.cursor(pymysql.cursors.DictCursor) as cursor:
                cursor.execute(sql, params)
                results = cursor.fetchall()
                if results:
                    logger.info(f"查询到 {len(results)} 张可用图片")
                    self.log_to_database('INFO', "查询到可用图片", f"数量: {len(results)}")
                else:
                    logger.info("未查询到可用图片")
                    if department_id > 0:
                        logger.info(f"科室ID {department_id} 下没有可使用的图片将进行Gemini生图")
                return results
        finally:
            connection.close()
    except Exception as e:
        error_msg = f"查询图片标签异常: {e}"
        logger.error(error_msg)
        self.log_to_database('ERROR', error_msg, traceback.format_exc())
        return []
def parse_article_tags(self, coze_tag: str) -> List[str]:
    """
    Parse an article's ``coze_tag`` value into a clean list of tag strings.

    Accepted forms:
      * a JSON array, e.g. ``'["a", "b"]'``;
      * a JSON object, whose values are used (list values are flattened
        one level);
      * a JSON string literal, e.g. ``'"a,b"'``, which is then comma-split;
      * plain comma-separated text using the ASCII comma or the full-width
        comma (U+FF0C).

    Every tag is converted to ``str`` and stripped. Empty tags and JSON
    ``null`` entries are dropped, so the result can safely be passed to
    ``', '.join(...)``.

    Args:
        coze_tag: raw tag value, normally the ``ai_articles.coze_tag`` text.
            An already-decoded list or dict is accepted too.

    Returns:
        List of non-empty tag strings. [] for empty input or on an
        unexpected error (which is logged).
    """
    if not coze_tag:
        return []
    try:
        if isinstance(coze_tag, (list, dict)):
            parsed = coze_tag
        else:
            raw = str(coze_tag)
            try:
                parsed = json.loads(raw)
            except ValueError:  # json.JSONDecodeError is a ValueError subclass
                parsed = raw
            # Numbers / true / false: fall back to the raw text, which is
            # comma-split below, exactly as before.
            if not isinstance(parsed, (list, dict, str)):
                parsed = raw

        if isinstance(parsed, dict):
            parsed = list(parsed.values())

        if isinstance(parsed, list):
            items = []
            for item in parsed:
                if isinstance(item, (list, tuple)):
                    items.extend(item)
                else:
                    items.append(item)
            stripped = (str(item).strip() for item in items if item is not None)
            return [tag for tag in stripped if tag]

        # Plain text (or a decoded JSON string): normalise full-width commas, then split.
        text = parsed.replace('\uff0c', ',')
        return [tag.strip() for tag in text.split(',') if tag.strip()]
    except Exception as e:
        logger.error(f"解析文章标签异常: {e}")
        return []
def call_qwen_for_matching(self, article_title: str, article_tags: List[str],
                           image_tags: List[str], image_keywords: str) -> Tuple[bool, float]:
    """
    Ask Qwen (DashScope ``qwen-max``) how well an article matches an image.

    The model is asked to reply with
    ``{"match": bool, "score": 0-1, "reason": str}``. The reply is parsed
    leniently:
      * text or Markdown code fences around the JSON object are ignored;
      * a string ``match`` ("true"/"false") is interpreted, not taken as
        truthy;
      * ``score`` is converted to a float clamped to [0, 1].

    Without this, a string score made the caller's ``score > best_score``
    comparison raise.

    Args:
        article_title: article title.
        article_tags: article tags (non-string items are stringified).
        image_tags: image tags.
        image_keywords: image keyword text.

    Returns:
        ``(is_match, score)``. ``(False, 0.0)`` on HTTP errors, timeouts,
        or unparseable replies.
    """
    try:
        prompt = f"""请评估以下文章与图片的匹配度:
文章标题{article_title}
文章标签{', '.join(map(str, article_tags))}
图片标签{', '.join(map(str, image_tags))}
图片关键词{image_keywords}
请根据标签的语义相关性给出0-1之间的匹配分数并说明是否适合匹配
输出格式{{"match": true/false, "score": 0.0-1.0, "reason": "匹配原因"}}
只输出JSON格式不要其他内容"""
        headers = {
            'Authorization': f'Bearer {QWEN_API_KEY}',
            'Content-Type': 'application/json'
        }
        payload = {
            "model": "qwen-max",
            "input": {
                "messages": [
                    {
                        "role": "user",
                        "content": prompt
                    }
                ]
            },
            "parameters": {
                "result_format": "message"
            }
        }
        response = requests.post(
            QWEN_API_URL,
            json=payload,
            headers=headers,
            timeout=30
        )
        if response.status_code != 200:
            logger.error(f"通义千问API请求失败状态码: {response.status_code}")
            return False, 0.0

        result = response.json()
        if not (result.get('output') and result['output'].get('choices')):
            logger.error(f"通义千问API返回格式异常: {result}")
            return False, 0.0
        message = result['output']['choices'][0].get('message', {})
        content = message.get('content', '')

        # Models often wrap the JSON in ```json fences or add a sentence
        # around it; keep only the outermost {...} object.
        text = str(content or '').strip()
        start, end = text.find('{'), text.rfind('}')
        if start != -1 and end > start:
            text = text[start:end + 1]
        try:
            match_result = json.loads(text)
        except json.JSONDecodeError:
            match_result = None
        if not isinstance(match_result, dict):
            logger.error(f"解析通义千问响应失败: {content}")
            return False, 0.0

        raw_match = match_result.get('match', False)
        if isinstance(raw_match, str):
            # "false" is a non-empty, hence truthy, string: interpret it explicitly.
            is_match = raw_match.strip().lower() in ('true', 'yes', '1', '是')
        else:
            is_match = bool(raw_match)

        try:
            score = float(match_result.get('score', 0.0))
        except (TypeError, ValueError):
            score = 0.0
        if score != score:  # NaN
            score = 0.0
        score = min(max(score, 0.0), 1.0)

        reason = match_result.get('reason', '')
        logger.info(f"通义千问评估结果 - 匹配: {is_match}, 分数: {score}, 原因: {reason}")
        return is_match, score
    except requests.exceptions.Timeout:
        logger.error("通义千问API请求超时")
        return False, 0.0
    except Exception as e:
        error_msg = f"调用通义千问API异常: {e}"
        logger.error(error_msg)
        self.log_to_database('ERROR', error_msg, traceback.format_exc())
        return False, 0.0
def get_article_image_count(self, article_id: int) -> int:
    """
    Count the ``ai_article_images`` rows linked to an article.

    Args:
        article_id: article ID.

    Returns:
        Number of linked images. 0 if the query fails; the error is logged
        locally and to ``ai_logs``.
    """
    count_sql = """
        SELECT COUNT(*) as image_count
        FROM ai_article_images
        WHERE article_id = %s
    """
    try:
        conn = self.db_manager.get_connection()
        try:
            with conn.cursor(pymysql.cursors.DictCursor) as cur:
                cur.execute(count_sql, (article_id,))
                row = cur.fetchone()
                if not row:
                    return 0
                return row['image_count']
        finally:
            conn.close()
    except Exception as e:
        failure = f"查询文章图片数量异常: {e}"
        logger.error(failure)
        self.log_to_database('ERROR', failure, traceback.format_exc())
        return 0
def update_article_status(self, article_id: int, new_status: str) -> bool:
    """
    Set ``ai_articles.status`` for one article and commit.

    Args:
        article_id: article ID.
        new_status: value written to the ``status`` column.

    Returns:
        True once the UPDATE is committed (the affected row count is not
        checked). False on a database error, which is logged locally and to
        ``ai_logs``.
    """
    status_sql = """
        UPDATE ai_articles
        SET status = %s
        WHERE id = %s
    """
    try:
        conn = self.db_manager.get_connection()
        try:
            with conn.cursor(pymysql.cursors.DictCursor) as cur:
                cur.execute(status_sql, (new_status, article_id))
                conn.commit()
                logger.info(f"成功更新文章 {article_id} 状态为 {new_status}")
                return True
        finally:
            conn.close()
    except Exception as e:
        failure = f"更新文章状态异常: {e}"
        logger.error(failure)
        self.log_to_database('ERROR', failure, traceback.format_exc())
        return False
def insert_article_image_relation(self, article_id: int, image_data: Dict,
                                  match_score: float) -> bool:
    """
    Link a matched library image to an article in a single transaction.

    Steps:
      1. insert an ``ai_article_images`` row whose ``sort_order`` is one past
         the article's current maximum;
      2. increment ``ai_image_tags.image_attached_article_count`` for the image;
      3. set the matching ``ai_images`` row to status 'published';
      4. call the RPA review API for the article; the transaction is
         committed only if that call succeeds.

    If the RPA call fails or any statement raises, the transaction is rolled
    back explicitly. No half-applied link or counter update is left behind,
    regardless of how the connection manager treats uncommitted work on close.

    Args:
        article_id: article ID.
        image_data: one row from ``get_available_images_with_tags``.
        match_score: Qwen match score (only logged).

    Returns:
        True if everything was committed, False otherwise. Failures are
        logged locally, and exceptions also to ``ai_logs``.
    """
    try:
        connection = self.db_manager.get_connection()
        try:
            with connection.cursor(pymysql.cursors.DictCursor) as cursor:
                # Next sort_order for this article.
                query_max_sort = """
                    SELECT COALESCE(MAX(sort_order), 0) as max_sort_order
                    FROM ai_article_images
                    WHERE article_id = %s
                """
                cursor.execute(query_max_sort, (article_id,))
                result = cursor.fetchone()
                max_sort_order = result['max_sort_order'] if result else 0
                new_sort_order = max_sort_order + 1

                insert_sql = """
                    INSERT INTO ai_article_images
                    (article_id, image_id, image_url, image_thumb_url, image_tag_id,
                     sort_order, keywords_id, keywords_name, department_id, department_name,
                     image_source, created_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
                """
                cursor.execute(insert_sql, (
                    article_id,
                    image_data['image_id'],
                    image_data['image_url'],
                    image_data['image_thumb_url'],
                    image_data['id'],  # image_tag_id
                    new_sort_order,
                    image_data['keywords_id'],
                    image_data['keywords_name'],
                    image_data['department_id'],
                    image_data['department_name'],
                    image_data['image_source'],  # keep the source image's image_source
                ))

                update_sql = """
                    UPDATE ai_image_tags
                    SET image_attached_article_count = image_attached_article_count + 1
                    WHERE id = %s
                """
                cursor.execute(update_sql, (image_data['id'],))

                update_image_status_sql = """
                    UPDATE ai_images
                    SET status = 'published'
                    WHERE id = %s
                """
                cursor.execute(update_image_status_sql, (image_data['image_id'],))

                # NOTE(review): 13 is passed for every matched image, while the
                # RPA API's parameter docs list 13 as "AI-generated". Confirm
                # this is intended for library (template/real-photo) images.
                if not self.call_rpa_review_api([article_id], 13):
                    logger.error(f"通过RPA接口更新文章 {article_id} 状态失败")
                    connection.rollback()
                    return False
                logger.info(f"已通过RPA接口更新文章 {article_id} 状态")
                connection.commit()

            logger.info(f"成功插入文章图片关联 - 文章ID: {article_id}, 图片ID: {image_data['image_id']}, 分数: {match_score}")
            return True
        except Exception:
            # Undo the partial insert/updates before the connection is released.
            try:
                connection.rollback()
            except Exception as rollback_err:
                logger.error(f"回滚事务失败: {rollback_err}")
            raise
        finally:
            connection.close()
    except Exception as e:
        error_msg = f"插入文章图片关联异常: {e}"
        logger.error(error_msg)
        self.log_to_database('ERROR', error_msg, traceback.format_exc())
        return False
def get_article_info(self, article_id: int) -> Optional[Dict]:
    """
    Load one article's id, title, content, coze_tag, department and department_id.

    Args:
        article_id: article ID.

    Returns:
        The ``ai_articles`` row as a dict. None if no row matches or the
        query fails; the failure is logged locally and to ``ai_logs``.
    """
    lookup_sql = """
        SELECT id, title, content, coze_tag, department, department_id
        FROM ai_articles
        WHERE id = %s
    """
    try:
        conn = self.db_manager.get_connection()
        try:
            with conn.cursor(pymysql.cursors.DictCursor) as cur:
                cur.execute(lookup_sql, (article_id,))
                return cur.fetchone()
        finally:
            conn.close()
    except Exception as e:
        failure = f"查询文章信息异常: {e}"
        logger.error(failure)
        self.log_to_database('ERROR', failure, traceback.format_exc())
        return None
def get_article_image_sources(self, article_id: int) -> List[int]:
    """
    Return the ``image_source`` of every ``ai_article_images`` row for an article.

    Args:
        article_id: article ID.

    Returns:
        The values in the order the database returns them. [] when the
        article has no images or the query fails; the failure is logged
        locally and to ``ai_logs``.
    """
    sources_sql = """
        SELECT image_source
        FROM ai_article_images
        WHERE article_id = %s
    """
    try:
        conn = self.db_manager.get_connection()
        try:
            with conn.cursor(pymysql.cursors.DictCursor) as cur:
                cur.execute(sources_sql, (article_id,))
                sources = []
                for row in cur.fetchall() or ():
                    sources.append(row['image_source'])
                return sources
        finally:
            conn.close()
    except Exception as e:
        failure = f"查询文章图片source异常: {e}"
        logger.error(failure)
        self.log_to_database('ERROR', failure, traceback.format_exc())
        return []
def add_text_to_image_local(self, image_path: str, text: str, output_path: str) -> bool:
    """
    Overlay text (the article title) on an image locally ("压字花").

    Rendering:
      * the text is wrapped at 12 characters per line;
      * the block is centred horizontally and vertically;
      * each line is drawn in RGB(180, 60, 50) with a 3px white outline;
      * the result is saved to ``output_path`` as PNG.

    Font size is 120px at 1920x1080, scaled with the image and clamped to
    40-150px. A CJK font is searched for in OS-specific locations.

    Args:
        image_path: path of the source image.
        text: text to draw (the article title).
        output_path: where the PNG result is written.

    Returns:
        True on success. False on any failure, including when no CJK font
        can be loaded; the reason is logged.
    """
    try:
        import platform
        import textwrap
        from PIL import Image, ImageDraw, ImageFont

        # Copy the pixels out and close the source file immediately, so the
        # caller can delete its temp file afterwards (an open handle blocks
        # that on Windows).
        with Image.open(image_path) as source:
            if source.mode in ('RGB', 'RGBA'):
                image = source.copy()
            elif 'A' in source.mode or 'transparency' in source.info:
                image = source.convert('RGBA')
            else:
                # e.g. 'L' or 'P': ImageDraw rejects the RGB tuple colours used below.
                image = source.convert('RGB')
        draw = ImageDraw.Draw(image)
        img_width, img_height = image.size

        base_font_size = 120
        font_size = int(base_font_size * min(img_width / 1920, img_height / 1080))
        font_size = max(40, min(font_size, 150))

        # Candidate CJK font files for the current OS.
        system = platform.system()
        if system == 'Windows':
            font_paths = [
                'C:/Windows/Fonts/msyh.ttc',  # Microsoft YaHei
                'C:/Windows/Fonts/simhei.ttf',  # SimHei
                'C:/Windows/Fonts/simsun.ttc',  # SimSun
                'C:/Windows/Fonts/msyhbd.ttc',  # Microsoft YaHei Bold
                'C:/Windows/Fonts/simkai.ttf',  # KaiTi
            ]
        else:  # Linux/Unix
            font_paths = [
                '/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc',  # WenQuanYi Zen Hei
                '/usr/share/fonts/wqy-zenhei/wqy-zenhei.ttc',  # WenQuanYi Zen Hei (old path)
                '/usr/share/fonts/truetype/wqy/wqy-microhei.ttc',  # WenQuanYi Micro Hei
                '/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf',  # Droid Sans
                '/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc',  # Noto Sans CJK
                '/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc',  # Noto Sans CJK
                '/usr/share/fonts/truetype/arphic/uming.ttc',  # AR PL UMing
                '/usr/share/fonts/truetype/arphic/ukai.ttc',  # AR PL UKai
            ]
        logger.info(f"[压字花] 检测到操作系统: {system},尝试加载中文字体...")

        font = None
        for font_path in font_paths:
            if not os.path.exists(font_path):
                logger.debug(f"[压字花] 字体文件不存在: {font_path}")
                continue
            try:
                font = ImageFont.truetype(font_path, font_size)
            except Exception as font_err:
                logger.debug(f"[压字花] 字体加载失败 {font_path}: {font_err}")
                continue
            logger.info(f"[压字花] 成功加载字体: {font_path}")
            break

        if font is None:
            if system == 'Linux':
                error_msg = (
                    "无法加载任何中文字体。请在Linux服务器上安装中文字体\n"
                    "Ubuntu/Debian: sudo apt-get install fonts-wqy-zenhei fonts-wqy-microhei\n"
                    "CentOS/RHEL: sudo yum install wqy-zenhei-fonts wqy-microhei-fonts\n"
                    "或: sudo yum install google-noto-sans-cjk-fonts"
                )
            else:
                error_msg = "无法加载任何中文字体,压字花功能需要中文字体支持"
            logger.error(f"[压字花] {error_msg}")
            raise Exception(error_msg)

        # textwrap returns [] for empty text; fall back to a single line.
        lines = textwrap.wrap(text, width=12) or [text]

        line_height = font_size + 20  # font size plus 20px line spacing
        start_y = (img_height - len(lines) * line_height) // 2
        text_color = (180, 60, 50)  # dark brown
        outline_color = (255, 255, 255)  # white
        outline_width = 3

        for i, line in enumerate(lines):
            try:
                left, _, right, _ = draw.textbbox((0, 0), line, font=font)
                text_width = right - left
            except AttributeError:
                # Pillow < 8.0 has no textbbox: approximate the width.
                text_width = len(line) * font_size * 0.6
            x = (img_width - text_width) // 2
            y = start_y + i * line_height
            # Outline: draw the text shifted in every direction, then the fill on top.
            for dx in range(-outline_width, outline_width + 1):
                for dy in range(-outline_width, outline_width + 1):
                    if dx or dy:
                        draw.text((x + dx, y + dy), line, font=font, fill=outline_color)
            draw.text((x, y), line, font=font, fill=text_color)

        image.save(output_path, 'PNG', optimize=True, compress_level=6)
        logger.info(f"[压字花] 文字融合成功: {output_path}")
        return True
    except Exception as e:
        logger.error(f"[压字花] 文字融合失败: {e}")
        return False
def upload_cover_image_with_text_fusion(self, image_path: str, article_id: int, article_title: str,
                                        image_info: Dict) -> bool:
    """
    Produce and link the article's cover image.

    Steps:
      1. overlay the title on the image locally;
      2. upload the result for ``image_info['tag_image_id']``;
      3. store the returned URLs via ``update_image_urls_after_upload``;
      4. link the image to the article with image_source 12.

    The temporary text-overlaid file is removed on every exit path.
    Previously it leaked when fusion failed, when the upload raised, or when
    the link could not be created.

    Args:
        image_path: local path of the generated image.
        article_id: article ID.
        article_title: text drawn on the cover.
        image_info: dict returned by ``insert_generated_image_to_db``; uses
            tag_image_id, image_id, keywords_id, keywords_name, department_id
            and department_name.

    Returns:
        True if the cover was uploaded and linked, False otherwise.
    """
    import uuid

    text_temp_path = f"temp_text_{uuid.uuid4().hex}.png"
    try:
        logger.info(f"[封面图本地化] 开始压字花处理文章ID: {article_id}, 标题: {article_title}")
        fusion_success = self.add_text_to_image_local(
            image_path=image_path,
            text=article_title,
            output_path=text_temp_path
        )
        if not fusion_success:
            logger.error("[封面图本地化] 压字花处理失败")
            return False

        logger.info("[封面图本地化] 开始上传压字花图片")
        upload_result = self.upload_image_to_server(text_temp_path, image_info['tag_image_id'])
        if not upload_result:
            logger.error("[封面图本地化] 上传失败")
            return False

        uploaded_relative_path = upload_result.get('relative_path') or upload_result.get('image_url')
        uploaded_thumb_path = upload_result.get('thumb_relative_path') or upload_result.get('image_thumb_url', '')
        logger.info(f"[封面图本地化] 上传成功,相对路径: {uploaded_relative_path}")

        self.update_image_urls_after_upload(
            image_id=image_info['image_id'],
            tag_image_id=image_info['tag_image_id'],
            image_url=uploaded_relative_path,
            image_thumb_url=uploaded_thumb_path
        )

        article_image_id = self.insert_article_image_relation_for_generated(
            article_id=article_id,
            image_id=image_info['image_id'],
            image_url=uploaded_relative_path,
            image_thumb_url=uploaded_thumb_path,
            tag_image_id=image_info['tag_image_id'],
            keywords_id=image_info['keywords_id'],
            keywords_name=image_info['keywords_name'],
            department_id=image_info['department_id'],
            department_name=image_info['department_name'],
            image_source=12  # cover images are always linked with image_source 12
        )
        if not article_image_id:
            logger.error("[封面图本地化] 文章图片关联创建失败")
            return False
        logger.info(f"[封面图本地化] 文章图片关联信息已创建ai_article_images.id: {article_image_id}")
        logger.info("[封面图本地化] 处理完成")
        return True
    except Exception as e:
        logger.error(f"[封面图本地化] 处理失败: {e}")
        return False
    finally:
        try:
            if os.path.exists(text_temp_path):
                os.remove(text_temp_path)
        except OSError as cleanup_err:
            logger.warning(f"[封面图本地化] 删除临时文件失败: {cleanup_err}")
def generate_image_with_gemini(self, prompt: str, article_tags: List[str], article_id: int, image_type: str = "默认") -> Optional[str]:
    """
    Generate an image with Gemini, upload it and link it to an article.

    Flow:
      1. Load the article's department, department_id and title.
      2. Call Gemini (``gemini-3-pro-image-preview`` through the configured
         proxy) and take the first inline image part of the first candidate.
      3. Save it to a uniquely named local temp file.
      4. Register it with ``insert_generated_image_to_db`` under a
         placeholder path, upload it with ``upload_image_to_server``, and
         store the real URLs.
      5. For ``image_type == '封面图'`` (cover), overlay the title and
         upload/link that version (image_source 12). Any other type links the
         uploaded image directly (image_source 13).

    The local temp file is removed on every exit path.

    Args:
        prompt: image generation prompt.
        article_tags: article tags, passed on to ``insert_generated_image_to_db``
            to tag the image.
        article_id: article the image is linked to.
        image_type: '封面图' for the cover; anything else is treated as a
            detail image.

    Returns:
        Relative URL of the uploaded (raw Gemini) image. None on any failure,
        including failure to create the article link.
    """
    import uuid

    temp_filename = None
    try:
        # NULL columns fall back to ''/0 so downstream `> 0` checks never see None.
        article_info = self.get_article_info(article_id) or {}
        article_department = article_info.get('department') or ''
        article_department_id = article_info.get('department_id') or 0
        article_title = article_info.get('title') or ''

        from google import genai
        from google.genai.client import HttpOptions

        # NOTE(review): this key is committed to source; GEMINI_API_KEY overrides it.
        # Rotate the key and drop the fallback once deployments set the variable.
        client = genai.Client(
            http_options=HttpOptions(base_url="https://work.poloapi.com"),
            api_key=os.environ.get("GEMINI_API_KEY") or "sk-V4tPnDgzFPa7nxWrvKnNJsW8ZcBXXPuGmjfgvPVRnwpHoeob"
        )
        logger.info(f"正在调用Gemini API生成图片提示词: {prompt[:50]}...")
        response = client.models.generate_content(
            model="gemini-3-pro-image-preview",
            contents=[prompt],
        )
        if not response.candidates:
            raise Exception("Gemini API未返回任何候选答案")
        candidate = response.candidates[0]
        if not candidate.content or not candidate.content.parts:
            raise Exception("Gemini API返回的候选答案中没有内容部分")

        # First part that carries inline image bytes.
        image_bytes = next(
            (
                part.inline_data.data
                for part in candidate.content.parts
                if getattr(part, 'inline_data', None) is not None
                and part.inline_data.data is not None
            ),
            None,
        )
        if image_bytes is None:
            raise Exception("Gemini API未返回有效的图片数据")

        timestamp_ms = int(time.time() * 1000)
        # Several workers can finish within the same millisecond; a random
        # suffix keeps their local temp files from overwriting each other.
        temp_filename = f"temp_generated_image_{timestamp_ms}_{uuid.uuid4().hex[:8]}.png"
        with open(temp_filename, 'wb') as f:
            f.write(image_bytes)
        logger.info(f"Gemini生成图片成功: {temp_filename}")

        # The upload needs a tag_image_id, which only exists after the DB
        # insert, so register the image under a placeholder path first.
        today_date = datetime.now().strftime("%Y%m%d")
        temp_image_path = f"{today_date}/{timestamp_ms}.png"
        image_info = self.insert_generated_image_to_db(
            f"{timestamp_ms}.png",
            temp_image_path,
            article_department=article_department,
            article_department_id=article_department_id,
            article_keywords='',
            article_keywords_id=0,
            article_tags=article_tags
        )
        if not image_info:
            raise Exception("插入图片信息到数据库失败")
        logger.info(f"图片信息已插入数据库tag_image_id: {image_info['tag_image_id']}, image_id: {image_info['image_id']}")

        upload_result = self.upload_image_to_server(temp_filename, image_info['tag_image_id'])
        if not upload_result:
            raise Exception("图片上传失败")
        uploaded_relative_path = upload_result.get('relative_path') or upload_result.get('image_url')
        uploaded_thumb_path = upload_result.get('thumb_relative_path') or upload_result.get('image_thumb_url', '')
        logger.info(f"图片上传成功,真实相对路径: {uploaded_relative_path}")

        self.update_image_urls_after_upload(
            image_id=image_info['image_id'],
            tag_image_id=image_info['tag_image_id'],
            image_url=uploaded_relative_path,
            image_thumb_url=uploaded_thumb_path
        )
        image_info['image_url'] = uploaded_relative_path
        image_info['image_thumb_url'] = uploaded_thumb_path

        if image_type == '封面图':
            # Cover: overlay the title locally, upload that version and link it.
            cover_ok = self.upload_cover_image_with_text_fusion(
                image_path=temp_filename,
                article_id=article_id,
                article_title=article_title,
                image_info=image_info
            )
            if not cover_ok:
                logger.error(f"[封面图] 文章 {article_id} 封面图本地化处理失败")
                return None
            logger.info(f"[封面图] 文章 {article_id} 封面图本地化处理成功(已完成压字花和数据库关联)")
            # NOTE(review): the returned path is the raw upload above, not the
            # text-overlaid cover uploaded by upload_cover_image_with_text_fusion.
        else:
            # Detail image: link the uploaded image directly with image_source 13.
            article_image_id = self.insert_article_image_relation_for_generated(
                article_id=article_id,
                image_id=image_info['image_id'],
                image_url=image_info['image_url'],
                image_thumb_url=image_info['image_thumb_url'],
                tag_image_id=image_info['tag_image_id'],
                keywords_id=image_info['keywords_id'],
                keywords_name=image_info['keywords_name'],
                department_id=image_info['department_id'],
                department_name=image_info['department_name'],
                image_source=13
            )
            if not article_image_id:
                logger.error(f"[详情图] 文章 {article_id} 图片关联创建失败")
                return None
            logger.info(f"[详情图] 文章图片关联信息已创建ai_article_images.id: {article_image_id}")

        return uploaded_relative_path
    except ImportError:
        logger.error("错误未安装google-genai库请运行 'pip install google-genai' 进行安装")
        return None
    except Exception as e:
        error_msg = f"Gemini生成图片异常: {e}"
        logger.error(error_msg)
        self.log_to_database('ERROR', error_msg, traceback.format_exc())
        return None
    finally:
        if temp_filename and os.path.exists(temp_filename):
            try:
                os.remove(temp_filename)
            except OSError as cleanup_err:
                logger.warning(f"删除临时文件失败: {cleanup_err}")
def insert_generated_image_to_db(self, image_name: str, image_url: str, article_department: str = "", article_department_id: int = 0, article_keywords: str = "", article_keywords_id: int = 0, article_tags: Optional[List[str]] = None) -> Optional[Dict]:
    """
    Register a Gemini-generated image in ``ai_images`` and ``ai_image_tags``.

    Tag handling:
      * The first non-empty article tag is looked up in ``ai_tags`` by name
        and inserted if missing.
      * Its ID and name are used for both ``tag_id``/``tag_name`` and
        ``default_tag_id``/``default_tag_name``.
      * Without tags, tag ID 1 and name 'AI生成' are used.

    Missing (empty, 0 or None) values fall back to:
      * department ID 1 and name 'AI生成';
      * keywords ID 1 and name 'AI图片'.
    None IDs previously raised TypeError on ``None > 0``.

    Rows written:
      * ``ai_images``: image_type 'medical', upload_user_id 1, status
        'active', and an empty thumbnail URL.
      * ``ai_image_tags``: image_source 3 (AI generated), created_user_id 1,
        attached-article count 0, status 'generate'.

    All inserts are committed together; on error they are rolled back.

    Args:
        image_name: image file name.
        image_url: image URL/path (may be a placeholder replaced after upload).
        article_department: article department name.
        article_department_id: article department ID.
        article_keywords: article keyword name.
        article_keywords_id: article keyword ID.
        article_tags: article tags (None or [] for none).

    Returns:
        Dict with tag_image_id, image_id, image_url, image_thumb_url (''),
        keywords_id, keywords_name, department_id and department_name.
        None on failure (logged).
    """
    def _as_int(value) -> int:
        try:
            return int(value or 0)
        except (TypeError, ValueError):
            return 0

    dept_id_value = _as_int(article_department_id)
    kw_id_value = _as_int(article_keywords_id)
    department_id = dept_id_value if dept_id_value > 0 else 1
    keywords_id = kw_id_value if kw_id_value > 0 else 1
    department = article_department if article_department else 'AI生成'
    keywords = article_keywords if article_keywords else 'AI图片'

    primary_tag = ''
    for tag in article_tags or ():
        if tag is not None and str(tag).strip():
            primary_tag = str(tag).strip()
            break

    try:
        connection = self.db_manager.get_connection()
        try:
            with connection.cursor(pymysql.cursors.DictCursor) as cursor:
                tag_id = 1
                tag_name = "AI生成"
                if primary_tag:
                    query_tag = """
                        SELECT id
                        FROM ai_tags
                        WHERE tag_name = %s
                        LIMIT 1
                    """
                    cursor.execute(query_tag, (primary_tag,))
                    tag_info = cursor.fetchone()
                    if tag_info:
                        tag_id = tag_info['id']
                    else:
                        insert_tag_query = """
                            INSERT INTO ai_tags (tag_name, created_at, updated_at)
                            VALUES (%s, NOW(), NOW())
                        """
                        cursor.execute(insert_tag_query, (primary_tag,))
                        tag_id = cursor.lastrowid
                    tag_name = primary_tag

                insert_image_query = """
                    INSERT INTO ai_images
                    (image_name, image_url, image_thumb_url, department, keywords, image_type, upload_user_id, status)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                """
                cursor.execute(insert_image_query, (
                    image_name, image_url, '', department, keywords, 'medical', 1, 'active'
                ))
                image_id = cursor.lastrowid
                logger.info(f"图片信息已插入ai_images表image_id: {image_id}")

                insert_image_tag_query = """
                    INSERT INTO ai_image_tags
                    (image_id, image_name, image_url, image_thumb_url, tag_id, tag_name,
                     keywords_id, keywords_name, department_id, department_name,
                     default_tag_id, default_tag_name, image_source, created_user_id, image_attached_article_count, status)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                cursor.execute(insert_image_tag_query, (
                    image_id, image_name, image_url, '', tag_id, tag_name,
                    keywords_id, keywords, department_id, department,
                    tag_id, tag_name,  # default tag mirrors the tag
                    3, 1, 0,  # image_source 3 = AI generated, created_user_id, attached count
                    'generate'
                ))
                tag_image_id = cursor.lastrowid
                logger.info(f"图片标签信息已插入ai_image_tags表tag_image_id: {tag_image_id}")
                connection.commit()

            return {
                'tag_image_id': tag_image_id,
                'image_id': image_id,
                'image_url': image_url,
                'image_thumb_url': '',
                'keywords_id': keywords_id,
                'keywords_name': keywords,
                'department_id': department_id,
                'department_name': department
            }
        except Exception:
            # Drop any partial tag/image inserts before releasing the connection.
            try:
                connection.rollback()
            except Exception as rollback_err:
                logger.error(f"回滚事务失败: {rollback_err}")
            raise
        finally:
            connection.close()
    except Exception as e:
        logger.error(f"插入图片信息到数据库失败: {e}")
        return None
def upload_image_to_server(self, image_path: str, tag_image_id: int, timeout: float = 60) -> Optional[Dict]:
    """
    Upload a local image file to the image server.

    Logs in first (a fresh JWT on every call), then POSTs the file as the
    multipart field ``file`` together with ``tag_image_id`` to
    ``/api/images/upload``.

    Args:
        image_path: local file to upload.
        tag_image_id: ``ai_image_tags`` row the upload belongs to.
        timeout: request timeout in seconds. Without one, the request could
            block a worker indefinitely.

    Returns:
        The response's ``data`` dict (callers read ``relative_path``,
        ``thumb_relative_path``, ``image_url`` and ``image_thumb_url``).

    Raises:
        Exception: if login fails, the HTTP status is not 200, or the response
            ``code`` is not 200. ``requests`` errors (including timeouts)
            propagate as well; this function does not return None in practice.
    """
    base_url = "http://47.99.184.230:8324"
    jwt_token = self.login_and_get_jwt_token(base_url)
    if not jwt_token:
        raise Exception("获取JWT token失败无法上传图片")

    upload_url = f"{base_url}/api/images/upload"
    headers = {'Authorization': f'Bearer {jwt_token}'}
    with open(image_path, 'rb') as image_file:
        files = {'file': image_file}
        data = {'tag_image_id': tag_image_id}
        response = requests.post(upload_url, headers=headers, files=files, data=data, timeout=timeout)
    logger.info(f"图片上传响应状态码: {response.status_code}")

    if response.status_code != 200:
        raise Exception(f"图片上传请求失败,状态码: {response.status_code}")
    result = response.json()
    if result.get('code') != 200:
        raise Exception(f"图片上传失败: {result.get('message', '未知错误')}")
    upload_data = result['data']
    logger.info(f"上传成功,相对路径: {upload_data.get('relative_path')}")
    return upload_data
def update_image_urls_after_upload(self, image_id: int, tag_image_id: int, image_url: str, image_thumb_url: str) -> bool:
    """
    Replace an image's stored URLs with the ones returned by the upload.

    ``image_url`` and ``image_thumb_url`` are written to the ``ai_images``
    row ``image_id`` and to the ``ai_image_tags`` row ``tag_image_id``.
    Both updates are committed together.

    Args:
        image_id: ``ai_images`` row ID.
        tag_image_id: ``ai_image_tags`` row ID.
        image_url: uploaded relative path.
        image_thumb_url: uploaded thumbnail relative path.

    Returns:
        True after the commit; False on a database error (logged locally).
    """
    # (table update SQL, row id, log line) in execution order.
    updates = (
        (
            """
            UPDATE ai_images
            SET image_url = %s, image_thumb_url = %s
            WHERE id = %s
            """,
            image_id,
            f"已更新ai_images表image_id: {image_id}, URL: {image_url}",
        ),
        (
            """
            UPDATE ai_image_tags
            SET image_url = %s, image_thumb_url = %s
            WHERE id = %s
            """,
            tag_image_id,
            f"已更新ai_image_tags表tag_image_id: {tag_image_id}, URL: {image_url}",
        ),
    )
    try:
        conn = self.db_manager.get_connection()
        try:
            with conn.cursor(pymysql.cursors.DictCursor) as cur:
                for update_sql, row_id, done_msg in updates:
                    cur.execute(update_sql, (image_url, image_thumb_url, row_id))
                    logger.info(done_msg)
                conn.commit()
                return True
        finally:
            conn.close()
    except Exception as e:
        logger.error(f"更新图片URL失败: {e}")
        return False
def login_and_get_jwt_token(self, base_url: str, timeout: float = 30) -> Optional[str]:
    """
    Log in to the API server at ``base_url`` and return a JWT.

    Credentials come from the RPA_API_USERNAME / RPA_API_PASSWORD environment
    variables when set, otherwise from the literals kept for backward
    compatibility.
    NOTE(review): these credentials are committed to source control; rotate
    them once deployments provide the environment variables.

    Args:
        base_url: server root, e.g. ``http://host:port``.
        timeout: request timeout in seconds. Without one, the login could
            block indefinitely.

    Returns:
        The token string, or None on any failure (logged with the actual reason).
    """
    login_url = f"{base_url}/api/auth/login"
    login_data = {
        "username": os.environ.get("RPA_API_USERNAME") or "user010",
        "password": os.environ.get("RPA_API_PASSWORD") or "@5^2W6R7",
    }
    try:
        response = requests.post(
            login_url,
            json=login_data,
            headers={'Content-Type': 'application/json'},
            timeout=timeout,
        )
        if response.status_code != 200:
            logger.error(f"登录失败: {response.status_code}")
            return None
        result = response.json()
        if result.get('code') != 200:
            # The HTTP status was 200 here, so report the API's own message.
            logger.error(f"登录失败: {result.get('message', '未知错误')}")
            return None
        token = (result.get('data') or {}).get('token')
        if not token:
            logger.error("登录失败: 响应中缺少token")
            return None
        logger.info("JWT token获取成功")
        return token
    except Exception as e:
        logger.error(f"登录异常: {e}")
        return None
def call_rpa_review_api(self, article_ids: List[int], image_source: int = 0) -> bool:
    """
    Submit articles to the RPA review endpoint (``/api/articles/rpa/review``).

    Logs in for a fresh JWT, then POSTs ``{"article_ids", "image_source"}``
    as JSON with a 30-second timeout. Request and response details are logged.

    Args:
        article_ids: article IDs to submit.
        image_source: image source type (11 = template, 12 = real photo,
            13 = AI-generated).

    Returns:
        True only if the HTTP status and the response ``code`` are both 200.
        False on login failure, any other status/code, timeout, or exception.
    """
    try:
        api_base = "http://47.99.184.230:8324"
        token = self.login_and_get_jwt_token(api_base)
        if not token:
            logger.error("获取JWT token失败无法调用RPA审核接口")
            return False

        endpoint = f"{api_base}/api/articles/rpa/review"
        request_headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        }
        body = {"article_ids": article_ids, "image_source": image_source}
        logger.info(f"调用RPA审核接口: {endpoint}")
        logger.info(f"请求参数: article_ids={article_ids}, image_source={image_source}")

        resp = requests.post(endpoint, json=body, headers=request_headers, timeout=30)
        logger.info(f"RPA审核接口响应状态码: {resp.status_code}")
        logger.info(f"RPA审核接口响应内容: {resp.text}")

        if resp.status_code != 200:
            logger.error(f"RPA审核接口调用失败状态码: {resp.status_code}")
            return False
        reply = resp.json()
        if reply.get('code') != 200:
            logger.error(f"RPA审核接口返回错误: {reply.get('message', '未知错误')}")
            return False
        logger.info(f"RPA审核接口调用成功: {reply.get('message', '操作完成')}")
        return True
    except requests.exceptions.Timeout:
        logger.error("RPA审核接口调用超时")
        return False
    except Exception as e:
        logger.error(f"调用RPA审核接口异常: {e}")
        return False
def insert_article_image_relation_for_generated(self, article_id: int, image_id: int, image_url: str,
                                                image_thumb_url: str, tag_image_id: int, keywords_id: int,
                                                keywords_name: str, department_id: int, department_name: str,
                                                image_source: int = 0) -> Optional[int]:
    """
    Link a generated image to an article without calling the RPA API.

    Inserts one ``ai_article_images`` row and commits, then records an INFO
    entry in ``ai_logs``. The row gets:
      * ``sort_order`` one past the article's current maximum;
      * ``created_at = NOW()``, matching ``insert_article_image_relation``.

    On failure the insert is rolled back.

    Unlike ``insert_article_image_relation``, this neither calls the RPA
    review API nor touches attachment counters. The original comment says
    RPA is called once after all images for the article are generated
    (presumably by the caller, not visible here).

    Args:
        article_id: article ID.
        image_id: ``ai_images`` ID.
        image_url: image relative path.
        image_thumb_url: thumbnail relative path.
        tag_image_id: ``ai_image_tags`` ID (stored as ``image_tag_id``).
        keywords_id: keyword ID.
        keywords_name: keyword name.
        department_id: department ID.
        department_name: department name.
        image_source: value stored in ``image_source`` (callers pass 12 for
            covers and 13 for detail images).

    Returns:
        The new ``ai_article_images.id``, or None on failure (logged).
    """
    try:
        connection = self.db_manager.get_connection()
        try:
            with connection.cursor(pymysql.cursors.DictCursor) as cursor:
                query_max_sort = """
                    SELECT COALESCE(MAX(sort_order), 0) as max_sort_order
                    FROM ai_article_images
                    WHERE article_id = %s
                """
                cursor.execute(query_max_sort, (article_id,))
                result = cursor.fetchone()
                max_sort_order = result['max_sort_order'] if result else 0
                new_sort_order = max_sort_order + 1

                insert_query = """
                    INSERT INTO ai_article_images
                    (article_id, image_id, image_url, image_thumb_url, image_tag_id, sort_order,
                     keywords_id, keywords_name, department_id, department_name, image_source, created_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
                """
                cursor.execute(insert_query, (
                    article_id, image_id, image_url, image_thumb_url, tag_image_id, new_sort_order,
                    keywords_id, keywords_name, department_id, department_name, image_source
                ))
                article_image_id = cursor.lastrowid
                logger.info(f"文章图片关联信息已插入ai_article_images表id: {article_image_id}, sort_order: {new_sort_order}, image_source: {image_source}")
                connection.commit()
        except Exception:
            try:
                connection.rollback()
            except Exception as rollback_err:
                logger.error(f"回滚事务失败: {rollback_err}")
            raise
        finally:
            connection.close()

        self.log_to_database('INFO', "AI生成图片关联完成", f"文章ID: {article_id}, 图片ID: {image_id}, 关联记录ID: {article_image_id}")
        return article_image_id
    except Exception as e:
        logger.error(f"处理文章图片关联信息失败: {e}")
        return None
def match_article_with_images(self, article_data: Dict) -> bool:
    """
    Match one article against its department's library images, falling back to
    Gemini image generation when no image scores high enough.

    Flow:
      1. Skip articles whose ``coze_tag`` is missing or yields no parsed tags.
      2. Score every candidate image of the article's department with Qwen
         (see ``_select_best_image_match``) and keep the best qualifying one.
      3. If a match exists, insert the article/image relation.
      4. Otherwise work out which images the article still lacks
         (``_plan_missing_image_types``), generate them, and trigger RPA review
         only if all of them were generated (``_generate_images_and_submit``).

    Args:
        article_data: article row; reads 'article_id' (required), 'title',
            'coze_tag', 'department' and 'department_id'.

    Returns:
        True if a match was stored, the article already had the required
        images, or every missing image was generated and the RPA review call
        succeeded; False otherwise (including on any exception, which is logged).
    """
    article_id = article_data['article_id']
    article_title = article_data.get('title', '')
    coze_tag = article_data.get('coze_tag', '')
    article_department = article_data.get('department', '')
    article_department_id = article_data.get('department_id', 0)
    try:
        # Parse the article tags; without tags there is nothing to match on.
        if not coze_tag:
            logger.warning(f"文章 {article_id} 没有标签信息,跳过")
            return False
        article_tags = self.parse_article_tags(coze_tag)
        if not article_tags:
            logger.warning(f"文章 {article_id} 没有有效标签,跳过")
            return False
        logger.info(f"开始为文章 {article_id} 匹配图片 - 标题: {article_title}, 标签: {article_tags}, 科室: {article_department}, 科室ID: {article_department_id}")

        # Normalise a None result to [] so the Gemini fallback announced below
        # is actually reached instead of failing with a TypeError on iteration.
        available_images = self.get_available_images_with_tags(article_department_id) or []
        if not available_images:
            logger.info(f"文章 {article_id} 没有找到对应科室的可用图片将进行Gemini生图")

        best_match, best_score = self._select_best_image_match(
            article_id, article_title, article_tags, available_images
        )

        if best_match is not None:
            if not self.insert_article_image_relation(article_id, best_match, best_score):
                return False
            logger.info(f"文章 {article_id} 成功匹配图片 {best_match['image_id']}, 分数: {best_score}")
            self.log_to_database('INFO', "文章匹配成功",
                                 f"文章ID: {article_id}, 图片ID: {best_match['image_id']}, 分数: {best_score}")
            return True

        # No library image qualified: generate whatever the article still lacks.
        images_to_generate = self._plan_missing_image_types(article_id)
        if not images_to_generate:
            return True
        return self._generate_images_and_submit(
            article_id, article_title, article_tags, images_to_generate
        )
    except Exception as e:
        error_msg = f"匹配文章图片异常 - 文章ID: {article_id}, 错误: {e}"
        logger.error(error_msg)
        self.log_to_database('ERROR', error_msg, traceback.format_exc())
        return False

def _select_best_image_match(self, article_id, article_title: str, article_tags: List[str],
                             available_images: List[Dict]) -> Tuple[Optional[Dict], float]:
    """Score each candidate image with Qwen and return ``(best_image, best_score)``.

    Candidates are evaluated with real photos (image_source == 2) first, then
    every other image (treated as templates, image_source defaulting to 1).
    Within each group, images attached to fewer articles come first. Only a
    strictly higher score replaces the current best, so this order breaks ties.
    An image qualifies only if Qwen flags it as a match and its score is at
    least MATCH_THRESHOLD. Returns ``(None, 0.0)`` when nothing qualifies.
    """
    # Stable sort: same order as partitioning by type and sorting each group.
    ranked_images = sorted(
        available_images,
        key=lambda img: (img.get('image_source', 1) != 2, img['image_attached_article_count']),
    )
    best_match = None
    best_score = 0.0
    for image_data in ranked_images:
        image_tags = [image_data['tag_name']]
        image_keywords = image_data.get('keywords_name', '')
        image_department = image_data.get('department_name', '')
        is_match, score = self.call_qwen_for_matching(
            article_title, article_tags, image_tags, image_keywords
        )
        logger.info(f"文章 {article_id} vs 图片 {image_data['image_id']} - 匹配: {is_match}, 分数: {score}, 科室: {image_department}")
        if is_match and score >= MATCH_THRESHOLD and score > best_score:
            best_match, best_score = image_data, score
    return best_match, best_score

def _plan_missing_image_types(self, article_id) -> List[str]:
    """Return the image types the article still needs, in generation order.

    The target is one cover image ('封面图', image_source 12) plus two detail
    images ('详情图', image_source 13). The same rule applies whatever the
    current image count. For example, an article with a single image receives
    every missing image, not just the first missing category. An empty list
    means the requirement is already met.
    """
    current_image_count = self.get_article_image_count(article_id)
    logger.info(f"文章 {article_id} 当前已有 {current_image_count} 张图片,采用相应生成策略")
    if current_image_count == 0:
        # Nothing stored yet: no need to query the existing sources.
        logger.info(f"文章 {article_id} 无图片将生成1张封面图(image_source=12)和2张详情图(image_source=13)")
        return ['封面图', '详情图', '详情图']

    existing_image_sources = self.get_article_image_sources(article_id)
    need_cover = 12 not in existing_image_sources
    missing_details = max(0, 2 - existing_image_sources.count(13))
    images_to_generate = (['封面图'] if need_cover else []) + ['详情图'] * missing_details
    if not images_to_generate:
        logger.info(f"文章 {article_id} 已满足图片要求,无需生成更多图片")
        return []
    logger.info(f"文章 {article_id} 需要补充图片: 实拍图={need_cover}, AI生成图={missing_details > 0}, 将生成{len(images_to_generate)}张图片")
    return images_to_generate

def _generate_images_and_submit(self, article_id, article_title: str, article_tags: List[str],
                                images_to_generate: List[str]) -> bool:
    """Generate each requested image type with Gemini, then request RPA review.

    The RPA review API is called only when every requested image was
    generated. On a partial or total failure it is not called and False is
    returned. Otherwise the RPA call's success decides the result.
    """
    tags_text = ', '.join(article_tags)
    generated_count = 0
    for image_type in images_to_generate:
        # Build a type-specific prompt.
        if image_type == '封面图':
            prompt = f"为文章'{article_title}'生成封面图,要求:主题突出、视觉冲击力强、适合首页展示,标签: {tags_text}"
        elif image_type == '详情图':
            prompt = f"为文章'{article_title}'生成详情说明图,要求:内容相关、清晰易懂、辅助理解文章内容,标签: {tags_text}"
        else:  # poster image
            prompt = f"为文章'{article_title}'生成宣传海报图,要求:吸引眼球、信息明确、适合推广传播,标签: {tags_text}"
        generated_image_url = self.generate_image_with_gemini(prompt, article_tags, article_id, image_type)
        if generated_image_url:
            generated_count += 1
            logger.info(f"文章 {article_id} 成功生成{image_type}: {generated_image_url}")
            self.log_to_database('INFO', f"文章生成{image_type}成功",
                                 f"文章ID: {article_id}, 图片URL: {generated_image_url}, 类型: {image_type}")
        else:
            logger.error(f"文章 {article_id} 生成{image_type}失败")
            self.log_to_database('ERROR', f"文章生成{image_type}失败", f"文章ID: {article_id}, 类型: {image_type}")

    requested_count = len(images_to_generate)
    if generated_count == requested_count:
        logger.info(f"文章 {article_id} 共成功生成 {generated_count} 张图片所有图片都已生成现在调用RPA接口")
        if self.call_rpa_review_api([article_id]):
            logger.info(f"文章 {article_id} RPA审核接口调用成功")
            return True
        logger.error(f"文章 {article_id} RPA审核接口调用失败")
        return False
    if generated_count > 0:
        logger.warning(f"文章 {article_id} 只成功生成 {generated_count}/{requested_count} 张图片未达到要求不调用RPA接口")
        return False
    logger.error(f"文章 {article_id} 生成图片全部失败")
    return False
def get_next_available_article(self, pending_articles: List[Dict]) -> Optional[Dict]:
    """Claim the first article in ``pending_articles`` that no worker has claimed yet.

    The scan and the claim happen while holding ``self.processing_lock``.
    The claimed article's id is recorded in ``self.processed_articles`` so that
    later calls skip it.

    Returns:
        The claimed article dict, or None when every article is already claimed.
    """
    with self.processing_lock:
        claimed = self.processed_articles
        candidate = next(
            (item for item in pending_articles if item['article_id'] not in claimed),
            None,
        )
        if candidate is not None:
            claimed.add(candidate['article_id'])
        return candidate
def worker_process_articles(self, pending_articles: List[Dict],
                            available_images: Optional[List[Dict]], worker_id: int) -> int:
    """Worker loop: keep claiming and matching articles until none are left.

    The current thread is renamed to ``Worker-<worker_id>`` so log lines can be
    attributed to it.

    Args:
        pending_articles: shared article list; each article is claimed through
            get_next_available_article, so concurrent workers do not repeat one.
        available_images: not used by this method; kept for call compatibility.
        worker_id: number used in the thread name and in log prefixes.

    Returns:
        How many claimed articles match_article_with_images reported as successful.
    """
    label = f"Worker-{worker_id}"
    threading.current_thread().name = label
    logger.info(f"[{label}] 启动,准备处理文章匹配")

    succeeded = 0
    article = self.get_next_available_article(pending_articles)
    while article:
        if self.match_article_with_images(article):
            succeeded += 1
            logger.info(f"[{label}] 成功处理文章: {article['article_id']}")
        else:
            logger.warning(f"[{label}] 文章处理失败: {article['article_id']}")
        article = self.get_next_available_article(pending_articles)

    logger.info(f"[{label}] 没有更多待处理文章,退出")
    logger.info(f"[{label}] 完成,共处理 {succeeded} 篇文章")
    return succeeded
def run_matching(self):
    """Run one full matching pass over every article returned by get_articles_with_tags.

    The shared claim set is reset before any work starts. WORKER_COUNT worker
    tasks are then submitted, and each pulls articles from the same list until
    it is exhausted. The per-worker success counts are summed and logged; a
    failing worker is logged and does not stop the others. Any other exception
    is logged (to the logger and to the database) and not re-raised.
    """
    logger.info("开始文章图片智能匹配...")
    self.log_to_database('INFO', '启动文章图片智能匹配服务', f'worker数量: {WORKER_COUNT}')
    try:
        pending_articles = self.get_articles_with_tags()
        if not pending_articles:
            logger.info("没有需要匹配的文章")
            return

        article_total = len(pending_articles)
        logger.info(f"开始匹配 {article_total} 篇文章")
        self.log_to_database('INFO', '开始批量匹配', f'文章数: {article_total}')

        # Workers claim articles through this set, so it must start empty each run.
        with self.processing_lock:
            self.processed_articles.clear()

        total_processed = 0
        with ThreadPoolExecutor(max_workers=WORKER_COUNT, thread_name_prefix="MatchWorker") as pool:
            # The worker ignores its available_images argument, so None is passed.
            worker_of = {
                pool.submit(self.worker_process_articles, pending_articles, None, wid): wid
                for wid in range(1, WORKER_COUNT + 1)
            }
            for finished in as_completed(worker_of):
                wid = worker_of[finished]
                try:
                    count = finished.result()
                    total_processed += count
                    logger.info(f"Worker-{wid} 完成,处理了 {count} 篇文章")
                except Exception as exc:
                    logger.error(f"Worker-{wid} 执行异常: {exc}")
                    self.log_to_database('ERROR', f'Worker-{wid} 执行异常', str(exc))

        logger.info(f"匹配完成,共处理 {total_processed} 篇文章")
        self.log_to_database('INFO', '匹配任务完成', f'共处理 {total_processed} 篇文章')
    except Exception as exc:
        error_msg = f"匹配流程异常: {exc}"
        logger.error(error_msg)
        self.log_to_database('ERROR', error_msg, traceback.format_exc())
def main():
    """Script entry point: build a matcher and run one matching pass.

    Any exception escaping run_matching is logged to the logger and to the
    database instead of propagating.
    """
    matcher = ArticleImageMatcher()
    try:
        matcher.run_matching()
    except Exception as exc:
        message = f'程序运行异常: {exc}'
        logger.error(message)
        matcher.log_to_database('ERROR', message, traceback.format_exc())


if __name__ == "__main__":
    main()