Files
baijiahao_text_to_image/image_matching.py
shengyudong 97dcff8c8b feat: 添加封面图压字花功能和启动脚本
- 新增封面图本地化压字花处理(深褐色文字+白色描边,居中显示)
- 支持Linux/Windows跨平台字体加载
- 新增启动脚本 start_article_auto_image_matching.sh
- 优化图片生成策略(0张图/1张图/多张图不同处理)
- 绕过网络接口IncompleteRead问题,本地化处理更稳定
- 更新README文档,完善使用说明
2026-02-05 20:25:23 +08:00

910 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
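Smart article-to-image attachment script.
Matches images from the ai_image_tags table against each article's tags and asks an LLM to judge the match.
If no image shares a tag with the article, or every candidate is rejected, an image is generated with Gemini instead.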
"""
import json
import os
import re
import requests
import csv
import pymysql
from typing import List, Dict, Tuple, Optional
from collections import defaultdict
from database_config import db_manager
from log_config import setup_logger
import time
import random
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def _parse_coze_tags(coze_tag) -> List[str]:
    """Normalise a raw ``coze_tag`` column value into a flat list of tag strings."""
    if not coze_tag:
        return []
    try:
        parsed = json.loads(coze_tag)
    except (TypeError, ValueError):
        # Not JSON (json.JSONDecodeError is a ValueError): treat the raw
        # value as a comma-separated string.
        parsed = str(coze_tag)

    if isinstance(parsed, dict):
        # Only the values carry tags; an empty dict simply yields no tags.
        candidates = list(parsed.values())
    elif isinstance(parsed, list):
        candidates = parsed
    else:
        candidates = str(parsed).split(',')

    tags = []
    for candidate in candidates:
        # A dict value (or list element) may itself be a list of tags.
        items = candidate if isinstance(candidate, (list, tuple)) else [candidate]
        for item in items:
            if item is None:
                continue
            text = str(item).strip()
            if text:
                tags.append(text)
    return tags


def get_articles_with_tags_from_db() -> List[Dict]:
    """
    Fetch every approved article together with its parsed tag list.

    Reads ``id, title, content, coze_tag`` from ``ai_articles`` where
    ``status = 'approved'``.  ``coze_tag`` may be a JSON list, a JSON object
    (its values are used and list values are flattened), another JSON scalar,
    or a plain comma-separated string; it is always normalised to a flat list
    of non-empty, stripped strings.

    Returns:
        A list of dicts with the keys ``id``, ``title``, ``content`` and
        ``tags``; an empty list when the query yields no rows.

    Raises:
        Exception: any error raised while querying or unpacking rows is
            logged and re-raised unchanged.
    """
    logger = setup_logger('article_matching', 'logs/article_matching.log', 'logs/article_matching_error.log')
    sql = """
        SELECT id, title, content, coze_tag
        FROM ai_articles
        WHERE status = 'approved'
        ORDER BY id
    """
    try:
        logger.info("开始查询审核通过的文章数据...")
        results = db_manager.execute_query(sql)
        if not results:
            logger.warning("没有找到状态为approved的文章")
            print("没有找到状态为approved的文章")
            return []

        logger.info(f"查询到 {len(results)} 条审核通过的文章")
        print(f"查询到 {len(results)} 条审核通过的文章")
        return [
            {
                'id': article_id,
                'title': title,
                'content': content,
                'tags': _parse_coze_tags(coze_tag),
            }
            for article_id, title, content, coze_tag in results
        ]
    except Exception as e:
        logger.error(f"从数据库获取文章数据时发生错误: {e}", exc_info=True)
        print(f"从数据库获取文章数据时发生错误: {e}")
        raise
def get_images_by_tags_from_db(tags: Optional[List[str]] = None,
                               used_counts: Optional[Dict[str, int]] = None) -> List[Dict]:
    """
    Return the images in ``ai_image_tags`` whose tag matches one of ``tags``.

    An image qualifies when its stored ``image_attached_article_count`` plus
    the in-run usage from ``used_counts`` is still below 5 and at least one
    requested tag is a case-insensitive substring of the image's ``tag_name``.

    Args:
        tags: tag strings to look for.  Blank entries are ignored; when no
            usable tag remains the database is not queried at all.
        used_counts: maps the image row id (as ``str``) to the number of
            times it has already been used during this run.

    Returns:
        A list of dicts with the keys ``id`` (row id as ``str``),
        ``image_id``, ``image_name``, ``image_url``, ``tag_name``,
        ``keywords_name``, ``department_name`` and ``base_count``.

    Raises:
        Exception: any error raised while querying or unpacking rows is
            logged and re-raised unchanged.
    """
    # Lower-case once up front; a blank tag would otherwise be a substring of
    # every tag_name and match every image.
    wanted = [str(tag).strip().lower() for tag in (tags or []) if tag is not None]
    wanted = [tag for tag in wanted if tag]
    if not wanted:
        return []
    if used_counts is None:
        used_counts = {}

    logger = setup_logger('article_matching', 'logs/article_matching.log', 'logs/article_matching_error.log')
    sql = """
        SELECT id, image_id, image_name, image_url, tag_name, keywords_name, department_name, image_attached_article_count
        FROM ai_image_tags
        WHERE image_attached_article_count < 5
        ORDER BY id
    """
    images = []
    try:
        logger.info("开始查询符合条件的图像标签数据...")
        results = db_manager.execute_query(sql)
        if not results:
            logger.warning("没有找到符合条件的图像标签数据 (image_attached_article_count < 5)")
            print("没有找到符合条件的图像标签数据 (image_attached_article_count < 5)")
            return images

        logger.info(f"查询到 {len(results)} 条符合条件的图像标签数据")
        print(f"查询到 {len(results)} 条符合条件的图像标签数据")
        for row in results:
            (
                row_id, db_image_id, image_name, image_url, tag_name,
                keywords_name, department_name, base_count
            ) = row
            # Treat a NULL counter as zero so the arithmetic (and later sorting
            # by base_count) cannot fail.
            base_count = base_count or 0
            # The SQL filter only sees the stored count; add what this run
            # has already consumed.
            if base_count + used_counts.get(str(row_id), 0) >= 5:
                continue
            image_tag = (tag_name or '').lower()
            if not any(tag in image_tag for tag in wanted):
                continue
            images.append({
                'id': str(row_id),
                'image_id': db_image_id,
                'image_name': image_name,
                'image_url': image_url,
                'tag_name': tag_name,
                'keywords_name': keywords_name,
                'department_name': department_name,
                'base_count': base_count
            })
    except Exception as e:
        logger.error(f"从数据库获取图片数据时发生错误: {e}", exc_info=True)
        print(f"从数据库获取图片数据时发生错误: {e}")
        raise
    print(f"从数据库找到 {len(images)} 张符合条件的匹配图片")
    return images
def call_qwen_model(article: Dict, image_urls: List[str]) -> bool:
    """
    Ask the Tongyi Qianwen (qwen-max) model whether the images fit the article.

    The article title, the first 500 characters of its content and the image
    URLs are sent in a single user message.  The reply counts as a match only
    when it contains a positive marker and no explicit rejection.

    Args:
        article: dict with at least ``title`` and ``content``.
        image_urls: URLs of the candidate images.

    Returns:
        True when the model accepts the match.  False on rejection, on an
        unexpected response shape, on a non-200 status and on any exception,
        so that the caller falls back to generating an image.
    """
    # NOTE(review): a secret is committed as the fallback value; the
    # DASHSCOPE_API_KEY environment variable takes precedence when set.
    api_key = os.environ.get("DASHSCOPE_API_KEY") or "sk-e6a38204022a4b538b8954f0584712af"
    api_url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"

    # Content may be NULL in the database; never slice None.
    excerpt = (article['content'] or '')[:500]
    # The prompt is assembled explicitly so that no source comment ends up in
    # the text sent to the model.
    content = (
        "\n请评估以下文章与图片的匹配度:\n"
        f"文章标题: {article['title']}\n"
        f"文章内容: {excerpt}...\n"
        f"图片URLs: {', '.join(image_urls)}\n"
        '请判断这些图片是否适合用于这篇文章。如果匹配度高,请回复"匹配成功";如果匹配度低,请回复"匹配失败"\n'
    )
    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json'
    }
    payload = {
        "model": "qwen-max",
        "input": {
            "messages": [
                {
                    "role": "user",
                    "content": content
                }
            ]
        },
        "parameters": {
            "temperature": 0.7
        }
    }
    try:
        # Without a timeout a stalled connection would block the whole batch.
        response = requests.post(api_url, headers=headers, json=payload, timeout=(10, 120))
        if response.status_code != 200:
            print(f"通义千问API调用失败: {response.status_code} - {response.text}")
            return False

        result = response.json()
        output = result.get('output') if isinstance(result, dict) else None
        reply = output.get('text') if isinstance(output, dict) else None
        if not isinstance(reply, str):
            print(f"通义千问API响应格式异常: {result}")
            return False

        verdict = reply.lower()
        # Rejections are checked explicitly because '不合适' contains the
        # positive marker '合适' and '不是的' contains '是的'.
        rejected = any(marker in verdict for marker in ('匹配失败', '不合适', '不是的'))
        accepted = any(marker in verdict for marker in ('匹配成功', '是的', '合适'))
        if accepted and not rejected:
            print(f"通义千问评估结果: 匹配成功 - 文章 '{article['title']}'")
            return True
        print(f"通义千问评估结果: 匹配失败 - 文章 '{article['title']}'")
        return False
    except Exception as e:
        # Best effort: any failure is reported and treated as "no match".
        print(f"调用通义千问API时发生错误: {e}")
        return False
def insert_generated_image_to_db(image_name: str, image_url: str, article_tags: List[str]) -> Optional[Dict]:
    """
    Register a Gemini-generated image in ``ai_images`` and ``ai_image_tags``.

    Department/keyword/tag metadata is copied from an existing
    ``ai_image_tags`` row whose ``tag_name`` equals the first article tag;
    when there is no tag or no such row, fixed placeholder values are used.
    Both inserts are committed together.

    Args:
        image_name: image file name, e.g. "1755310671174988.png".
        image_url: relative image path, e.g. "20250816/1755310671174988.png".
        article_tags: tags of the article the image was generated for.

    Returns:
        A dict with ``tag_image_id``, ``image_id``, ``image_url``,
        ``image_thumb_url``, ``keywords_id``, ``keywords_name``,
        ``department_id`` and ``department_name``; ``None`` when no
        connection is available or an insert fails.
    """
    connection = db_manager.get_connection()
    if connection is None:
        print("无法连接到数据库")
        return None

    # Placeholder metadata, overridden below when the first tag is known.
    department = "AI生成"
    keywords = "AI图片"
    department_id = keywords_id = tag_id = 1
    tag_name = article_tags[0] if article_tags else "AI生成"
    image_thumb_url = ''  # no thumbnail is produced for generated images

    try:
        with connection.cursor(pymysql.cursors.DictCursor) as cursor:
            # 1. Look up department/keywords for the first article tag.
            if article_tags:
                query = """
                    SELECT department_name, keywords_name, department_id, keywords_id, tag_id
                    FROM ai_image_tags
                    WHERE tag_name = %s
                    LIMIT 1
                """
                cursor.execute(query, (tag_name,))
                tag_info = cursor.fetchone()
                if tag_info:
                    department = tag_info['department_name']
                    keywords = tag_info['keywords_name']
                    department_id = tag_info['department_id']
                    keywords_id = tag_info['keywords_id']
                    tag_id = tag_info['tag_id']

            # 2. Insert into ai_images.
            insert_image_query = """
                INSERT INTO ai_images
                (image_name, image_url, image_thumb_url, department, keywords, image_type, upload_user_id, status)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """
            cursor.execute(insert_image_query, (
                image_name,
                image_url,
                image_thumb_url,
                department,
                keywords,
                'medical',  # image_type
                1,          # upload_user_id
                'active'    # status
            ))
            image_id = cursor.lastrowid
            print(f"图片信息已插入ai_images表image_id: {image_id}")

            # 3. Insert into ai_image_tags.
            insert_tag_query = """
                INSERT INTO ai_image_tags
                (image_id, image_name, image_url, image_thumb_url, tag_id, tag_name,
                keywords_id, keywords_name, department_id, department_name,
                image_source, created_user_id, image_attached_article_count)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            cursor.execute(insert_tag_query, (
                image_id,
                image_name,
                image_url,
                image_thumb_url,
                tag_id,
                tag_name,
                keywords_id,
                keywords,
                department_id,
                department,
                3,  # image_source
                1,  # created_user_id
                0   # image_attached_article_count
            ))
            tag_image_id = cursor.lastrowid
            print(f"图片标签信息已插入ai_image_tags表tag_image_id: {tag_image_id}")

        connection.commit()
        return {
            'tag_image_id': tag_image_id,
            'image_id': image_id,
            'image_url': image_url,
            'image_thumb_url': image_thumb_url,
            'keywords_id': keywords_id,
            'keywords_name': keywords,
            'department_id': department_id,
            'department_name': department
        }
    except Exception as e:
        print(f"插入图片信息到数据库失败: {e}")
        try:
            connection.rollback()
        except Exception as rollback_error:
            # A broken connection must not turn the None result into a raise.
            print(f"回滚失败: {rollback_error}")
        return None
    finally:
        connection.close()
def insert_article_image_relation(article_id: int, image_id: int, image_url: str, image_thumb_url: str,
                                  tag_image_id: int, keywords_id: int, keywords_name: str,
                                  department_id: int, department_name: str, image_source: int = 0) -> Optional[int]:
    """
    Link an image to an article by inserting a row into ``ai_article_images``.

    The new row is given ``sort_order`` = (current maximum for the article,
    or 0 when it has no images yet) + 1.

    Args:
        article_id: id of the article.
        image_id: id of the image in ``ai_images``.
        image_url: image URL stored on the relation row.
        image_thumb_url: thumbnail URL stored on the relation row.
        tag_image_id: id of the row in ``ai_image_tags``.
        keywords_id: keyword id.
        keywords_name: keyword name.
        department_id: department id.
        department_name: department name.
        image_source: image source code stored on the row (default 0).

    Returns:
        The id of the inserted ``ai_article_images`` row, or ``None`` when no
        connection is available or the insert fails.
    """
    connection = db_manager.get_connection()
    if connection is None:
        print("无法连接到数据库")
        return None

    try:
        with connection.cursor(pymysql.cursors.DictCursor) as cursor:
            # 1. Find the highest sort_order already used by this article.
            query_max_sort = """
                SELECT COALESCE(MAX(sort_order), 0) as max_sort_order
                FROM ai_article_images
                WHERE article_id = %s
            """
            cursor.execute(query_max_sort, (article_id,))
            row = cursor.fetchone()
            max_sort_order = row['max_sort_order'] if row else 0
            new_sort_order = max_sort_order + 1
            print(f"文章 {article_id} 当前最大sort_order: {max_sort_order}, 新图片sort_order: {new_sort_order}")

            # 2. Insert the relation row.
            insert_query = """
                INSERT INTO ai_article_images
                (article_id, image_id, image_url, image_thumb_url, image_tag_id, sort_order,
                keywords_id, keywords_name, department_id, department_name, image_source)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            cursor.execute(insert_query, (
                article_id,
                image_id,
                image_url,
                image_thumb_url,
                tag_image_id,
                new_sort_order,
                keywords_id,
                keywords_name,
                department_id,
                department_name,
                image_source
            ))
            article_image_id = cursor.lastrowid
            print(f"文章图片关联信息已插入ai_article_images表id: {article_image_id}")

        connection.commit()
        return article_image_id
    except Exception as e:
        print(f"插入文章图片关联信息失败: {e}")
        try:
            connection.rollback()
        except Exception as rollback_error:
            # A broken connection must not turn the None result into a raise.
            print(f"回滚失败: {rollback_error}")
        return None
    finally:
        connection.close()
def _store_generated_image(image_bytes, article_tags: List[str], article_id: int) -> str:
    """Persist generated image bytes: DB rows, server upload and article link; return the uploaded URL."""
    from datetime import datetime

    # File name and URL path are derived from a millisecond timestamp.
    timestamp_ms = int(time.time() * 1000)
    image_filename = f"{timestamp_ms}.png"
    today_date = datetime.now().strftime("%Y%m%d")
    image_url_path = f"{today_date}/{image_filename}"
    temp_filename = f"temp_generated_image_{timestamp_ms}.png"

    with open(temp_filename, 'wb') as f:
        f.write(image_bytes)
    print(f"Gemini生成图片成功: {temp_filename}")

    try:
        # The upload endpoint needs tag_image_id, so the DB rows come first.
        image_info = insert_generated_image_to_db(image_filename, image_url_path, article_tags)
        if not image_info:
            raise Exception("插入图片信息到数据库失败")
        print(f"图片信息已插入数据库tag_image_id: {image_info['tag_image_id']}, image_id: {image_info['image_id']}")

        uploaded_url = upload_image_to_server(temp_filename, image_info['tag_image_id'])

        article_image_id = insert_article_image_relation(
            article_id=article_id,
            image_id=image_info['image_id'],
            image_url=image_info['image_url'],
            image_thumb_url=image_info['image_thumb_url'],
            tag_image_id=image_info['tag_image_id'],
            keywords_id=image_info['keywords_id'],
            keywords_name=image_info['keywords_name'],
            department_id=image_info['department_id'],
            department_name=image_info['department_name'],
            image_source=0
        )
        if article_image_id:
            print(f"文章图片关联信息已创建ai_article_images.id: {article_image_id}")
    finally:
        # Remove the temp file on failure too, so aborted runs leave nothing behind.
        if os.path.exists(temp_filename):
            os.remove(temp_filename)

    print(f"图片已上传到服务器: {uploaded_url}")
    return uploaded_url


def generate_image_with_gemini(prompt: str, article_tags: List[str], article_id: int) -> str:
    """
    Generate an image with Gemini, upload it and attach it to the article.

    The first response part that carries inline image data is written to a
    temporary file, registered in the database, uploaded to the image server
    and linked to ``article_id``.  The temporary file is always removed.

    Args:
        prompt: text prompt for the image model.
        article_tags: article tags, used to look up department/keywords.
        article_id: id of the article the image is attached to.

    Returns:
        The URL of the uploaded image as reported by the image server.

    Raises:
        ImportError: when the ``google-genai`` package is not installed.
        Exception: when the API returns no candidate, no content parts or no
            image data, or when storing/uploading the image fails.
    """
    try:
        from google import genai
        from google.genai.client import HttpOptions
    except ImportError:
        print("错误未安装google-genai库请运行 'pip install google-genai' 进行安装")
        raise

    # NOTE(review): a secret is committed as the fallback value; the
    # POLOAPI_API_KEY environment variable takes precedence when set.
    api_key = os.environ.get("POLOAPI_API_KEY") or "sk-V4tPnDgzFPa7nxWrvKnNJsW8ZcBXXPuGmjfgvPVRnwpHoeob"
    client = genai.Client(http_options=HttpOptions(base_url="https://work.poloapi.com"),
                          api_key=api_key)
    print(f"正在调用Gemini API生成图片提示词: {prompt[:50]}...")
    response = client.models.generate_content(
        model="gemini-3-pro-image-preview",
        contents=[prompt],
    )

    if not response.candidates:
        raise Exception("Gemini API未返回任何候选答案")
    candidate = response.candidates[0]
    if not candidate.content or not candidate.content.parts:
        raise Exception("Gemini API返回的候选答案中没有内容部分")

    for part in candidate.content.parts:
        text = getattr(part, 'text', None)
        if text is not None:
            print(f"Gemini响应文本: {text}")
        # Checked independently of the text so an image is never skipped
        # merely because the same part also carries text.
        inline_data = getattr(part, 'inline_data', None)
        if inline_data is not None and inline_data.data is not None:
            return _store_generated_image(inline_data.data, article_tags, article_id)

    raise Exception("Gemini API未返回有效的图片数据")
def upload_image_to_server(image_path: str, tag_image_id: int) -> str:
    """
    Upload a local image file to the image server.

    Logs in to obtain a JWT token, then posts the file as multipart form data
    to ``/api/images/upload`` together with ``tag_image_id``.

    Args:
        image_path: path of the local image file.
        tag_image_id: id of the image's row in ``ai_image_tags``.

    Returns:
        The ``http_image_url`` reported by the server.

    Raises:
        Exception: when login fails, the request does not return HTTP 200,
            the body is not valid JSON, the server reports a non-200 ``code``
            or the response carries no ``http_image_url``.
    """
    base_url = "http://47.99.184.230:8324"
    jwt_token = login_and_get_jwt_token(base_url)
    if not jwt_token:
        raise Exception("获取JWT token失败无法上传图片")

    upload_url = f"{base_url}/api/images/upload"
    headers = {
        'Authorization': f'Bearer {jwt_token}',
    }
    with open(image_path, 'rb') as image_file:
        # Without a timeout a stalled upload would block the whole batch.
        response = requests.post(
            upload_url,
            headers=headers,
            files={'file': image_file},
            data={'tag_image_id': tag_image_id},
            timeout=(10, 120),
        )
    print(f"图片上传响应状态码: {response.status_code}")
    print(f"图片上传响应内容: {response.text}")

    if response.status_code != 200:
        raise Exception(f"图片上传请求失败,状态码: {response.status_code}, 响应: {response.text}")
    try:
        result = response.json()
    except ValueError as e:
        # requests raises a ValueError subclass for a non-JSON body.
        raise Exception(f"图片上传失败: 响应不是有效的JSON: {response.text}") from e
    if result.get('code') != 200:
        raise Exception(f"图片上传失败: {result.get('message', '未知错误')}")

    uploaded_url = (result.get('data') or {}).get('http_image_url')
    if not uploaded_url:
        raise Exception(f"图片上传失败: 响应中缺少http_image_url: {result}")
    return uploaded_url
def login_and_get_jwt_token(base_url: str) -> Optional[str]:
    """
    Log in to the API at ``base_url`` and return its JWT token.

    Posts the account credentials as JSON to ``/api/auth/login``.

    Args:
        base_url: scheme, host and port of the API, without a trailing slash.

    Returns:
        The token from ``data.token`` when the server answers HTTP 200 with
        ``code == 200``; ``None`` on any failure (the reason is printed).
    """
    login_url = f"{base_url}/api/auth/login"
    # NOTE(review): credentials are committed as fallback values; the
    # IMAGE_API_USERNAME / IMAGE_API_PASSWORD environment variables take
    # precedence when set.
    login_data = {
        "username": os.environ.get("IMAGE_API_USERNAME") or "user010",
        "password": os.environ.get("IMAGE_API_PASSWORD") or "@5^2W6R7"
    }
    print(f"尝试登录: {login_data['username']}")
    print(f"登录URL: {login_url}")
    try:
        # json= already sets the JSON content type; the timeout keeps a
        # stalled login from blocking the run.
        response = requests.post(login_url, json=login_data, timeout=(10, 30))
        print(f"响应状态码: {response.status_code}")
        if response.status_code != 200:
            print(f"登录请求失败: {response.status_code}")
            return None

        result = response.json()
        if result.get('code') != 200:
            print(f"登录失败: {result.get('message', '未知错误')}")
            return None

        jwt_token = result['data']['token']
        print("JWT token获取成功")
        return jwt_token
    except Exception as e:
        # Network errors, invalid JSON and missing keys all end up here.
        print(f"登录异常: {e}")
        return None
def batch_publish_articles(base_url: str, jwt_token: str, article_ids: List[int]) -> bool:
    """
    Submit articles to the ``/api/articles/batch-publish-auto`` endpoint.

    Args:
        base_url: scheme, host and port of the API, without a trailing slash.
        jwt_token: bearer token obtained from the login endpoint.
        article_ids: ids of the articles to publish.

    Returns:
        True when the server answers HTTP 200 with ``code == 200``; False on
        any other status, on an unparsable body or on any exception.
    """
    try:
        print(f"开始批量提交 {len(article_ids)} 篇文章到batch-publish-auto接口")
        publish_data = {
            "article_ids": article_ids
        }
        print(f"准备批量提交的数据: {json.dumps(publish_data, ensure_ascii=False)}")

        upload_url = f"{base_url}/api/articles/batch-publish-auto"
        headers = {
            'Authorization': f'Bearer {jwt_token}',
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }
        # Without a timeout a stalled request would hang the run.
        response = requests.post(upload_url, json=publish_data, headers=headers, timeout=(10, 120))
        print(f"批量提交响应状态码: {response.status_code}")

        if response.status_code == 401:
            # The server rejected the token; it may have expired.
            print("收到401错误JWT token可能已过期")
            return False
        if response.status_code != 200:
            print(f"批量提交请求失败,状态码: {response.status_code}")
            return False

        try:
            result = response.json()
        except ValueError as e:
            # ValueError covers json.JSONDecodeError and the decode error
            # raised by requests, whichever JSON backend it uses.
            print(f"解析批量提交响应失败: {e}")
            return False

        print(f"批量提交响应内容: {result}")
        if result.get('code') != 200:
            print(f"批量提交失败: {result.get('message', '未知错误')}")
            return False

        # "data" may be present but null; that must not turn a successful
        # publish into a reported failure.
        data = result.get('data') or {}
        published_count = data.get('published_count', 0)
        failed_count = data.get('failed_count', 0)
        print(f"批量提交成功,发布: {published_count}篇,失败: {failed_count}")
        return True
    except Exception as e:
        print(f"批量提交异常: {e}")
        return False
def _build_match_record(article, image_url, status, img=None):
    """Build one result row (CSV columns) for an article; ``img`` is None for generated images."""
    content = article['content'] or ''
    return {
        '文章ID': article['id'],
        '文章标题': article['title'],
        # Keep the CSV readable: at most 100 characters of content.
        '文章内容': content[:100] + '...' if len(content) > 100 else content,
        '标签': ', '.join(str(tag) for tag in article['tags']),
        '匹配的图片URL': image_url,
        '图片ID': img['id'] if img else 'N/A',
        '图片名称': img['image_name'] if img else 'Generated',
        '图片标签': img['tag_name'] if img else 'N/A',
        '图片关键词': img['keywords_name'] if img else 'N/A',
        '图片部门': img['department_name'] if img else 'N/A',
        '匹配状态': status
    }


def _generate_image_for_article(article, match_results):
    """Generate an image for the article with Gemini and record it; return False when generation fails."""
    prompt = f"'{article['title']}'相关的插图,标签: {', '.join(str(tag) for tag in article['tags'])}"
    try:
        generated_image_url = generate_image_with_gemini(prompt, article['tags'], article['id'])
    except ImportError:
        # A missing google-genai package affects every article: abort the run.
        raise
    except Exception as e:
        # One failed generation must not abort the remaining articles.
        print(f"Gemini生成图片失败: {article['title']} - {e}")
        return False
    print(f"生成的图片URL: {generated_image_url}")
    match_results.append(_build_match_record(article, generated_image_url, '生成图片'))
    return True


def process_single_article(article, used_image_counts, match_results):
    """
    Attach an image to one article, generating one when nothing matches.

    Candidate images sharing a tag with the article are tried in order of
    ascending stored usage count; the first one the Qwen model accepts is
    recorded.  When there is no candidate or all are rejected, an image is
    generated with Gemini instead.

    Args:
        article: dict with ``id``, ``title``, ``content`` and ``tags``.
        used_image_counts: mapping of image row id to in-run usage count;
            updated in place when an existing image is matched.
        match_results: list that receives one result row per processed
            article; appended to in place.

    Returns:
        True when an image was matched or generated, False when image
        generation failed.
    """
    print(f"\n处理文章: {article['title']} (ID: {article['id']})")
    matched_images = get_images_by_tags_from_db(article['tags'], used_image_counts)

    if matched_images:
        print(f"找到 {len(matched_images)} 张符合条件的匹配图片")
        # Prefer the images that have been attached least often so far.
        for img in sorted(matched_images, key=lambda candidate: candidate['base_count']):
            # Check the stored path itself; the prefixed URL is never empty.
            if not img['image_url']:
                continue
            image_url = "http://images11.bxmkb.cn/Images/" + img['image_url']
            if not call_qwen_model(article, [image_url]):
                continue

            print(f"文章与图片挂靠成功: {article['title']}")
            # NOTE(review): unlike the generated-image path, nothing here
            # writes the match to ai_article_images — confirm that is
            # handled elsewhere.
            used_image_counts[img['id']] = used_image_counts.get(img['id'], 0) + 1
            match_results.append(_build_match_record(article, image_url, '成功', img))
            return True
        print(f"文章未能与任何图片成功匹配使用Gemini生成图片: {article['title']}")
    else:
        print(f"没有找到符合条件的匹配图片使用Gemini生成图片: {article['title']}")

    return _generate_image_for_article(article, match_results)
def process_article_image_matching(test_mode=False, test_count=None):
    """
    Run image matching for all approved articles and publish the results.

    Every article is processed with ``process_single_article``; the result
    rows are written to ``article_image_match_results.csv``.  In normal mode
    the successfully processed articles are then submitted to the batch
    publish endpoint; in test mode nothing is published.

    Args:
        test_mode: when True, only the first ``test_count`` articles are
            processed and the publish step is skipped.
        test_count: number of articles to process in test mode (default 3).

    Raises:
        Exception: any error raised during processing is printed and
            re-raised unchanged.
    """
    # In-run usage counter per image row id, shared across all articles.
    used_image_counts = defaultdict(int)
    match_results = []
    try:
        articles = get_articles_with_tags_from_db()
        if not articles:
            print("没有找到文章")
            return

        if test_mode:
            if test_count is None:
                test_count = 3
            articles = articles[:test_count]
            print(f"测试模式:处理前 {len(articles)} 篇文章")

        success_count = 0
        processed_article_ids = []
        for article in articles:
            if process_single_article(article, used_image_counts, match_results):
                success_count += 1
                processed_article_ids.append(article['id'])
            else:
                print(f"处理文章 {article['id']} 失败")

        # Derived from the recorded rows so both modes report the real number
        # of generated images.
        generated_count = sum(1 for row in match_results if row['匹配状态'] == '生成图片')

        output_csv = 'article_image_match_results.csv'
        # utf-8-sig writes a BOM at the start of the file.
        with open(output_csv, 'w', newline='', encoding='utf-8-sig') as csvfile:
            fieldnames = [
                '文章ID', '文章标题', '文章内容', '标签',
                '匹配的图片URL', '图片ID', '图片名称',
                '图片标签', '图片关键词', '图片部门', '匹配状态'
            ]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(match_results)

        if test_mode:
            print(f"\n测试模式完成! 处理了 {len(articles)} 篇文章,成功挂靠: {success_count} 篇, 生成图片: {generated_count}")
            print(f"处理结果已保存至: {output_csv}")
            return

        print(f"\n处理完成! 成功挂靠: {success_count} 篇, 生成图片: {generated_count}")
        print(f"匹配结果已保存至: {output_csv}")

        if not processed_article_ids:
            print("\n没有处理过的文章,跳过发布步骤")
            return

        print(f"\n开始发布处理过的 {len(processed_article_ids)} 篇文章...")
        base_url = "http://47.99.184.230:8324"
        jwt_token = login_and_get_jwt_token(base_url)
        if not jwt_token:
            print("获取JWT token失败无法发布文章")
        elif batch_publish_articles(base_url, jwt_token, processed_article_ids):
            print(f"成功发布 {len(processed_article_ids)} 篇文章")
        else:
            print("批量发布失败")
    except Exception as e:
        print(f"处理文章图片匹配时发生错误: {e}")
        raise
# Command-line entry point.
#   no arguments     -> process every approved article and publish
#   --test [count]   -> process only the first <count> articles, no publish;
#                       the count is prompted for when omitted
if __name__ == "__main__":
    import sys

    print("开始处理文章与图片的智能挂靠...")
    cli_args = sys.argv[1:]

    if not cli_args:
        process_article_image_matching()
    elif cli_args[0] == "--test":
        count_given = len(cli_args) > 1
        if count_given:
            raw_count = cli_args[1]
        else:
            raw_count = input("请输入要测试的文章数量 (默认3): ").strip()

        try:
            test_count = int(raw_count)
        except ValueError:
            test_count = None
        # A zero or negative count is as unusable as a non-numeric one.
        if test_count is not None and test_count <= 0:
            test_count = None

        if test_count is None:
            if count_given:
                # An explicit but unusable argument is an error, not a
                # reason to fall back silently.
                print(f"无效的文章数量: {raw_count}")
                sys.exit(1)
            test_count = 3

        print(f"启动测试模式,处理前 {test_count} 篇文章")
        process_article_image_matching(test_mode=True, test_count=test_count)
    else:
        script_name = os.path.basename(sys.argv[0])
        print("使用方法:")
        print(f"  正常模式: python {script_name}")
        print(f"  测试模式: python {script_name} --test [文章数量]")