Files
ai_wht_B/ver_2512121646/image_routes.py
“shengyudong” 5a384b694e 2026-1-6
2026-01-06 14:18:39 +08:00

1414 lines
57 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
图片库管理接口
"""
from flask import Blueprint, request, jsonify
import logging
import os
import time
import random
from datetime import datetime
from werkzeug.utils import secure_filename
from PIL import Image
import io
from auth_utils import require_auth, AuthUtils
from database_config import get_db_manager, format_datetime_fields
from log_utils import log_create, log_update, log_delete, log_error, log_operation
from oss_image import SyncImageToOSS
# Logger registered under the name 'article_server'
logger = logging.getLogger('article_server')
# Local directory that uploaded images are written to
# (the commented-out line is an alternative Windows path)
#IMAGE_UPLOAD_DIR = "D:/baijiahao/tags_images/Images"
IMAGE_UPLOAD_DIR = "/home/work/baijiahao/tags_images/Images"
# Base URL prepended to relative image paths stored in the database
# (the commented-out line is an alternative image host)
#IMAGE_BASE_URL_CDN1 = "http://images11.bxmkb.cn/Images/"
IMAGE_BASE_URL_CDN1 = "https://bxmkb-beijing.oss-cn-beijing.aliyuncs.com/Images/"
# Accepted image file extensions (lowercase, without the leading dot)
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'bmp', 'tiff', 'webp'}
# Blueprint holding the image-library endpoints, mounted under /api/images
image_bp = Blueprint('image', __name__, url_prefix='/api/images')
@image_bp.route('/list', methods=['GET'])
@require_auth
def get_images_list():
    """Return one page of the current enterprise's active images.

    Query parameters:
        page: 1-based page number (default 1).
        pageSize: number of rows per page (default 20).
        keyword: optional substring matched against image_name / keywords.
        product_id: optional exact product filter.
        type: optional substring matched against image_type_name.

    Returns:
        A JSON response ``{code, message, data: {total, list}, timestamp}``.
        Relative image URLs are prefixed with ``IMAGE_BASE_URL_CDN1`` and each
        image carries a ``tags`` list of ``{tag_id, tag_name}`` entries.
        Responds with 400 when the enterprise ID is missing or the paging
        parameters are invalid, and with 500 on any unexpected error.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({
                'code': 400,
                'message': '无法获取企业ID',
                'data': None
            }), 400

        # Validate paging up front: a non-numeric value used to surface as a
        # 500, and page < 1 produced a negative OFFSET in the SQL below.
        try:
            page = int(request.args.get('page', 1))
            page_size = int(request.args.get('pageSize', 20))
        except (TypeError, ValueError):
            page = page_size = None
        if page is None or page < 1 or page_size < 0:
            return jsonify({
                'code': 400,
                'message': '分页参数错误',
                'data': None
            }), 400

        keyword = request.args.get('keyword', '').strip()
        product_id = request.args.get('product_id', '').strip()
        image_type = request.args.get('type', '').strip()

        # Build the WHERE clause; every value is bound as a parameter.
        where_conditions = ["enterprise_id = %s", "status = %s"]
        params = [enterprise_id, 'active']
        if keyword:
            where_conditions.append("(image_name LIKE %s OR keywords LIKE %s)")
            keyword_pattern = f"%{keyword}%"
            params.extend([keyword_pattern, keyword_pattern])
        if product_id:
            # Filter directly on the product_id column of ai_images
            where_conditions.append("product_id = %s")
            params.append(product_id)
        if image_type:
            where_conditions.append("image_type_name LIKE %s")
            params.append(f"%{image_type}%")
        where_clause = " AND ".join(where_conditions)
        offset = (page - 1) * page_size

        db_manager = get_db_manager()

        # Total number of matching rows
        count_sql = f"""
            SELECT COUNT(*) as total
            FROM ai_images
            WHERE {where_clause}
        """
        count_result = db_manager.execute_query(count_sql, params)
        total = count_result[0]['total'] if count_result else 0

        # Requested page of rows
        sql = f"""
            SELECT id, product_id, product_name, image_name, image_url, image_thumb_url, thumbnail_url,
            image_type_id, image_type_name, department, keywords,
            size_type, file_size, width, height, status,
            created_at, updated_at
            FROM ai_images
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
        """
        images = db_manager.execute_query(sql, params + [page_size, offset])

        # Prefix relative URLs with the CDN base; absolute URLs are left alone.
        for img in images:
            for url_key in ('image_url', 'image_thumb_url', 'thumbnail_url'):
                url_value = img.get(url_key)
                if url_value and not url_value.startswith('http'):
                    img[url_key] = IMAGE_BASE_URL_CDN1 + url_value

        # Load the tags of every image on the page with one query instead of
        # issuing one query per image.
        image_ids = [img.get('id') for img in images if img.get('id') is not None]
        tags_by_image = {}
        if image_ids:
            # Only "%s" placeholders are interpolated; the ids stay bound.
            placeholders = ", ".join(["%s"] * len(image_ids))
            tags_sql = f"""
                SELECT image_id, tag_id, tag_name
                FROM ai_image_tags
                WHERE enterprise_id = %s AND image_id IN ({placeholders})
                ORDER BY created_at
            """
            tag_rows = db_manager.execute_query(tags_sql, [enterprise_id, *image_ids])
            for row in tag_rows or []:
                tags_by_image.setdefault(row['image_id'], []).append({
                    'tag_id': row['tag_id'],
                    'tag_name': row['tag_name']
                })
        for img in images:
            img['tags'] = tags_by_image.get(img.get('id'), [])

        # Normalize datetime columns for JSON output
        images = format_datetime_fields(images)
        logger.info(f"获取图片列表成功,总数: {total}")
        return jsonify({
            'code': 200,
            'message': 'success',
            'data': {
                'total': total,
                'list': images
            },
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[获取图片列表] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@image_bp.route('/list_dashboard', methods=['GET'])
@require_auth
def get_images_dashboard():
    """Return the four counters shown on the image-library dashboard.

    The counters are: all active images, active images bound to a product,
    distinct products that have active images, and active images whose type
    name contains "场景". A counter falls back to 0 when its query returns
    no rows. Responds with 400 when the enterprise ID is missing and with
    500 on any unexpected error.
    """
    try:
        enterprise_id = AuthUtils.get_current_user().get('enterprise_id')
        if not enterprise_id:
            missing_enterprise = {
                'code': 400,
                'message': '无法获取企业ID',
                'data': None
            }
            return jsonify(missing_enterprise), 400

        db = get_db_manager()

        def fetch_total(statement, args):
            # Run a single-row COUNT query and read its "total" column.
            rows = db.execute_query(statement, args)
            if rows:
                return rows[0]['total']
            return 0

        # 1. All active images
        images_total = fetch_total(
            """
            SELECT COUNT(id) as total
            FROM ai_images
            WHERE enterprise_id = %s AND status = 'active'
            """,
            [enterprise_id],
        )
        # 2. Active images that belong to a product
        product_total = fetch_total(
            """
            SELECT COUNT(id) as total
            FROM ai_images
            WHERE enterprise_id = %s AND status = 'active' AND product_id > 0
            """,
            [enterprise_id],
        )
        # 3. Distinct products that own at least one active image
        product_images_total = fetch_total(
            """
            SELECT COUNT(DISTINCT product_id) as total
            FROM ai_images
            WHERE enterprise_id = %s AND status = 'active' AND product_id > 0
            """,
            [enterprise_id],
        )
        # 4. Active images whose type name contains "场景"
        scene_images_total = fetch_total(
            """
            SELECT COUNT(id) as total
            FROM ai_images
            WHERE enterprise_id = %s AND status = 'active'
            AND image_type_name LIKE %s
            """,
            [enterprise_id, '%场景%'],
        )

        logger.info(f"获取图片库仪表盘数据成功企业ID: {enterprise_id}")
        counters = {
            'images_total': images_total,
            'product_total': product_total,
            'product_images_total': product_images_total,
            'scene_images_total': scene_images_total
        }
        return jsonify({
            'code': 200,
            'message': 'success',
            'data': counters,
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as exc:
        logger.error(f"[获取图片库仪表盘数据] 处理请求时发生错误: {exc}", exc_info=True)
        failure = {
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }
        return jsonify(failure), 500
def _store_image_and_thumbnail(file, date_dir, new_filename, thumb_filename):
    """Write the optimized original and its 120x160 thumbnail into ``date_dir``.

    Args:
        file: the uploaded file object taken from ``request.files``.
        date_dir: existing directory the files are written to.
        new_filename: file name used for the (optimized) original.
        thumb_filename: file name used for the thumbnail.

    Returns:
        ``(file_path, thumb_path, width, height, file_size)``. ``width`` and
        ``height`` are the dimensions of the upload before any downscaling and
        ``file_size`` is the uploaded byte count. When image processing fails
        the raw upload is saved under ``file_path`` instead and ``thumb_path``
        is ``None`` because no thumbnail exists.
    """
    file_path = os.path.join(date_dir, new_filename)
    thumb_path = os.path.join(date_dir, thumb_filename)
    img_width = img_height = file_size = 0
    try:
        image_data = file.read()
        file_size = len(image_data)
        original_image = Image.open(io.BytesIO(image_data))
        # Remember the dimensions of the upload itself
        img_width, img_height = original_image.size

        # Flatten onto a white RGB canvas. Palette and LA images are converted
        # to RGBA first so their transparency is honoured as the paste mask.
        if original_image.mode in ('RGBA', 'LA', 'P'):
            rgba_image = original_image.convert('RGBA')
            background = Image.new('RGB', rgba_image.size, (255, 255, 255))
            background.paste(rgba_image, mask=rgba_image.split()[-1])
            original_image = background
        elif original_image.mode != 'RGB':
            original_image = original_image.convert('RGB')

        # 1. Save the optimized original, downscaled when larger than 1920x1080
        max_size = (1920, 1080)
        if original_image.size[0] > max_size[0] or original_image.size[1] > max_size[1]:
            original_image.thumbnail(max_size, Image.Resampling.LANCZOS)
            logger.info(f"[上传图片] 图片尺寸优化: 缩放到 {original_image.size}")
        original_image.save(file_path, 'PNG', optimize=True, compress_level=6)
        logger.info(f"[上传图片] 优化原图保存成功: {file_path}")

        # 2. Build the thumbnail: scale to cover 120x160, then center-crop
        thumb_image = original_image.copy()
        thumb_size = (120, 160)
        img_ratio = thumb_image.size[0] / thumb_image.size[1]
        thumb_ratio = thumb_size[0] / thumb_size[1]
        if img_ratio > thumb_ratio:
            # Wider than the target: match the height, crop the sides
            new_height = thumb_size[1]
            new_width = int(new_height * img_ratio)
            thumb_image = thumb_image.resize((new_width, new_height), Image.Resampling.LANCZOS)
            left = (new_width - thumb_size[0]) // 2
            thumb_image = thumb_image.crop((left, 0, left + thumb_size[0], thumb_size[1]))
        else:
            # Taller than the target: match the width, crop top and bottom
            new_width = thumb_size[0]
            new_height = int(new_width / img_ratio)
            thumb_image = thumb_image.resize((new_width, new_height), Image.Resampling.LANCZOS)
            top = (new_height - thumb_size[1]) // 2
            thumb_image = thumb_image.crop((0, top, thumb_size[0], top + thumb_size[1]))
        thumb_image.save(thumb_path, 'PNG', optimize=True, compress_level=9)
        logger.info(f"[上传图片] 缩略图生成成功: {thumb_path} (尺寸: {thumb_image.size})")
        return file_path, thumb_path, img_width, img_height, file_size
    except Exception as img_error:
        # Best-effort fallback: keep the raw upload so the request still succeeds
        logger.error(f"[上传图片] 图片处理失败: {str(img_error)}", exc_info=True)
        file.seek(0)
        file.save(file_path)
        logger.info(f"[上传图片] 回退保存成功: {file_path}")
        return file_path, None, img_width, img_height, file_size


@image_bp.route('/upload', methods=['POST'])
@require_auth
def upload_image():
    """Upload one image file (multipart/form-data, field ``image``).

    Form fields: ``image_type_name`` (required), ``image_name``,
    ``image_type_id``, ``tag_keywords`` (comma separated), ``product_id``,
    ``product_name`` and ``description``.

    Steps: store the optimized image and a thumbnail on disk, push both through
    ``SyncImageToOSS.TransformerImage``, insert the ``ai_images`` row, make
    sure every tag exists in ``ai_image_tags_name``, link the tags in
    ``ai_image_tags`` and, when a product id is given, insert an
    ``ai_product_images`` row.

    Returns:
        JSON with the new image id, URLs, tag/relation counts and dimensions.
        Responds with 400 for a missing enterprise ID, a missing or unsupported
        file, or a missing ``image_type_name``; 500 on unexpected errors.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        user_id = current_user.get('user_id', 0)
        if not enterprise_id:
            return jsonify({
                'code': 400,
                'message': '无法获取企业ID',
                'data': None
            }), 400

        # A real multipart file is mandatory
        if 'image' not in request.files:
            return jsonify({
                'code': 400,
                'message': '缺少上传文件,请使用 multipart/form-data 上传真实图片文件',
                'data': None
            }), 400

        file = request.files['image']
        # "not" also covers a filename of None, which would break the check below
        if not file.filename:
            return jsonify({
                'code': 400,
                'message': '没有选择文件',
                'data': None
            }), 400

        # Validate the extension
        file_ext = file.filename.rsplit('.', 1)[1].lower() if '.' in file.filename else ''
        if file_ext not in ALLOWED_EXTENSIONS:
            return jsonify({
                'code': 400,
                # sorted() keeps the message stable; set order is arbitrary
                'message': f'不支持的文件格式,仅支持: {", ".join(sorted(ALLOWED_EXTENSIONS))}',
                'data': None
            }), 400

        # Form fields
        image_name = request.form.get('image_name', '')
        image_type_name = request.form.get('image_type_name', '')
        image_type_id = request.form.get('image_type_id', '').strip()
        tag_keywords = request.form.get('tag_keywords', '')
        product_id = request.form.get('product_id', '').strip()
        product_name = request.form.get('product_name', '').strip()
        description = request.form.get('description', '')  # optional description
        if not image_type_name:
            return jsonify({
                'code': 400,
                'message': '缺少必需字段: image_type_name',
                'data': None
            }), 400

        # isdecimal() matches exactly the strings int() accepts; isdigit() also
        # accepts characters such as superscripts that make int() raise.
        has_product = product_id.isdecimal()
        product_id_value = int(product_id) if has_product else 0
        image_type_id_value = int(image_type_id) if image_type_id.isdecimal() else 0

        # Generate the stored file names: millisecond timestamp + 3 random digits
        current_time = datetime.now()
        date_str = current_time.strftime('%Y%m%d')
        timestamp = int(time.time() * 1000)
        random_num = random.randint(100, 999)
        base_filename = f"{timestamp}{random_num}"
        new_filename = f"{base_filename}.png"
        thumb_filename = f"{base_filename}_thumb.png"

        # Files are grouped into one directory per day
        date_dir = os.path.join(IMAGE_UPLOAD_DIR, date_str)
        os.makedirs(date_dir, exist_ok=True)

        file_path, thumb_path, img_width, img_height, file_size = _store_image_and_thumbnail(
            file, date_dir, new_filename, thumb_filename
        )

        # Relative paths stored in the database. Without a thumbnail the full
        # image is referenced so the stored path never points at a missing file.
        relative_path = f"{date_str}/{new_filename}"
        thumb_relative_path = f"{date_str}/{thumb_filename}" if thumb_path else relative_path

        # Hand the stored files to TransformerImage; failures are logged only
        try:
            logger.info(f"[上传图片] 开始调用 TransformerImage 处理图片")
            sync_oss = SyncImageToOSS()
            original_result = sync_oss.TransformerImage(file_path)
            logger.info(f"[上传图片] 原图上传结果: {original_result}")
            thumb_result = None
            if thumb_path:
                thumb_result = sync_oss.TransformerImage(thumb_path)
                logger.info(f"[上传图片] 缩图上传结果: {thumb_result}")
            thumb_success = thumb_result['success'] if thumb_result is not None else None
            if not original_result['success'] or (thumb_result is not None and not thumb_success):
                logger.warning(f"[上传图片] OSS上传部分失败 - 原图: {original_result['success']}, 缩图: {thumb_success}")
        except Exception as e:
            logger.error(f"[上传图片] TransformerImage 调用失败: {str(e)}", exc_info=True)

        db_manager = get_db_manager()
        stored_name = image_name or new_filename

        # (1) Insert the ai_images row
        image_sql = """
            INSERT INTO ai_images
            (enterprise_id, product_id, product_name, image_name, image_url, image_thumb_url, thumbnail_url,
            image_type_id, image_type_name, department, keywords, size_type,
            file_size, width, height, upload_user_id, status)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        image_id = db_manager.execute_insert(image_sql, (
            enterprise_id,
            product_id_value,
            product_name,
            stored_name,
            relative_path,
            thumb_relative_path,
            thumb_relative_path,
            image_type_id_value,
            image_type_name,
            '',
            tag_keywords,
            'medical',
            file_size,
            img_width,
            img_height,
            user_id,
            'active'
        ))
        logger.info(f"[上传图片] 步骤(1) ai_images 入库成功: image_id={image_id}")

        # (2) Make sure every tag exists in ai_image_tags_name
        tag_ids_map = {}
        if tag_keywords:
            tags = [tag.strip() for tag in tag_keywords.split(',') if tag.strip()]
            for tag_name in tags:
                check_tag_sql = "SELECT id FROM ai_image_tags_name WHERE enterprise_id = %s AND tag_name = %s"
                existing_tag = db_manager.execute_query(check_tag_sql, (enterprise_id, tag_name))
                if existing_tag:
                    tag_id = existing_tag[0]['id']
                    logger.info(f"[上传图片] 步骤(2) 标签已存在: tag_name={tag_name}, tag_id={tag_id}")
                else:
                    insert_tag_sql = """
                        INSERT INTO ai_image_tags_name
                        (enterprise_id, tag_name, status)
                        VALUES (%s, %s, %s)
                    """
                    tag_id = db_manager.execute_insert(insert_tag_sql, (enterprise_id, tag_name, 'active'))
                    logger.info(f"[上传图片] 步骤(2) 新标签入库成功: tag_name={tag_name}, tag_id={tag_id}")
                tag_ids_map[tag_name] = tag_id

        # (3) Link the image to its tags; one failing link does not stop the rest
        relation_count = 0
        relation_sql = """
            INSERT INTO ai_image_tags
            (enterprise_id, product_id, image_id, image_name, image_url, image_thumb_url,
            tag_id, tag_name, created_user_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        for tag_name, tag_id in tag_ids_map.items():
            try:
                db_manager.execute_insert(relation_sql, (
                    enterprise_id,
                    product_id_value,
                    image_id,
                    stored_name,
                    relative_path,
                    thumb_relative_path,
                    tag_id,
                    tag_name,
                    user_id
                ))
                relation_count += 1
                logger.info(f"[上传图片] 步骤(3) 关系映射入库成功: image_id={image_id}, tag_id={tag_id}, tag_name={tag_name}, product_id={product_id}")
            except Exception as rel_error:
                logger.warning(f"[上传图片] 步骤(3) 关系映射已存在: image_id={image_id}, tag_id={tag_id}, error={str(rel_error)}")

        # (4) Insert the ai_product_images row, only when a product id was sent
        product_image_id = None
        if has_product:
            try:
                product_image_sql = """
                    INSERT INTO ai_product_images
                    (enterprise_id, product_id, product_name, image_id, image_name, image_url, thumbnail_url,
                    type_name, description, file_size, width, height, upload_user_id, status)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                product_image_id = db_manager.execute_insert(product_image_sql, (
                    enterprise_id,
                    product_id_value,
                    product_name,
                    image_id,
                    stored_name,
                    relative_path,
                    thumb_relative_path,
                    image_type_name,
                    description,
                    file_size,
                    img_width,
                    img_height,
                    user_id,
                    'active'
                ))
                logger.info(f"[上传图片] 步骤(4) ai_product_images 入库成功: product_image_id={product_image_id}, product_id={product_id}, image_id={image_id}")
            except Exception as prod_error:
                logger.error(f"[上传图片] 步骤(4) ai_product_images 入库失败: {str(prod_error)}", exc_info=True)

        logger.info(f"[上传图片] 全部完成: image_id={image_id}, 标签数={len(tag_ids_map)}, 关系数={relation_count}, product_image_id={product_image_id}")

        # Response payload
        response_data = {
            'id': image_id,
            'url': IMAGE_BASE_URL_CDN1 + relative_path,
            'relative_path': relative_path,
            'image_thumb_url': IMAGE_BASE_URL_CDN1 + thumb_relative_path,
            'thumb_relative_path': thumb_relative_path,
            'tags_count': len(tag_ids_map),
            'relations_count': relation_count,
            'width': img_width,
            'height': img_height,
            'file_size': file_size
        }
        # Expose the product link only when it was actually inserted
        if product_image_id:
            response_data['product_image_id'] = product_image_id
            response_data['product_id'] = product_id_value
        return jsonify({
            'code': 200,
            'message': '上传成功',
            'data': response_data,
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[上传图片] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@image_bp.route('/batch-upload', methods=['POST'])
@require_auth
def batch_upload_images():
    """Register several already-hosted images in ``ai_images`` in one request.

    Expects a JSON object ``{"images": [...]}``. Each entry needs ``image_url``
    and ``image_type_name``; entries that lack either (or are not objects) are
    skipped. Every inserted row is stamped with the caller's enterprise ID so
    that it shows up in the enterprise-scoped list endpoint.

    Returns:
        JSON with ``uploaded_count`` and the inserted ``ids``. Responds with
        400 when the enterprise ID is missing or the body is not a JSON object
        with a non-empty ``images`` list, and with 500 on unexpected errors.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({
                'code': 400,
                'message': '无法获取企业ID',
                'data': None
            }), 400

        # silent=True turns a malformed / non-JSON body into None (-> 400)
        # instead of an exception that would be reported as a 500.
        data = request.get_json(silent=True)
        images = data.get('images') if isinstance(data, dict) else None
        if not images or not isinstance(images, list):
            return jsonify({
                'code': 400,
                'message': '请求参数错误',
                'data': None
            }), 400

        db_manager = get_db_manager()
        user_id = current_user.get('user_id', 0)
        # enterprise_id is part of the insert; without it the rows would not
        # belong to any enterprise and could never be listed.
        sql = """
            INSERT INTO ai_images
            (enterprise_id, image_name, image_url, image_thumb_url, thumbnail_url, image_type_id, image_type_name,
            department, keywords, size_type, upload_user_id, status)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        uploaded_ids = []
        for img in images:
            # Skip entries that are malformed or miss a required field
            if not isinstance(img, dict):
                continue
            if not img.get('image_url') or not img.get('image_type_name'):
                continue
            image_id = db_manager.execute_insert(sql, (
                enterprise_id,
                img.get('image_name', ''),
                img['image_url'],
                img.get('image_thumb_url', ''),
                img.get('thumbnail_url', ''),
                img.get('image_type_id', 0),
                img['image_type_name'],
                img.get('department', ''),
                img.get('keywords', ''),
                img.get('size_type', 'medical'),
                user_id,
                'active'
            ))
            uploaded_ids.append(image_id)

        logger.info(f"批量上传图片成功: {len(uploaded_ids)}")
        return jsonify({
            'code': 200,
            'message': '批量上传成功',
            'data': {
                'uploaded_count': len(uploaded_ids),
                'ids': uploaded_ids
            },
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[批量上传图片] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@image_bp.route('/<int:image_id>', methods=['DELETE'])
@require_auth
def delete_image(image_id):
    """Soft-delete an image that belongs to the caller's enterprise.

    Args:
        image_id: primary key of the ``ai_images`` row, taken from the URL.

    Returns:
        JSON with code 200 on success. Responds with 400 when the enterprise
        ID is missing, 404 when the image does not exist within the caller's
        enterprise, and 500 on unexpected errors.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({
                'code': 400,
                'message': '无法获取企业ID',
                'data': None
            }), 400

        db_manager = get_db_manager()
        # Both statements are scoped to the enterprise, like the other delete
        # endpoints in this module; otherwise any authenticated caller could
        # delete another enterprise's image by guessing its id.
        check_sql = "SELECT id FROM ai_images WHERE id = %s AND enterprise_id = %s"
        existing = db_manager.execute_query(check_sql, (image_id, enterprise_id))
        if not existing:
            return jsonify({
                'code': 404,
                'message': '图片不存在',
                'data': None
            }), 404

        # Soft delete: the row stays, only its status changes
        sql = "UPDATE ai_images SET status = 'deleted' WHERE id = %s AND enterprise_id = %s"
        db_manager.execute_update(sql, (image_id, enterprise_id))
        logger.info(f"删除图片成功: ID {image_id}")
        return jsonify({
            'code': 200,
            'message': '删除成功',
            'data': None,
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[删除图片] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
# ==================== Image tag name management (ai_image_tags_name) ====================
@image_bp.route('/tags/names/list', methods=['GET'])
@require_auth
def get_tag_names_list():
    """Return one page of the current enterprise's tag names.

    Query parameters:
        page: 1-based page number (default 1).
        pageSize: number of rows per page (default 20).
        keyword: optional substring matched against tag_name / description.
        status: optional exact status filter.

    Returns:
        JSON ``{code, message, data: {total, list}}``. Responds with 400 when
        the enterprise ID is missing or the paging parameters are invalid,
        and with 500 on unexpected errors.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400

        # Validate paging up front: a non-numeric value used to surface as a
        # 500, and page < 1 produced a negative OFFSET in the SQL below.
        try:
            page = int(request.args.get('page', 1))
            page_size = int(request.args.get('pageSize', 20))
        except (TypeError, ValueError):
            page = page_size = None
        if page is None or page < 1 or page_size < 0:
            return jsonify({'code': 400, 'message': '分页参数错误', 'data': None}), 400

        keyword = request.args.get('keyword', '').strip()
        status = request.args.get('status', '').strip()

        where_conditions = ["enterprise_id = %s"]
        params = [enterprise_id]
        if keyword:
            where_conditions.append("(tag_name LIKE %s OR description LIKE %s)")
            keyword_pattern = f"%{keyword}%"
            params.extend([keyword_pattern, keyword_pattern])
        if status:
            where_conditions.append("status = %s")
            params.append(status)
        where_clause = " AND ".join(where_conditions)
        offset = (page - 1) * page_size

        db_manager = get_db_manager()
        # Total number of matching rows
        count_sql = f"SELECT COUNT(*) as total FROM ai_image_tags_name WHERE {where_clause}"
        count_result = db_manager.execute_query(count_sql, params)
        total = count_result[0]['total'] if count_result else 0

        # Requested page of rows
        sql = f"""
            SELECT id, enterprise_id, tag_name, tag_category, department, description, status, created_at, updated_at
            FROM ai_image_tags_name
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
        """
        tags = db_manager.execute_query(sql, params + [page_size, offset])
        # Normalize datetime columns for JSON output
        tags = format_datetime_fields(tags)
        return jsonify({'code': 200, 'message': 'success', 'data': {'total': total, 'list': tags}})
    except Exception as e:
        logger.error(f"[获取标签名称列表] 错误: {str(e)}", exc_info=True)
        return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@image_bp.route('/tags/names/create', methods=['POST'])
@require_auth
def create_tag_name():
    """Create a tag name for the current enterprise.

    Expects a JSON object with ``tag_name`` (required) and optional
    ``tag_category``, ``department``, ``description`` and ``status``
    (defaults to ``'active'``).

    Returns:
        JSON with the new row id. Responds with 400 when the enterprise ID is
        missing or the body has no usable ``tag_name``, 409 when the
        enterprise already has that tag name, and 500 on unexpected errors.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400

        # silent=True yields None for a malformed / non-JSON body; together
        # with the dict check a bad body is answered with 400 instead of 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data.get('tag_name'):
            return jsonify({'code': 400, 'message': '标签名称不能为空', 'data': None}), 400

        db_manager = get_db_manager()
        # Tag names are unique per enterprise
        check_sql = "SELECT id FROM ai_image_tags_name WHERE enterprise_id = %s AND tag_name = %s"
        existing = db_manager.execute_query(check_sql, (enterprise_id, data['tag_name']))
        if existing:
            return jsonify({'code': 409, 'message': '标签名称已存在', 'data': None}), 409

        sql = """
            INSERT INTO ai_image_tags_name
            (enterprise_id, tag_name, tag_category, department, description, status)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
        tag_id = db_manager.execute_insert(sql, (
            enterprise_id,
            data['tag_name'],
            data.get('tag_category', ''),
            data.get('department', ''),
            data.get('description', ''),
            data.get('status', 'active')
        ))
        logger.info(f"创建标签名称成功: ID {tag_id}")
        return jsonify({'code': 200, 'message': '创建成功', 'data': {'id': tag_id}})
    except Exception as e:
        logger.error(f"[创建标签名称] 错误: {str(e)}", exc_info=True)
        return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@image_bp.route('/tags/names/<int:tag_id>', methods=['PUT'])
@require_auth
def update_tag_name(tag_id):
    """Update selected columns of a tag name owned by the caller's enterprise.

    Args:
        tag_id: primary key of the ``ai_image_tags_name`` row, from the URL.

    The JSON object may contain any of ``tag_name``, ``tag_category``,
    ``department``, ``description`` and ``status``; other keys are ignored.

    Returns:
        JSON with code 200 on success. Responds with 400 when the enterprise
        ID is missing, the body is not a non-empty JSON object or it contains
        no updatable field; 404 when the tag does not exist within the
        enterprise; 500 on unexpected errors.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400

        # silent=True yields None for a malformed / non-JSON body; together
        # with the dict check a bad body is answered with 400 instead of 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data:
            return jsonify({'code': 400, 'message': '请求参数错误', 'data': None}), 400

        db_manager = get_db_manager()
        check_sql = "SELECT id FROM ai_image_tags_name WHERE id = %s AND enterprise_id = %s"
        existing = db_manager.execute_query(check_sql, (tag_id, enterprise_id))
        if not existing:
            return jsonify({'code': 404, 'message': '标签不存在', 'data': None}), 404

        # Column names come from this fixed whitelist, never from the request,
        # so interpolating them into the SET clause is safe.
        updatable_columns = ('tag_name', 'tag_category', 'department', 'description', 'status')
        columns = [column for column in updatable_columns if column in data]
        if not columns:
            return jsonify({'code': 400, 'message': '没有可更新的字段', 'data': None}), 400

        set_clause = ', '.join(f"{column} = %s" for column in columns)
        params = [data[column] for column in columns]
        params.extend([tag_id, enterprise_id])
        sql = f"UPDATE ai_image_tags_name SET {set_clause} WHERE id = %s AND enterprise_id = %s"
        db_manager.execute_update(sql, tuple(params))
        logger.info(f"更新标签名称成功: ID {tag_id}")
        return jsonify({'code': 200, 'message': '更新成功', 'data': None})
    except Exception as e:
        logger.error(f"[更新标签名称] 错误: {str(e)}", exc_info=True)
        return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@image_bp.route('/tags/names/<int:tag_id>', methods=['DELETE'])
@require_auth
def delete_tag_name(tag_id):
    """Physically delete a tag name that belongs to the caller's enterprise.

    Responds with 400 when the enterprise ID is missing, 404 when no such tag
    exists within the enterprise, 200 after the row is removed and 500 on any
    unexpected error.
    """
    try:
        enterprise_id = AuthUtils.get_current_user().get('enterprise_id')
        if not enterprise_id:
            body = {'code': 400, 'message': '无法获取企业ID', 'data': None}
            return jsonify(body), 400

        db = get_db_manager()
        row_key = (tag_id, enterprise_id)

        # The row must exist inside this enterprise before it is removed
        found = db.execute_query(
            "SELECT id FROM ai_image_tags_name WHERE id = %s AND enterprise_id = %s",
            row_key,
        )
        if not found:
            body = {'code': 404, 'message': '标签不存在', 'data': None}
            return jsonify(body), 404

        # Hard delete (the row is removed, not flagged)
        db.execute_update(
            "DELETE FROM ai_image_tags_name WHERE id = %s AND enterprise_id = %s",
            row_key,
        )
        logger.info(f"删除标签名称成功: ID {tag_id}")
        body = {'code': 200, 'message': '删除成功', 'data': None}
        return jsonify(body)
    except Exception as exc:
        logger.error(f"[删除标签名称] 错误: {exc}", exc_info=True)
        body = {'code': 500, 'message': '服务器内部错误', 'data': None}
        return jsonify(body), 500
# ==================== Image-tag relation management (ai_image_tags) ====================
@image_bp.route('/tags/relations/list', methods=['GET'])
@require_auth
def get_tag_relations_list():
    """Return one page of image-tag relations of the current enterprise.

    Query parameters:
        page: 1-based page number (default 1).
        pageSize: number of rows per page (default 20).
        image_id: optional exact image filter.
        tag_id: optional exact tag filter.

    Returns:
        JSON ``{code, message, data: {total, list}}``. Responds with 400 when
        the enterprise ID is missing or the paging parameters are invalid,
        and with 500 on unexpected errors.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400

        # Validate paging up front: a non-numeric value used to surface as a
        # 500, and page < 1 produced a negative OFFSET in the SQL below.
        try:
            page = int(request.args.get('page', 1))
            page_size = int(request.args.get('pageSize', 20))
        except (TypeError, ValueError):
            page = page_size = None
        if page is None or page < 1 or page_size < 0:
            return jsonify({'code': 400, 'message': '分页参数错误', 'data': None}), 400

        image_id = request.args.get('image_id', '').strip()
        tag_id = request.args.get('tag_id', '').strip()

        where_conditions = ["enterprise_id = %s"]
        params = [enterprise_id]
        if image_id:
            where_conditions.append("image_id = %s")
            params.append(image_id)
        if tag_id:
            where_conditions.append("tag_id = %s")
            params.append(tag_id)
        where_clause = " AND ".join(where_conditions)
        offset = (page - 1) * page_size

        db_manager = get_db_manager()
        # Total number of matching rows
        count_sql = f"SELECT COUNT(*) as total FROM ai_image_tags WHERE {where_clause}"
        count_result = db_manager.execute_query(count_sql, params)
        total = count_result[0]['total'] if count_result else 0

        # Requested page of rows
        sql = f"""
            SELECT id, enterprise_id, image_id, tag_id, tag_name, created_at, updated_at
            FROM ai_image_tags
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
        """
        relations = db_manager.execute_query(sql, params + [page_size, offset])
        # Normalize datetime columns for JSON output
        relations = format_datetime_fields(relations)
        return jsonify({'code': 200, 'message': 'success', 'data': {'total': total, 'list': relations}})
    except Exception as e:
        logger.error(f"[获取图片标签关系列表] 错误: {str(e)}", exc_info=True)
        return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@image_bp.route('/tags/relations/create', methods=['POST'])
@require_auth
def create_tag_relation():
    """Link an image to a tag for the current enterprise.

    Expects a JSON object with ``image_id`` and ``tag_id`` (both required)
    and an optional ``tag_name``. The caller's user id is stored as
    ``created_user_id``, as the upload endpoint does for the same table.

    Returns:
        JSON with the new relation id. Responds with 400 when the enterprise
        ID is missing or a required field is absent, 409 when the relation
        already exists, and 500 on unexpected errors.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        user_id = current_user.get('user_id', 0)
        if not enterprise_id:
            return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400

        # silent=True yields None for a malformed / non-JSON body; together
        # with the dict check a bad body is answered with 400 instead of 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data.get('image_id') or not data.get('tag_id'):
            return jsonify({'code': 400, 'message': '图片ID和标签ID不能为空', 'data': None}), 400

        db_manager = get_db_manager()
        # A given image/tag pair may be linked only once per enterprise
        check_sql = "SELECT id FROM ai_image_tags WHERE enterprise_id = %s AND image_id = %s AND tag_id = %s"
        existing = db_manager.execute_query(check_sql, (enterprise_id, data['image_id'], data['tag_id']))
        if existing:
            return jsonify({'code': 409, 'message': '该标签关系已存在', 'data': None}), 409

        sql = """
            INSERT INTO ai_image_tags
            (enterprise_id, image_id, tag_id, tag_name, created_user_id)
            VALUES (%s, %s, %s, %s, %s)
        """
        relation_id = db_manager.execute_insert(sql, (
            enterprise_id,
            data['image_id'],
            data['tag_id'],
            data.get('tag_name', ''),
            user_id
        ))
        logger.info(f"创建图片标签关系成功: ID {relation_id}")
        return jsonify({'code': 200, 'message': '创建成功', 'data': {'id': relation_id}})
    except Exception as e:
        logger.error(f"[创建图片标签关系] 错误: {str(e)}", exc_info=True)
        return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@image_bp.route('/tags/relations/<int:relation_id>', methods=['DELETE'])
@require_auth
def delete_tag_relation(relation_id):
    """Remove one image-tag relation that belongs to the caller's enterprise.

    Responds with 400 when the enterprise ID is missing, 404 when the relation
    does not exist within the enterprise, 200 after the row is deleted and 500
    on any unexpected error.
    """
    try:
        enterprise_id = AuthUtils.get_current_user().get('enterprise_id')
        if not enterprise_id:
            body = {'code': 400, 'message': '无法获取企业ID', 'data': None}
            return jsonify(body), 400

        db = get_db_manager()
        row_key = (relation_id, enterprise_id)

        # Confirm the relation exists inside this enterprise
        found = db.execute_query(
            "SELECT id FROM ai_image_tags WHERE id = %s AND enterprise_id = %s",
            row_key,
        )
        if not found:
            body = {'code': 404, 'message': '标签关系不存在', 'data': None}
            return jsonify(body), 404

        # Delete the relation row
        db.execute_update(
            "DELETE FROM ai_image_tags WHERE id = %s AND enterprise_id = %s",
            row_key,
        )
        logger.info(f"删除图片标签关系成功: ID {relation_id}")
        body = {'code': 200, 'message': '删除成功', 'data': None}
        return jsonify(body)
    except Exception as exc:
        logger.error(f"[删除图片标签关系] 错误: {exc}", exc_info=True)
        body = {'code': 500, 'message': '服务器内部错误', 'data': None}
        return jsonify(body), 500
# ==================== Image type management (ai_image_type) ====================
@image_bp.route('/types/list', methods=['GET'])
@require_auth
def get_image_types_list():
    """Return one page of the current enterprise's image types.

    Query parameters:
        page: 1-based page number (default 1).
        pageSize: number of rows per page (default 20).
        keyword: optional substring matched against type_name / keywords_name.

    Returns:
        JSON ``{code, message, data: {total, list}}``. Responds with 400 when
        the enterprise ID is missing or the paging parameters are invalid,
        and with 500 on unexpected errors.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        # Log only the enterprise id; the full user dict may carry data that
        # does not belong in an info-level log.
        logger.info(f"[获取图片类型列表] enterprise_id: {enterprise_id}")
        if not enterprise_id:
            return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400

        # Validate paging up front: a non-numeric value used to surface as a
        # 500, and page < 1 produced a negative OFFSET in the SQL below.
        try:
            page = int(request.args.get('page', 1))
            page_size = int(request.args.get('pageSize', 20))
        except (TypeError, ValueError):
            page = page_size = None
        if page is None or page < 1 or page_size < 0:
            return jsonify({'code': 400, 'message': '分页参数错误', 'data': None}), 400

        keyword = request.args.get('keyword', '').strip()

        where_conditions = ["enterprise_id = %s"]
        params = [enterprise_id]
        if keyword:
            where_conditions.append("(type_name LIKE %s OR keywords_name LIKE %s)")
            keyword_pattern = f"%{keyword}%"
            params.extend([keyword_pattern, keyword_pattern])
        where_clause = " AND ".join(where_conditions)
        offset = (page - 1) * page_size

        db_manager = get_db_manager()
        # Total number of matching rows
        count_sql = f"SELECT COUNT(*) as total FROM ai_image_type WHERE {where_clause}"
        count_result = db_manager.execute_query(count_sql, params)
        total = count_result[0]['total'] if count_result else 0

        # Requested page of rows
        sql = f"""
            SELECT id, enterprise_id, type_name, keywords_id, keywords_name,
            department_id, department_name, created_user_id, created_at, updated_at
            FROM ai_image_type
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
        """
        types = db_manager.execute_query(sql, params + [page_size, offset])
        # Normalize datetime columns for JSON output
        types = format_datetime_fields(types)
        return jsonify({'code': 200, 'message': 'success', 'data': {'total': total, 'list': types}})
    except Exception as e:
        logger.error(f"[获取图片类型列表] 错误: {str(e)}", exc_info=True)
        return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@image_bp.route('/types/create', methods=['POST'])
@require_auth
def create_image_type():
    """Create an image type for the current enterprise.

    Expects a JSON object with ``type_name`` (required) and optional
    ``keywords_id``, ``keywords_name``, ``department_id`` and
    ``department_name``. The caller's user id is stored as
    ``created_user_id``.

    Returns:
        JSON with the new row id. Responds with 400 when the enterprise ID is
        missing or the body has no usable ``type_name``, and with 500 on
        unexpected errors.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        user_id = current_user.get('user_id', 0)
        if not enterprise_id:
            return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400

        # silent=True yields None for a malformed / non-JSON body; together
        # with the dict check a bad body is answered with 400 instead of 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data.get('type_name'):
            return jsonify({'code': 400, 'message': '类型名称不能为空', 'data': None}), 400

        db_manager = get_db_manager()
        sql = """
            INSERT INTO ai_image_type
            (enterprise_id, type_name, keywords_id, keywords_name, department_id, department_name, created_user_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
        """
        type_id = db_manager.execute_insert(sql, (
            enterprise_id,
            data['type_name'],
            data.get('keywords_id', 0),
            data.get('keywords_name', ''),
            data.get('department_id', 0),
            data.get('department_name', ''),
            user_id
        ))
        logger.info(f"创建图片类型成功: ID {type_id}")
        return jsonify({'code': 200, 'message': '创建成功', 'data': {'id': type_id}})
    except Exception as e:
        logger.error(f"[创建图片类型] 错误: {str(e)}", exc_info=True)
        return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@image_bp.route('/types/<int:type_id>', methods=['PUT'])
@require_auth
def update_image_type(type_id):
    """Update selected columns of an image type owned by the caller's enterprise.

    Args:
        type_id: primary key of the ``ai_image_type`` row, from the URL.

    The JSON object may contain any of ``type_name``, ``keywords_id``,
    ``keywords_name``, ``department_id`` and ``department_name``; other keys
    are ignored.

    Returns:
        JSON with code 200 on success. Responds with 400 when the enterprise
        ID is missing, the body is not a non-empty JSON object or it contains
        no updatable field; 404 when the type does not exist within the
        enterprise; 500 on unexpected errors.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400

        # silent=True yields None for a malformed / non-JSON body; together
        # with the dict check a bad body is answered with 400 instead of 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data:
            return jsonify({'code': 400, 'message': '请求参数错误', 'data': None}), 400

        db_manager = get_db_manager()
        check_sql = "SELECT id FROM ai_image_type WHERE id = %s AND enterprise_id = %s"
        existing = db_manager.execute_query(check_sql, (type_id, enterprise_id))
        if not existing:
            return jsonify({'code': 404, 'message': '图片类型不存在', 'data': None}), 404

        # Column names come from this fixed whitelist, never from the request,
        # so interpolating them into the SET clause is safe.
        updatable_columns = ('type_name', 'keywords_id', 'keywords_name', 'department_id', 'department_name')
        columns = [column for column in updatable_columns if column in data]
        if not columns:
            return jsonify({'code': 400, 'message': '没有可更新的字段', 'data': None}), 400

        set_clause = ', '.join(f"{column} = %s" for column in columns)
        params = [data[column] for column in columns]
        params.extend([type_id, enterprise_id])
        sql = f"UPDATE ai_image_type SET {set_clause} WHERE id = %s AND enterprise_id = %s"
        db_manager.execute_update(sql, tuple(params))
        logger.info(f"更新图片类型成功: ID {type_id}")
        return jsonify({'code': 200, 'message': '更新成功', 'data': None})
    except Exception as e:
        logger.error(f"[更新图片类型] 错误: {str(e)}", exc_info=True)
        return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@image_bp.route('/types/<int:type_id>', methods=['DELETE'])
@require_auth
def delete_image_type(type_id):
    """Delete an image type that belongs to the caller's enterprise.

    Args:
        type_id: Primary key of the ``ai_image_type`` row, taken from the URL.

    Returns:
        A JSON payload of the form ``{'code', 'message', 'data'}``:
        200 on success, 400 when the enterprise id cannot be resolved,
        404 when no matching row exists, 500 on any unexpected error.
    """
    try:
        enterprise_id = AuthUtils.get_current_user().get('enterprise_id')
        if not enterprise_id:
            return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400

        db = get_db_manager()
        # Both statements are scoped by (row id, enterprise id).
        row_key = (type_id, enterprise_id)

        # Existence check first so a missing row is reported as 404.
        matches = db.execute_query(
            "SELECT id FROM ai_image_type WHERE id = %s AND enterprise_id = %s",
            row_key,
        )
        if not matches:
            return jsonify({'code': 404, 'message': '图片类型不存在', 'data': None}), 404

        db.execute_update(
            "DELETE FROM ai_image_type WHERE id = %s AND enterprise_id = %s",
            row_key,
        )

        logger.info(f"删除图片类型成功: ID {type_id}")
        return jsonify({'code': 200, 'message': '删除成功', 'data': None})
    except Exception as exc:
        logger.error(f"[删除图片类型] 错误: {str(exc)}", exc_info=True)
        return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
# ==================== 图片标签管理 (ai_image_tags) ====================
@image_bp.route('/tags/list', methods=['GET'])
@require_auth
def get_image_tags_list():
    """Return a paginated list of image-tag records (the combined table).

    Query parameters:
        page: 1-based page number, default 1.
        pageSize: rows per page, default 20.
        image_id, tag_id, department_id: optional equality filters; blank
            values are ignored.

    Returns:
        A JSON payload of the form ``{'code', 'message', 'data'}``:
        * 200 with ``data = {'total': <count>, 'list': [...]}``; relative
          ``image_url`` / ``image_thumb_url`` values are prefixed with
          ``IMAGE_BASE_URL_CDN1``;
        * 400 when the enterprise id cannot be resolved or the pagination
          parameters are not valid integers in range;
        * 500 on any unexpected error (logged with traceback).
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400

        # Bad pagination input is a client error; without this guard a
        # non-numeric value raised ValueError into the generic 500 handler.
        try:
            page = int(request.args.get('page', 1))
            page_size = int(request.args.get('pageSize', 20))
        except ValueError:
            return jsonify({'code': 400, 'message': '分页参数错误', 'data': None}), 400
        # page < 1 would yield a negative OFFSET; a negative pageSize a
        # negative LIMIT. Neither is a meaningful page request.
        if page < 1 or page_size < 0:
            return jsonify({'code': 400, 'message': '分页参数错误', 'data': None}), 400

        where_conditions = ["enterprise_id = %s"]
        params = [enterprise_id]

        # Optional equality filters. Column names come from this fixed tuple,
        # so only the values (bound parameters) originate from the request.
        for column in ('image_id', 'tag_id', 'department_id'):
            value = request.args.get(column, '').strip()
            if value:
                where_conditions.append(f"{column} = %s")
                params.append(value)

        where_clause = " AND ".join(where_conditions)
        offset = (page - 1) * page_size

        db_manager = get_db_manager()

        # Total number of matching rows (ignores pagination).
        count_sql = f"SELECT COUNT(*) as total FROM ai_image_tags WHERE {where_clause}"
        count_result = db_manager.execute_query(count_sql, params)
        total = count_result[0]['total']

        # Requested page of rows, newest first.
        sql = f"""
            SELECT id, enterprise_id, image_id, image_name, image_url, image_thumb_url,
                   tag_id, tag_name, keywords_id, keywords_name, department_id, department_name,
                   created_user_id, created_at, updated_at
            FROM ai_image_tags
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
        """
        tags = db_manager.execute_query(sql, params + [page_size, offset])

        # Prefix stored relative paths with the CDN base URL; values that
        # already start with "http" are left untouched.
        for tag in tags:
            for key in ('image_url', 'image_thumb_url'):
                url = tag.get(key)
                if url and not url.startswith('http'):
                    tag[key] = IMAGE_BASE_URL_CDN1 + url

        # Normalise datetime columns for the JSON response.
        tags = format_datetime_fields(tags)

        return jsonify({'code': 200, 'message': 'success', 'data': {'total': total, 'list': tags}})
    except Exception as e:
        logger.error(f"[获取图片标签列表] 错误: {str(e)}", exc_info=True)
        return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@image_bp.route('/tags/create', methods=['POST'])
@require_auth
def create_image_tag():
    """Attach a tag to an image for the authenticated user's enterprise.

    Expects a JSON object in the request body. ``image_id`` and ``tag_id``
    are required; the remaining name/url/keyword/department keys are
    optional and fall back to ``''`` / ``0``.

    Returns:
        A JSON payload of the form ``{'code', 'message', 'data'}``:
        * 200 with ``data = {'id': ...}`` (the value returned by
          ``execute_insert``) on success;
        * 400 when the enterprise id cannot be resolved, or the body is
          missing, not a JSON object, or lacks ``image_id`` / ``tag_id``;
        * 409 when a row with the same enterprise, image and tag exists;
        * 500 on any unexpected error (logged with traceback).
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        user_id = current_user.get('user_id', 0)
        if not enterprise_id:
            return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400

        # silent=True: malformed / non-JSON bodies yield None rather than an
        # HTTP error that the broad handler below would turn into a 500.
        data = request.get_json(silent=True)
        # Non-object JSON has no .get(); treat it like a missing body.
        if not isinstance(data, dict) or not data.get('image_id') or not data.get('tag_id'):
            return jsonify({'code': 400, 'message': '图片ID和标签ID不能为空', 'data': None}), 400

        db_manager = get_db_manager()

        # Refuse duplicates of the same (enterprise, image, tag) combination.
        check_sql = "SELECT id FROM ai_image_tags WHERE enterprise_id = %s AND image_id = %s AND tag_id = %s"
        existing = db_manager.execute_query(check_sql, (enterprise_id, data['image_id'], data['tag_id']))
        if existing:
            return jsonify({'code': 409, 'message': '该标签已存在', 'data': None}), 409

        # Insert the new image-tag row.
        sql = """
            INSERT INTO ai_image_tags
            (enterprise_id, image_id, image_name, image_url, image_thumb_url, tag_id, tag_name,
             keywords_id, keywords_name, department_id, department_name, created_user_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        tag_record_id = db_manager.execute_insert(sql, (
            enterprise_id,
            data['image_id'],
            data.get('image_name', ''),
            data.get('image_url', ''),
            data.get('image_thumb_url', ''),
            data['tag_id'],
            data.get('tag_name', ''),
            data.get('keywords_id', 0),
            data.get('keywords_name', ''),
            data.get('department_id', 0),
            data.get('department_name', ''),
            user_id
        ))

        logger.info(f"创建图片标签成功: ID {tag_record_id}")
        return jsonify({'code': 200, 'message': '创建成功', 'data': {'id': tag_record_id}})
    except Exception as e:
        logger.error(f"[创建图片标签] 错误: {str(e)}", exc_info=True)
        return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@image_bp.route('/tags/<int:tag_record_id>', methods=['PUT'])
@require_auth
def update_image_tag(tag_record_id):
    """Partially update an image-tag record owned by the caller's enterprise.

    Only the keys present in the JSON object body are written. Updatable
    keys: ``image_name``, ``tag_name``, ``keywords_name``,
    ``department_name``.

    Args:
        tag_record_id: Primary key of the ``ai_image_tags`` row, from the URL.

    Returns:
        A JSON payload of the form ``{'code', 'message', 'data'}``:
        * 200 on success;
        * 400 when the enterprise id cannot be resolved, the body is
          missing/empty/not a JSON object, or it contains no updatable key;
        * 404 when no row matches ``tag_record_id`` within the enterprise;
        * 500 on any unexpected error (logged with traceback).
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400

        # silent=True: malformed / non-JSON bodies yield None rather than an
        # HTTP error that the broad handler below would turn into a 500.
        data = request.get_json(silent=True)
        # Reject empty bodies and non-object JSON (e.g. a bare string, where
        # data['image_name'] would raise TypeError).
        if not data or not isinstance(data, dict):
            return jsonify({'code': 400, 'message': '请求参数错误', 'data': None}), 400

        db_manager = get_db_manager()

        # Make sure the row exists and belongs to this enterprise.
        check_sql = "SELECT id FROM ai_image_tags WHERE id = %s AND enterprise_id = %s"
        existing = db_manager.execute_query(check_sql, (tag_record_id, enterprise_id))
        if not existing:
            return jsonify({'code': 404, 'message': '标签不存在', 'data': None}), 404

        # Column names come only from this fixed whitelist, never from the
        # request, so interpolating them into the SQL text is safe; values are
        # still passed as bound parameters.
        updatable_columns = ('image_name', 'tag_name', 'keywords_name', 'department_name')
        columns = [name for name in updatable_columns if name in data]
        if not columns:
            return jsonify({'code': 400, 'message': '没有可更新的字段', 'data': None}), 400

        set_clause = ', '.join(f"{name} = %s" for name in columns)
        params = [data[name] for name in columns]
        params.extend([tag_record_id, enterprise_id])

        sql = f"UPDATE ai_image_tags SET {set_clause} WHERE id = %s AND enterprise_id = %s"
        db_manager.execute_update(sql, tuple(params))

        logger.info(f"更新图片标签成功: ID {tag_record_id}")
        return jsonify({'code': 200, 'message': '更新成功', 'data': None})
    except Exception as e:
        logger.error(f"[更新图片标签] 错误: {str(e)}", exc_info=True)
        return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@image_bp.route('/tags/<int:tag_record_id>', methods=['DELETE'])
@require_auth
def delete_image_tag(tag_record_id):
    """Delete an image-tag record that belongs to the caller's enterprise.

    Args:
        tag_record_id: Primary key of the ``ai_image_tags`` row, from the URL.

    Returns:
        A JSON payload of the form ``{'code', 'message', 'data'}``:
        200 on success, 400 when the enterprise id cannot be resolved,
        404 when no matching row exists, 500 on any unexpected error.
    """
    try:
        enterprise_id = AuthUtils.get_current_user().get('enterprise_id')
        if not enterprise_id:
            return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400

        db = get_db_manager()
        # Both statements are scoped by (row id, enterprise id).
        row_key = (tag_record_id, enterprise_id)

        # Existence check first so a missing row is reported as 404.
        matches = db.execute_query(
            "SELECT id FROM ai_image_tags WHERE id = %s AND enterprise_id = %s",
            row_key,
        )
        if not matches:
            return jsonify({'code': 404, 'message': '标签不存在', 'data': None}), 404

        db.execute_update(
            "DELETE FROM ai_image_tags WHERE id = %s AND enterprise_id = %s",
            row_key,
        )

        logger.info(f"删除图片标签成功: ID {tag_record_id}")
        return jsonify({'code': 200, 'message': '删除成功', 'data': None})
    except Exception as exc:
        logger.error(f"[删除图片标签] 错误: {str(exc)}", exc_info=True)
        return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500