910 lines
35 KiB
Python
910 lines
35 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
文章与图片智能挂靠脚本
|
||
根据文章标签匹配ai_image_tags表中的图片,使用大模型进行处理,
|
||
如果挂靠失败或没有相同标签的图片,则使用Gemini生成图片
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import re
|
||
import requests
|
||
import csv
|
||
import pymysql
|
||
from typing import List, Dict, Tuple, Optional
|
||
from collections import defaultdict
|
||
from database_config import db_manager
|
||
from log_config import setup_logger
|
||
import time
|
||
import random
|
||
from requests.adapters import HTTPAdapter
|
||
from urllib3.util.retry import Retry
|
||
|
||
|
||
def get_articles_with_tags_from_db() -> List[Dict]:
|
||
"""
|
||
从数据库获取文章及其标签
|
||
|
||
Returns:
|
||
包含文章信息的字典列表
|
||
"""
|
||
# 设置日志记录器
|
||
logger = setup_logger('article_matching', 'logs/article_matching.log', 'logs/article_matching_error.log')
|
||
|
||
articles = []
|
||
|
||
try:
|
||
# 查询审核通过的文章,包含内容和标签
|
||
sql = """
|
||
SELECT id, title, content, coze_tag
|
||
FROM ai_articles
|
||
WHERE status = 'approved'
|
||
ORDER BY id
|
||
"""
|
||
|
||
logger.info("开始查询审核通过的文章数据...")
|
||
results = db_manager.execute_query(sql)
|
||
|
||
if not results:
|
||
logger.warning("没有找到状态为approved的文章")
|
||
print("没有找到状态为approved的文章")
|
||
return articles
|
||
|
||
logger.info(f"查询到 {len(results)} 条审核通过的文章")
|
||
print(f"查询到 {len(results)} 条审核通过的文章")
|
||
|
||
for row in results:
|
||
article_id, title, content, coze_tag = row
|
||
|
||
# 解析标签
|
||
tags = []
|
||
if coze_tag:
|
||
try:
|
||
# 尝试解析JSON格式的标签
|
||
tags_data = json.loads(coze_tag)
|
||
if isinstance(tags_data, list):
|
||
tags = tags_data
|
||
elif isinstance(tags_data, dict):
|
||
# 如果是字典格式,提取值
|
||
tags = list(tags_data.values()) if isinstance(list(tags_data.values())[0], list) else list(tags_data.values())
|
||
else:
|
||
# 如果是字符串,尝试按逗号分割
|
||
tags = [tag.strip() for tag in str(tags_data).split(',') if tag.strip()]
|
||
except json.JSONDecodeError:
|
||
# 如果不是JSON格式,按逗号分割
|
||
tags = [tag.strip() for tag in str(coze_tag).split(',') if tag.strip()]
|
||
|
||
articles.append({
|
||
'id': article_id,
|
||
'title': title,
|
||
'content': content,
|
||
'tags': tags
|
||
})
|
||
except Exception as e:
|
||
logger.error(f"从数据库获取文章数据时发生错误: {e}", exc_info=True)
|
||
print(f"从数据库获取文章数据时发生错误: {e}")
|
||
raise
|
||
|
||
return articles
|
||
|
||
|
||
def get_images_by_tags_from_db(tags: List[str] = [], used_counts: Dict[str, int] = {}) -> List[Dict]:
|
||
"""
|
||
从数据库根据标签获取图片
|
||
|
||
Args:
|
||
tags: 标签列表
|
||
used_counts: 已使用次数的字典,key为图片ID,value为使用次数
|
||
|
||
Returns:
|
||
包含图片信息的字典列表
|
||
"""
|
||
if not tags:
|
||
return []
|
||
|
||
# 设置日志记录器
|
||
logger = setup_logger('article_matching', 'logs/article_matching.log', 'logs/article_matching_error.log')
|
||
|
||
images = []
|
||
|
||
try:
|
||
# 查询符合条件的图像标签数据
|
||
sql = """
|
||
SELECT id, image_id, image_name, image_url, tag_name, keywords_name, department_name, image_attached_article_count
|
||
FROM ai_image_tags
|
||
WHERE image_attached_article_count < 5
|
||
ORDER BY id
|
||
"""
|
||
|
||
logger.info("开始查询符合条件的图像标签数据...")
|
||
results = db_manager.execute_query(sql)
|
||
|
||
if not results:
|
||
logger.warning("没有找到符合条件的图像标签数据 (image_attached_article_count < 5)")
|
||
print("没有找到符合条件的图像标签数据 (image_attached_article_count < 5)")
|
||
return images
|
||
|
||
logger.info(f"查询到 {len(results)} 条符合条件的图像标签数据")
|
||
print(f"查询到 {len(results)} 条符合条件的图像标签数据")
|
||
|
||
for row in results:
|
||
(
|
||
image_id, db_image_id, image_name, image_url, tag_name,
|
||
keywords_name, department_name, base_count
|
||
) = row
|
||
|
||
# 检查图片的附加文章数量是否小于5,考虑已使用次数
|
||
used_count = used_counts.get(str(image_id), 0)
|
||
total_count = base_count + used_count
|
||
|
||
if total_count >= 5:
|
||
continue
|
||
|
||
# 检查标签是否匹配
|
||
if any(tag.lower() in tag_name.lower() for tag in tags):
|
||
images.append({
|
||
'id': str(image_id),
|
||
'image_id': db_image_id,
|
||
'image_name': image_name,
|
||
'image_url': image_url,
|
||
'tag_name': tag_name,
|
||
'keywords_name': keywords_name,
|
||
'department_name': department_name,
|
||
'base_count': base_count
|
||
})
|
||
except Exception as e:
|
||
logger.error(f"从数据库获取图片数据时发生错误: {e}", exc_info=True)
|
||
print(f"从数据库获取图片数据时发生错误: {e}")
|
||
raise
|
||
|
||
print(f"从数据库找到 {len(images)} 张符合条件的匹配图片")
|
||
return images
|
||
|
||
|
||
def call_qwen_model(article: Dict, image_urls: List[str]) -> bool:
|
||
"""
|
||
调用通义千问大模型进行文章与图片挂靠评估
|
||
|
||
Args:
|
||
article: 文章信息
|
||
image_urls: 图片URL列表
|
||
|
||
Returns:
|
||
挂靠是否成功
|
||
"""
|
||
# 通义千问API配置
|
||
api_key = "sk-e6a38204022a4b538b8954f0584712af"
|
||
api_url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
|
||
|
||
# 构建请求内容
|
||
content = f"""
|
||
请评估以下文章与图片的匹配度:
|
||
|
||
文章标题: {article['title']}
|
||
文章内容: {article['content'][:500]}... # 限制内容长度
|
||
|
||
图片URLs: {', '.join(image_urls)}
|
||
|
||
请判断这些图片是否适合用于这篇文章。如果匹配度高,请回复"匹配成功";如果匹配度低,请回复"匹配失败"。
|
||
"""
|
||
|
||
headers = {
|
||
'Authorization': f'Bearer {api_key}',
|
||
'Content-Type': 'application/json'
|
||
}
|
||
|
||
payload = {
|
||
"model": "qwen-max", # 或其他合适的模型
|
||
"input": {
|
||
"messages": [
|
||
{
|
||
"role": "user",
|
||
"content": content
|
||
}
|
||
]
|
||
},
|
||
"parameters": {
|
||
"temperature": 0.7
|
||
}
|
||
}
|
||
|
||
try:
|
||
response = requests.post(api_url, headers=headers, json=payload)
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
# 解析响应,判断匹配结果
|
||
if 'output' in result and 'text' in result['output']:
|
||
response_text = result['output']['text'].lower()
|
||
# 根据响应内容判断是否匹配
|
||
if '匹配成功' in response_text or '是的' in response_text or '合适' in response_text:
|
||
print(f"通义千问评估结果: 匹配成功 - 文章 '{article['title']}'")
|
||
return True
|
||
else:
|
||
print(f"通义千问评估结果: 匹配失败 - 文章 '{article['title']}'")
|
||
return False
|
||
else:
|
||
print(f"通义千问API响应格式异常: {result}")
|
||
return False
|
||
else:
|
||
print(f"通义千问API调用失败: {response.status_code} - {response.text}")
|
||
# API调用失败时,仍然尝试匹配,这里返回False触发图片生成
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"调用通义千问API时发生错误: {e}")
|
||
# 发生错误时,返回False以触发图片生成
|
||
return False
|
||
|
||
|
||
def insert_generated_image_to_db(image_name: str, image_url: str, article_tags: List[str]) -> Optional[Dict]:
|
||
"""
|
||
将Gemini生成的图片信息插入数据库
|
||
|
||
Args:
|
||
image_name: 图片文件名,如 "1755310671174988.png"
|
||
image_url: 图片URL路径,如 "20250816/1755310671174988.png"
|
||
article_tags: 文章标签列表,用于查询department和keywords
|
||
|
||
Returns:
|
||
包含插入信息的字典:{
|
||
'tag_image_id': tag_image_id,
|
||
'image_id': image_id,
|
||
'image_url': image_url,
|
||
'image_thumb_url': image_thumb_url,
|
||
'keywords_id': keywords_id,
|
||
'keywords_name': keywords_name,
|
||
'department_id': department_id,
|
||
'department_name': department_name
|
||
}
|
||
"""
|
||
connection = db_manager.get_connection()
|
||
if connection is None:
|
||
print("无法连接到数据库")
|
||
return None
|
||
|
||
try:
|
||
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
|
||
# 1. 根据文章标签查询ai_image_tags表,获取department和keywords信息
|
||
if article_tags:
|
||
# 使用第一个标签查询
|
||
query = """
|
||
SELECT department_name, keywords_name, department_id, keywords_id, tag_id
|
||
FROM ai_image_tags
|
||
WHERE tag_name = %s
|
||
LIMIT 1
|
||
"""
|
||
cursor.execute(query, (article_tags[0],))
|
||
tag_info = cursor.fetchone()
|
||
|
||
if tag_info:
|
||
department = tag_info['department_name']
|
||
keywords = tag_info['keywords_name']
|
||
department_id = tag_info['department_id']
|
||
keywords_id = tag_info['keywords_id']
|
||
tag_id = tag_info['tag_id']
|
||
tag_name = article_tags[0]
|
||
else:
|
||
# 如果没有找到,使用默认值
|
||
department = "AI生成"
|
||
keywords = "AI图片"
|
||
department_id = 1
|
||
keywords_id = 1
|
||
tag_id = 1
|
||
tag_name = article_tags[0] if article_tags else "AI生成"
|
||
else:
|
||
# 没有标签,使用默认值
|
||
department = "AI生成"
|
||
keywords = "AI图片"
|
||
department_id = 1
|
||
keywords_id = 1
|
||
tag_id = 1
|
||
tag_name = "AI生成"
|
||
|
||
# 2. 插入ai_images表
|
||
insert_image_query = """
|
||
INSERT INTO ai_images
|
||
(image_name, image_url, image_thumb_url, department, keywords, image_type, upload_user_id, status)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
||
"""
|
||
cursor.execute(insert_image_query, (
|
||
image_name,
|
||
image_url,
|
||
'', # image_thumb_url
|
||
department,
|
||
keywords,
|
||
'medical', # image_type
|
||
1, # upload_user_id(默认用户ID)
|
||
'active' # status
|
||
))
|
||
image_id = cursor.lastrowid
|
||
print(f"图片信息已插入ai_images表,image_id: {image_id}")
|
||
|
||
# 3. 插入ai_image_tags表
|
||
insert_tag_query = """
|
||
INSERT INTO ai_image_tags
|
||
(image_id, image_name, image_url, image_thumb_url, tag_id, tag_name,
|
||
keywords_id, keywords_name, department_id, department_name,
|
||
image_source, created_user_id, image_attached_article_count)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||
"""
|
||
cursor.execute(insert_tag_query, (
|
||
image_id,
|
||
image_name,
|
||
image_url,
|
||
'', # image_thumb_url
|
||
tag_id,
|
||
tag_name,
|
||
keywords_id,
|
||
keywords,
|
||
department_id,
|
||
department,
|
||
3, # image_source: 3表示AI生成
|
||
1, # created_user_id
|
||
0 # image_attached_article_count
|
||
))
|
||
tag_image_id = cursor.lastrowid
|
||
print(f"图片标签信息已插入ai_image_tags表,tag_image_id: {tag_image_id}")
|
||
|
||
# 提交事务
|
||
connection.commit()
|
||
|
||
# 返回包含所有需要信息的字典
|
||
return {
|
||
'tag_image_id': tag_image_id,
|
||
'image_id': image_id,
|
||
'image_url': image_url,
|
||
'image_thumb_url': '',
|
||
'keywords_id': keywords_id,
|
||
'keywords_name': keywords,
|
||
'department_id': department_id,
|
||
'department_name': department
|
||
}
|
||
|
||
except Exception as e:
|
||
print(f"插入图片信息到数据库失败: {e}")
|
||
connection.rollback()
|
||
return None
|
||
finally:
|
||
connection.close()
|
||
|
||
|
||
def insert_article_image_relation(article_id: int, image_id: int, image_url: str, image_thumb_url: str,
|
||
tag_image_id: int, keywords_id: int, keywords_name: str,
|
||
department_id: int, department_name: str, image_source: int = 0) -> Optional[int]:
|
||
"""
|
||
将文章与图片的关联信息插入ai_article_images表
|
||
|
||
Args:
|
||
article_id: 文章ID
|
||
image_id: 图片ID(ai_images表的id)
|
||
image_url: 图片URL
|
||
image_thumb_url: 缩略图URL
|
||
tag_image_id: 图片标签ID(ai_image_tags表的id)
|
||
keywords_id: 关键词ID
|
||
keywords_name: 关键词名称
|
||
department_id: 部门ID
|
||
department_name: 部门名称
|
||
image_source: 图片来源(0表示默认)
|
||
|
||
Returns:
|
||
插入的ai_article_images表的ID
|
||
"""
|
||
connection = db_manager.get_connection()
|
||
if connection is None:
|
||
print("无法连接到数据库")
|
||
return None
|
||
|
||
try:
|
||
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
|
||
# 1. 查询当前文章下已有图片的最大sort_order
|
||
query_max_sort = """
|
||
SELECT COALESCE(MAX(sort_order), 0) as max_sort_order
|
||
FROM ai_article_images
|
||
WHERE article_id = %s
|
||
"""
|
||
cursor.execute(query_max_sort, (article_id,))
|
||
result = cursor.fetchone()
|
||
max_sort_order = result['max_sort_order'] if result else 0
|
||
new_sort_order = max_sort_order + 1
|
||
|
||
print(f"文章 {article_id} 当前最大sort_order: {max_sort_order}, 新图片sort_order: {new_sort_order}")
|
||
|
||
# 2. 插入ai_article_images表
|
||
insert_query = """
|
||
INSERT INTO ai_article_images
|
||
(article_id, image_id, image_url, image_thumb_url, image_tag_id, sort_order,
|
||
keywords_id, keywords_name, department_id, department_name, image_source)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||
"""
|
||
cursor.execute(insert_query, (
|
||
article_id,
|
||
image_id,
|
||
image_url,
|
||
image_thumb_url,
|
||
tag_image_id,
|
||
new_sort_order,
|
||
keywords_id,
|
||
keywords_name,
|
||
department_id,
|
||
department_name,
|
||
image_source
|
||
))
|
||
article_image_id = cursor.lastrowid
|
||
print(f"文章图片关联信息已插入ai_article_images表,id: {article_image_id}")
|
||
|
||
# 提交事务
|
||
connection.commit()
|
||
|
||
return article_image_id
|
||
|
||
except Exception as e:
|
||
print(f"插入文章图片关联信息失败: {e}")
|
||
connection.rollback()
|
||
return None
|
||
finally:
|
||
connection.close()
|
||
|
||
|
||
def generate_image_with_gemini(prompt: str, article_tags: List[str], article_id: int) -> str:
|
||
"""
|
||
使用Gemini生成图片并上传到服务器
|
||
|
||
Args:
|
||
prompt: 图片生成提示词
|
||
article_tags: 文章标签列表,用于查询department和keywords
|
||
article_id: 文章ID,用于关联图片
|
||
|
||
Returns:
|
||
上传后的图片URL
|
||
"""
|
||
# 导入必要的库
|
||
try:
|
||
from google import genai
|
||
from google.genai import types
|
||
from google.genai.client import HttpOptions
|
||
|
||
except ImportError:
|
||
print("错误:未安装google-genai库,请运行 'pip install google-genai' 进行安装")
|
||
raise
|
||
|
||
client = genai.Client(http_options=HttpOptions(base_url="https://work.poloapi.com"),
|
||
api_key="sk-V4tPnDgzFPa7nxWrvKnNJsW8ZcBXXPuGmjfgvPVRnwpHoeob")
|
||
|
||
print(f"正在调用Gemini API生成图片,提示词: {prompt[:50]}...")
|
||
|
||
# 生成内容
|
||
response = client.models.generate_content(
|
||
model="gemini-3-pro-image-preview",
|
||
contents=[prompt],
|
||
)
|
||
|
||
# 检查是否有候选答案
|
||
if not response.candidates:
|
||
raise Exception("Gemini API未返回任何候选答案")
|
||
|
||
# 处理响应 - 遍历第一个候选答案的内容部分
|
||
candidate = response.candidates[0]
|
||
if not candidate.content or not candidate.content.parts:
|
||
raise Exception("Gemini API返回的候选答案中没有内容部分")
|
||
|
||
for part in candidate.content.parts:
|
||
if hasattr(part, 'text') and part.text is not None:
|
||
print(f"Gemini响应文本: {part.text}")
|
||
elif hasattr(part, 'inline_data') and part.inline_data is not None:
|
||
image_data = part.inline_data
|
||
if image_data.data is not None:
|
||
# 生成唯一的文件名(基于时间戳)
|
||
import time
|
||
import os
|
||
from datetime import datetime
|
||
|
||
timestamp_ms = int(time.time() * 1000) # 毫秒级时间戳
|
||
image_filename = f"{timestamp_ms}.png"
|
||
today_date = datetime.now().strftime("%Y%m%d")
|
||
image_url_path = f"{today_date}/{image_filename}"
|
||
|
||
temp_filename = f"temp_generated_image_{timestamp_ms}.png"
|
||
# 保存图片数据到临时文件
|
||
with open(temp_filename, 'wb') as f:
|
||
f.write(image_data.data)
|
||
print(f"Gemini生成图片成功: {temp_filename}")
|
||
|
||
# 先将图片信息插入数据库,获取相关信息
|
||
image_info = insert_generated_image_to_db(image_filename, image_url_path, article_tags)
|
||
|
||
if not image_info:
|
||
raise Exception("插入图片信息到数据库失败")
|
||
|
||
print(f"图片信息已插入数据库,tag_image_id: {image_info['tag_image_id']}, image_id: {image_info['image_id']}")
|
||
|
||
# 使用tag_image_id上传图片到服务器
|
||
uploaded_url = upload_image_to_server(temp_filename, image_info['tag_image_id'])
|
||
|
||
# 将文章与图片的关联信息插入ai_article_images表
|
||
article_image_id = insert_article_image_relation(
|
||
article_id=article_id,
|
||
image_id=image_info['image_id'],
|
||
image_url=image_info['image_url'],
|
||
image_thumb_url=image_info['image_thumb_url'],
|
||
tag_image_id=image_info['tag_image_id'],
|
||
keywords_id=image_info['keywords_id'],
|
||
keywords_name=image_info['keywords_name'],
|
||
department_id=image_info['department_id'],
|
||
department_name=image_info['department_name'],
|
||
image_source=0 # 默认值
|
||
)
|
||
|
||
if article_image_id:
|
||
print(f"文章图片关联信息已创建,ai_article_images.id: {article_image_id}")
|
||
|
||
# 删除临时文件
|
||
os.remove(temp_filename)
|
||
|
||
print(f"图片已上传到服务器: {uploaded_url}")
|
||
# 返回上传后的图片URL
|
||
return uploaded_url
|
||
|
||
# 如果没有返回图片数据,抛出异常
|
||
raise Exception("Gemini API未返回有效的图片数据")
|
||
|
||
|
||
def upload_image_to_server(image_path: str, tag_image_id: int) -> str:
|
||
"""
|
||
上传图片到服务器
|
||
|
||
Args:
|
||
image_path: 本地图片路径
|
||
tag_image_id: 图片标签ID
|
||
|
||
Returns:
|
||
服务器上的图片URL
|
||
"""
|
||
import requests
|
||
import json
|
||
|
||
# 登录获取JWT token
|
||
base_url = "http://47.99.184.230:8324" # 使用外网API地址
|
||
jwt_token = login_and_get_jwt_token(base_url)
|
||
|
||
if not jwt_token:
|
||
raise Exception("获取JWT token失败,无法上传图片")
|
||
|
||
# 准备上传请求
|
||
upload_url = f"{base_url}/api/images/upload"
|
||
headers = {
|
||
'Authorization': f'Bearer {jwt_token}',
|
||
}
|
||
|
||
# 读取图片文件
|
||
with open(image_path, 'rb') as image_file:
|
||
files = {'file': image_file}
|
||
data = {'tag_image_id': tag_image_id} # 添加必传参数
|
||
|
||
response = requests.post(upload_url, headers=headers, files=files, data=data)
|
||
|
||
print(f"图片上传响应状态码: {response.status_code}")
|
||
print(f"图片上传响应内容: {response.text}")
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
if result.get('code') == 200:
|
||
# 返回服务器上的图片URL
|
||
return result['data']['http_image_url']
|
||
else:
|
||
raise Exception(f"图片上传失败: {result.get('message', '未知错误')}")
|
||
else:
|
||
raise Exception(f"图片上传请求失败,状态码: {response.status_code}, 响应: {response.text}")
|
||
|
||
|
||
def login_and_get_jwt_token(base_url: str) -> Optional[str]:
|
||
"""
|
||
登录获取JWT token
|
||
"""
|
||
login_url = f"{base_url}/api/auth/login"
|
||
login_data = {
|
||
"username": "user010", # 使用固定的账号
|
||
"password": "@5^2W6R7"
|
||
}
|
||
|
||
print(f"尝试登录: {login_data['username']}")
|
||
print(f"登录URL: {login_url}")
|
||
|
||
try:
|
||
response = requests.post(login_url, json=login_data, headers={'Content-Type': 'application/json'})
|
||
print(f"响应状态码: {response.status_code}")
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
if result.get('code') == 200:
|
||
jwt_token = result['data']['token']
|
||
print("JWT token获取成功")
|
||
return jwt_token
|
||
else:
|
||
print(f"登录失败: {result.get('message', '未知错误')}")
|
||
return None
|
||
else:
|
||
print(f"登录请求失败: {response.status_code}")
|
||
return None
|
||
|
||
except Exception as e:
|
||
print(f"登录异常: {e}")
|
||
return None
|
||
|
||
|
||
def batch_publish_articles(base_url: str, jwt_token: str, article_ids: List[int]) -> bool:
|
||
"""
|
||
批量提交文章到/api/articles/batch-publish-auto接口
|
||
"""
|
||
try:
|
||
print(f"开始批量提交 {len(article_ids)} 篇文章到batch-publish-auto接口")
|
||
|
||
# 构建批量发布数据
|
||
publish_data = {
|
||
"article_ids": article_ids
|
||
}
|
||
|
||
print(f"准备批量提交的数据: {json.dumps(publish_data, ensure_ascii=False)}")
|
||
|
||
# 发送请求
|
||
upload_url = f"{base_url}/api/articles/batch-publish-auto"
|
||
headers = {
|
||
'Authorization': f'Bearer {jwt_token}',
|
||
'Content-Type': 'application/json',
|
||
'Accept': 'application/json'
|
||
}
|
||
|
||
response = requests.post(upload_url, json=publish_data, headers=headers)
|
||
|
||
print(f"批量提交响应状态码: {response.status_code}")
|
||
|
||
if response.status_code == 200:
|
||
try:
|
||
result = response.json()
|
||
print(f"批量提交响应内容: {result}")
|
||
|
||
# 根据接口实际返回格式判断成功
|
||
if result.get('code') == 200:
|
||
data = result.get('data', {})
|
||
published_count = data.get('published_count', 0)
|
||
failed_count = data.get('failed_count', 0)
|
||
|
||
success_msg = f"批量提交成功,发布: {published_count}篇,失败: {failed_count}篇"
|
||
print(success_msg)
|
||
return True
|
||
else:
|
||
print(f"批量提交失败: {result.get('message', '未知错误')}")
|
||
return False
|
||
except json.JSONDecodeError as e:
|
||
print(f"解析批量提交响应失败: {e}")
|
||
return False
|
||
elif response.status_code == 401:
|
||
# Token过期
|
||
print("收到401错误,JWT token可能已过期")
|
||
return False
|
||
else:
|
||
print(f"批量提交请求失败,状态码: {response.status_code}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"批量提交异常: {e}")
|
||
return False
|
||
|
||
|
||
def process_single_article(article, used_image_counts, match_results):
|
||
"""
|
||
处理单个文章与图片的匹配和挂靠
|
||
|
||
Args:
|
||
article: 单个文章数据
|
||
used_image_counts: 图片使用计数
|
||
match_results: 匹配结果列表
|
||
|
||
Returns:
|
||
是否处理成功
|
||
"""
|
||
print(f"\n处理文章: {article['title']} (ID: {article['id']})")
|
||
|
||
# 根据文章标签获取匹配的图片(考虑已使用次数)
|
||
matched_images = get_images_by_tags_from_db(article['tags'], used_image_counts)
|
||
|
||
if matched_images:
|
||
print(f"找到 {len(matched_images)} 张符合条件的匹配图片")
|
||
|
||
# 按基础使用次数排序,优先使用基础计数较低的图片
|
||
matched_images.sort(key=lambda x: x['base_count'])
|
||
|
||
matched = False
|
||
for img in matched_images:
|
||
# 提取图片URL并添加前缀
|
||
image_url = "http://images11.bxmkb.cn/Images/" + img['image_url']
|
||
|
||
if image_url: # 确保图片URL存在
|
||
# 调用通义千问大模型进行挂靠评估
|
||
match_success = call_qwen_model(article, [image_url])
|
||
|
||
if match_success:
|
||
print(f"文章与图片挂靠成功: {article['title']}")
|
||
|
||
# 更新图片使用次数
|
||
used_image_counts[img['id']] += 1
|
||
|
||
# 记录匹配结果
|
||
match_results.append({
|
||
'文章ID': article['id'],
|
||
'文章标题': article['title'],
|
||
'文章内容': article['content'][:100] + '...' if len(article['content']) > 100 else article['content'], # 限制内容长度
|
||
'标签': ', '.join(article['tags']),
|
||
'匹配的图片URL': image_url,
|
||
'图片ID': img['id'],
|
||
'图片名称': img['image_name'],
|
||
'图片标签': img['tag_name'],
|
||
'图片关键词': img['keywords_name'],
|
||
'图片部门': img['department_name'],
|
||
'匹配状态': '成功'
|
||
})
|
||
|
||
return True
|
||
|
||
if not matched:
|
||
print(f"文章未能与任何图片成功匹配,使用Gemini生成图片: {article['title']}")
|
||
|
||
# 使用文章标题和标签生成提示词
|
||
prompt = f"与'{article['title']}'相关的插图,标签: {', '.join(article['tags'])}"
|
||
generated_image_url = generate_image_with_gemini(prompt, article['tags'], article['id'])
|
||
print(f"生成的图片URL: {generated_image_url}")
|
||
|
||
# 记录生成图片的结果
|
||
match_results.append({
|
||
'文章ID': article['id'],
|
||
'文章标题': article['title'],
|
||
'文章内容': article['content'][:100] + '...' if len(article['content']) > 100 else article['content'],
|
||
'标签': ', '.join(article['tags']),
|
||
'匹配的图片URL': generated_image_url,
|
||
'图片ID': 'N/A',
|
||
'图片名称': 'Generated',
|
||
'图片标签': 'N/A',
|
||
'图片关键词': 'N/A',
|
||
'图片部门': 'N/A',
|
||
'匹配状态': '生成图片'
|
||
})
|
||
|
||
return True
|
||
else:
|
||
print(f"没有找到符合条件的匹配图片,使用Gemini生成图片: {article['title']}")
|
||
|
||
# 使用文章标题和标签生成提示词
|
||
prompt = f"与'{article['title']}'相关的插图,标签: {', '.join(article['tags'])}"
|
||
generated_image_url = generate_image_with_gemini(prompt, article['tags'], article['id'])
|
||
print(f"生成的图片URL: {generated_image_url}")
|
||
|
||
# 记录生成图片的结果
|
||
match_results.append({
|
||
'文章ID': article['id'],
|
||
'文章标题': article['title'],
|
||
'文章内容': article['content'][:100] + '...' if len(article['content']) > 100 else article['content'],
|
||
'标签': ', '.join(article['tags']),
|
||
'匹配的图片URL': generated_image_url,
|
||
'图片ID': 'N/A',
|
||
'图片名称': 'Generated',
|
||
'图片标签': 'N/A',
|
||
'图片关键词': 'N/A',
|
||
'图片部门': 'N/A',
|
||
'匹配状态': '生成图片'
|
||
})
|
||
|
||
return True
|
||
|
||
|
||
def process_article_image_matching(test_mode=False, test_count=None):
|
||
"""
|
||
处理文章与图片的匹配和挂靠
|
||
|
||
Args:
|
||
test_mode: 是否为测试模式
|
||
test_count: 测试文章数量(仅在测试模式下使用)
|
||
"""
|
||
# 用于跟踪每张图片的使用次数
|
||
used_image_counts = defaultdict(int)
|
||
# 存储匹配结果
|
||
match_results = []
|
||
|
||
try:
|
||
# 根据模式决定获取哪些文章
|
||
articles = get_articles_with_tags_from_db()
|
||
|
||
if not articles:
|
||
print("没有找到文章")
|
||
return
|
||
|
||
# 如果是测试模式,只取前test_count条数据
|
||
if test_mode:
|
||
if test_count is None:
|
||
test_count = 3 # 默认测试前3条
|
||
articles = articles[:test_count]
|
||
print(f"测试模式:处理前 {len(articles)} 篇文章")
|
||
|
||
success_count = 0
|
||
generated_count = 0
|
||
|
||
# 收集所有处理后的文章ID用于发布
|
||
processed_article_ids = []
|
||
|
||
for article in articles:
|
||
if process_single_article(article, used_image_counts, match_results):
|
||
success_count += 1
|
||
processed_article_ids.append(article['id'])
|
||
else:
|
||
print(f"处理文章 {article['id']} 失败")
|
||
|
||
# 将匹配结果写入CSV文件
|
||
output_csv = 'article_image_match_results.csv'
|
||
with open(output_csv, 'w', newline='', encoding='utf-8-sig') as csvfile:
|
||
fieldnames = [
|
||
'文章ID', '文章标题', '文章内容', '标签',
|
||
'匹配的图片URL', '图片ID', '图片名称',
|
||
'图片标签', '图片关键词', '图片部门', '匹配状态'
|
||
]
|
||
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
|
||
|
||
writer.writeheader()
|
||
for result in match_results:
|
||
writer.writerow(result)
|
||
|
||
if not test_mode:
|
||
print(f"\n处理完成! 成功挂靠: {success_count} 篇, 生成图片: {generated_count} 篇")
|
||
print(f"匹配结果已保存至: {output_csv}")
|
||
|
||
# 如果有处理过的文章,将它们提交到发布接口
|
||
if processed_article_ids:
|
||
print(f"\n开始发布处理过的 {len(processed_article_ids)} 篇文章...")
|
||
|
||
# 登录获取JWT token
|
||
base_url = "http://47.99.184.230:8324" # 使用外网API地址
|
||
jwt_token = login_and_get_jwt_token(base_url)
|
||
|
||
if jwt_token:
|
||
# 批量发布文章
|
||
if batch_publish_articles(base_url, jwt_token, processed_article_ids):
|
||
print(f"成功发布 {len(processed_article_ids)} 篇文章")
|
||
else:
|
||
print("批量发布失败")
|
||
else:
|
||
print("获取JWT token失败,无法发布文章")
|
||
else:
|
||
print("\n没有处理过的文章,跳过发布步骤")
|
||
else:
|
||
print(f"\n测试模式完成! 处理了 {len(articles)} 篇文章,成功挂靠: {success_count} 篇, 生成图片: {len([r for r in match_results if r['匹配状态'] == '生成图片'])} 篇")
|
||
print(f"处理结果已保存至: {output_csv}")
|
||
|
||
except Exception as e:
|
||
print(f"处理文章图片匹配时发生错误: {e}")
|
||
raise
|
||
|
||
|
||
if __name__ == "__main__":
|
||
import sys
|
||
|
||
print("开始处理文章与图片的智能挂靠...")
|
||
|
||
# 检查命令行参数
|
||
if len(sys.argv) > 1:
|
||
if sys.argv[1] == "--test" and len(sys.argv) > 2:
|
||
# 测试模式:处理前N篇文章
|
||
test_count = int(sys.argv[2])
|
||
print(f"启动测试模式,处理前 {test_count} 篇文章")
|
||
process_article_image_matching(test_mode=True, test_count=test_count)
|
||
elif sys.argv[1] == "--test" and len(sys.argv) == 2:
|
||
# 提示用户输入要测试的文章数量
|
||
test_count_input = input("请输入要测试的文章数量 (默认3): ")
|
||
test_count = int(test_count_input) if test_count_input.strip().isdigit() else 3
|
||
print(f"启动测试模式,处理前 {test_count} 篇文章")
|
||
process_article_image_matching(test_mode=True, test_count=test_count)
|
||
else:
|
||
print("使用方法:")
|
||
print(" 正常模式: python match_article_images.py")
|
||
print(" 测试模式: python match_article_images.py --test [文章ID]")
|
||
else:
|
||
# 正常模式:处理所有文章
|
||
process_article_image_matching() |