#!/usr/bin/env python # -*- coding: utf-8 -*- """ 图片库管理接口 """ from flask import Blueprint, request, jsonify import logging import os import time import random from datetime import datetime from werkzeug.utils import secure_filename from PIL import Image import io from auth_utils import require_auth, AuthUtils from database_config import get_db_manager, format_datetime_fields from log_utils import log_create, log_update, log_delete, log_error, log_operation from oss_image import SyncImageToOSS logger = logging.getLogger('article_server') # 图片上传目录 #IMAGE_UPLOAD_DIR = "D:/baijiahao/tags_images/Images" IMAGE_UPLOAD_DIR = "/home/work/baijiahao/tags_images/Images" # 图片基础URL #IMAGE_BASE_URL_CDN1 = "http://images11.bxmkb.cn/Images/" IMAGE_BASE_URL_CDN1 = "https://bxmkb-beijing.oss-cn-beijing.aliyuncs.com/Images/" # 支持的图片格式 ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'bmp', 'tiff', 'webp'} # 创建蓝图 image_bp = Blueprint('image', __name__, url_prefix='/api/images') @image_bp.route('/list', methods=['GET']) @require_auth def get_images_list(): """获取图片列表""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 # 获取查询参数 page = int(request.args.get('page', 1)) page_size = int(request.args.get('pageSize', 20)) keyword = request.args.get('keyword', '').strip() product_id = request.args.get('product_id', '').strip() image_type = request.args.get('type', '').strip() # 构建查询条件 where_conditions = ["enterprise_id = %s", "status = %s"] params = [enterprise_id, 'active'] if keyword: where_conditions.append("(image_name LIKE %s OR keywords LIKE %s)") keyword_pattern = f"%{keyword}%" params.extend([keyword_pattern, keyword_pattern]) if product_id: # ✅ 直接使用 ai_images 表的 product_id 字段筛选 where_conditions.append("product_id = %s") params.append(product_id) if image_type: where_conditions.append("image_type_name LIKE %s") params.append(f"%{image_type}%") where_clause = " AND ".join(where_conditions) # 计算偏移量 offset = (page - 1) * page_size db_manager = get_db_manager() # 查询总数 count_sql = f""" SELECT COUNT(*) as total FROM ai_images WHERE {where_clause} """ count_result = db_manager.execute_query(count_sql, params) total = count_result[0]['total'] # 查询图片列表 sql = f""" SELECT id, product_id, product_name, image_name, image_url, image_thumb_url, thumbnail_url, image_type_id, image_type_name, department, keywords, size_type, file_size, width, height, status, created_at, updated_at FROM ai_images WHERE {where_clause} ORDER BY created_at DESC LIMIT %s OFFSET %s """ params.extend([page_size, offset]) images = db_manager.execute_query(sql, params) # 为图片 URL 添加 CDN 前缀 for img in images: if img.get('image_url') and not img['image_url'].startswith('http'): img['image_url'] = IMAGE_BASE_URL_CDN1 + img['image_url'] if img.get('image_thumb_url') and not img['image_thumb_url'].startswith('http'): img['image_thumb_url'] = IMAGE_BASE_URL_CDN1 + img['image_thumb_url'] if img.get('thumbnail_url') and not img['thumbnail_url'].startswith('http'): img['thumbnail_url'] = IMAGE_BASE_URL_CDN1 + img['thumbnail_url'] # ✅ product_id 和 product_name 已经在查询中直接获取,无需额外关联查询 # 关联标签信息(从 ai_image_tags 表直接获取) for img in images: image_id = img.get('id') # 查询关联的标签信息 tags_sql = """ SELECT tag_id, tag_name FROM ai_image_tags WHERE image_id = %s AND enterprise_id = %s ORDER BY created_at """ tags_result = db_manager.execute_query(tags_sql, [image_id, enterprise_id]) # 聚合标签信息到图片对象中 if tags_result and len(tags_result) > 0: img['tags'] = tags_result else: img['tags'] = [] # 格式化时间字段 images = format_datetime_fields(images) logger.info(f"获取图片列表成功,总数: {total}") return jsonify({ 'code': 200, 'message': 'success', 'data': { 'total': total, 'list': images }, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[获取图片列表] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @image_bp.route('/list_dashboard', methods=['GET']) @require_auth def get_images_dashboard(): """获取图片库仪表盘统计数据""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 db_manager = get_db_manager() # 1. 图片总数:查询 ai_images 表 images_total_sql = """ SELECT COUNT(id) as total FROM ai_images WHERE enterprise_id = %s AND status = 'active' """ images_result = db_manager.execute_query(images_total_sql, [enterprise_id]) images_total = images_result[0]['total'] if images_result else 0 # 2. 产品图片总数:✅ 直接从 ai_images 表统计 product_total_sql = """ SELECT COUNT(id) as total FROM ai_images WHERE enterprise_id = %s AND status = 'active' AND product_id > 0 """ product_result = db_manager.execute_query(product_total_sql, [enterprise_id]) product_total = product_result[0]['total'] if product_result else 0 # 3. 产品类型总数:✅ 直接从 ai_images 表统计不同的产品ID product_images_total_sql = """ SELECT COUNT(DISTINCT product_id) as total FROM ai_images WHERE enterprise_id = %s AND status = 'active' AND product_id > 0 """ product_images_result = db_manager.execute_query(product_images_total_sql, [enterprise_id]) product_images_total = product_images_result[0]['total'] if product_images_result else 0 # 4. 场景图片总数:查询 ai_images 表中 image_type_name 包含“场景”的图片 scene_images_total_sql = """ SELECT COUNT(id) as total FROM ai_images WHERE enterprise_id = %s AND status = 'active' AND image_type_name LIKE %s """ scene_images_result = db_manager.execute_query(scene_images_total_sql, [enterprise_id, '%场景%']) scene_images_total = scene_images_result[0]['total'] if scene_images_result else 0 logger.info(f"获取图片库仪表盘数据成功,企业ID: {enterprise_id}") return jsonify({ 'code': 200, 'message': 'success', 'data': { 'images_total': images_total, 'product_total': product_total, 'product_images_total': product_images_total, 'scene_images_total': scene_images_total }, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[获取图片库仪表盘数据] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @image_bp.route('/upload', methods=['POST']) @require_auth def upload_image(): """上传图片(支持真实文件上传)""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') user_id = current_user.get('user_id', 0) if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 # 检查是否有上传的文件 if 'image' in request.files: # 真实文件上传流程 file = request.files['image'] if file.filename == '': return jsonify({ 'code': 400, 'message': '没有选择文件', 'data': None }), 400 # 验证文件类型 file_ext = file.filename.rsplit('.', 1)[1].lower() if '.' in file.filename else '' if file_ext not in ALLOWED_EXTENSIONS: return jsonify({ 'code': 400, 'message': f'不支持的文件格式,仅支持: {", ".join(ALLOWED_EXTENSIONS)}', 'data': None }), 400 # 获取表单数据 image_name = request.form.get('image_name', '') image_type_name = request.form.get('image_type_name', '') image_type_id = request.form.get('image_type_id', '').strip() # ✅ 新增:图片类型ID tag_keywords = request.form.get('tag_keywords', '') product_id = request.form.get('product_id', '').strip() # 产品ID product_name = request.form.get('product_name', '').strip() # ✅ 新增:产品名称 description = request.form.get('description', '') # 可选的图片描述 if not image_type_name: return jsonify({ 'code': 400, 'message': '缺少必需字段: image_type_name', 'data': None }), 400 # 生成新的文件名 current_time = datetime.now() date_str = current_time.strftime('%Y%m%d') timestamp = int(time.time() * 1000) # 毫秒时间戳 random_num = random.randint(100, 999) # 3位随机数 base_filename = f"{timestamp}{random_num}" new_filename = f"{base_filename}.png" thumb_filename = f"{base_filename}_thumb.png" # 创建日期目录 date_dir = os.path.join(IMAGE_UPLOAD_DIR, date_str) os.makedirs(date_dir, exist_ok=True) # 处理图片:优化压缩和生成缩略图 img_width = 0 # 图片宽度 img_height = 0 # 图片高度 file_size = 0 # 文件大小 try: # 读取上传的图片 image_data = file.read() file_size = len(image_data) # 获取文件大小 original_image = Image.open(io.BytesIO(image_data)) # 记录原始尺寸 img_width, img_height = original_image.size # 转换为RGB模式(确保兼容性) if original_image.mode in ('RGBA', 'LA', 'P'): background = Image.new('RGB', original_image.size, (255, 255, 255)) if original_image.mode == 'P': original_image = original_image.convert('RGBA') background.paste(original_image, mask=original_image.split()[-1] if original_image.mode == 'RGBA' else None) original_image = background elif original_image.mode != 'RGB': original_image = original_image.convert('RGB') # 1. 保存优化后的原图 file_path = os.path.join(date_dir, new_filename) # 如果图片过大,先进行适当缩放 max_size = (1920, 1080) if original_image.size[0] > max_size[0] or original_image.size[1] > max_size[1]: original_image.thumbnail(max_size, Image.Resampling.LANCZOS) logger.info(f"[上传图片] 图片尺寸优化: 缩放到 {original_image.size}") # 保存优化后的原图(高质量压缩) original_image.save(file_path, 'PNG', optimize=True, compress_level=6) logger.info(f"[上传图片] 优化原图保存成功: {file_path}") # 2. 生成缩略图 (120x160) thumb_path = os.path.join(date_dir, thumb_filename) thumb_image = original_image.copy() # 使用高质量重采样算法生成缩略图 thumb_size = (120, 160) # 计算缩放比例,保持宽高比 img_ratio = thumb_image.size[0] / thumb_image.size[1] thumb_ratio = thumb_size[0] / thumb_size[1] if img_ratio > thumb_ratio: new_height = thumb_size[1] new_width = int(new_height * img_ratio) thumb_image = thumb_image.resize((new_width, new_height), Image.Resampling.LANCZOS) left = (new_width - thumb_size[0]) // 2 thumb_image = thumb_image.crop((left, 0, left + thumb_size[0], thumb_size[1])) else: new_width = thumb_size[0] new_height = int(new_width / img_ratio) thumb_image = thumb_image.resize((new_width, new_height), Image.Resampling.LANCZOS) top = (new_height - thumb_size[1]) // 2 thumb_image = thumb_image.crop((0, top, thumb_size[0], top + thumb_size[1])) # 保存缩略图 thumb_image.save(thumb_path, 'PNG', optimize=True, compress_level=9) logger.info(f"[上传图片] 缩略图生成成功: {thumb_path} (尺寸: {thumb_image.size})") except Exception as img_error: logger.error(f"[上传图片] 图片处理失败: {str(img_error)}", exc_info=True) file.seek(0) file_path = os.path.join(date_dir, new_filename) file.save(file_path) logger.info(f"[上传图片] 回退保存成功: {file_path}") # 生成相对路径用于数据库存储 relative_path = f"{date_str}/{new_filename}" thumb_relative_path = f"{date_str}/{thumb_filename}" # 图片上传成功后,调用 TransformerImage 方法处理原图、缩图 try: logger.info(f"[上传图片] 开始调用 TransformerImage 处理图片") # 创建 SyncImageToOSS 实例 sync_oss = SyncImageToOSS() # 调用 TransformerImage 方法处理原图 original_result = sync_oss.TransformerImage(file_path) logger.info(f"[上传图片] 原图上传结果: {original_result}") # 调用 TransformerImage 方法处理缩图 thumb_result = sync_oss.TransformerImage(thumb_path) logger.info(f"[上传图片] 缩图上传结果: {thumb_result}") # 检查所有上传是否成功 if not (original_result['success'] and thumb_result['success']): logger.warning(f"[上传图片] OSS上传部分失败 - 原图: {original_result['success']}, 缩图: {thumb_result['success']}") except Exception as e: logger.error(f"[上传图片] TransformerImage 调用失败: {str(e)}") db_manager = get_db_manager() # (1)插入 ai_images 表 image_sql = """ INSERT INTO ai_images (enterprise_id, product_id, product_name, image_name, image_url, image_thumb_url, thumbnail_url, image_type_id, image_type_name, department, keywords, size_type, file_size, width, height, upload_user_id, status) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ image_id = db_manager.execute_insert(image_sql, ( enterprise_id, int(product_id) if product_id and product_id.isdigit() else 0, # product_id product_name, # ✅ 新增:product_name image_name or new_filename, relative_path, thumb_relative_path, thumb_relative_path, int(image_type_id) if image_type_id and image_type_id.isdigit() else 0, # ✅ 新增:image_type_id image_type_name, '', tag_keywords, 'medical', file_size, # ✅ 新增:file_size img_width, # ✅ 新增:width img_height, # ✅ 新增:height user_id, 'active' )) logger.info(f"[上传图片] 步骤(1) ai_images 入库成功: image_id={image_id}") # (2)处理 tag_keywords,插入 ai_image_tags_name 表 tag_ids_map = {} if tag_keywords: tags = [tag.strip() for tag in tag_keywords.split(',') if tag.strip()] for tag_name in tags: check_tag_sql = "SELECT id FROM ai_image_tags_name WHERE enterprise_id = %s AND tag_name = %s" existing_tag = db_manager.execute_query(check_tag_sql, (enterprise_id, tag_name)) if existing_tag: tag_id = existing_tag[0]['id'] logger.info(f"[上传图片] 步骤(2) 标签已存在: tag_name={tag_name}, tag_id={tag_id}") else: insert_tag_sql = """ INSERT INTO ai_image_tags_name (enterprise_id, tag_name, status) VALUES (%s, %s, %s) """ tag_id = db_manager.execute_insert(insert_tag_sql, (enterprise_id, tag_name, 'active')) logger.info(f"[上传图片] 步骤(2) 新标签入库成功: tag_name={tag_name}, tag_id={tag_id}") tag_ids_map[tag_name] = tag_id # (3)插入 ai_image_tags 关系映射表 relation_count = 0 for tag_name, tag_id in tag_ids_map.items(): try: relation_sql = """ INSERT INTO ai_image_tags (enterprise_id, product_id, image_id, image_name, image_url, image_thumb_url, tag_id, tag_name, created_user_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """ db_manager.execute_insert(relation_sql, ( enterprise_id, int(product_id) if product_id and product_id.isdigit() else 0, # product_id image_id, image_name or new_filename, relative_path, thumb_relative_path, tag_id, tag_name, user_id )) relation_count += 1 logger.info(f"[上传图片] 步骤(3) 关系映射入库成功: image_id={image_id}, tag_id={tag_id}, tag_name={tag_name}, product_id={product_id}") except Exception as rel_error: logger.warning(f"[上传图片] 步骤(3) 关系映射已存在: image_id={image_id}, tag_id={tag_id}, error={str(rel_error)}") # (4)插入 ai_product_images 产品图片关联表(只在有 product_id 时插入) product_image_id = None if product_id and product_id.isdigit(): try: product_image_sql = """ INSERT INTO ai_product_images (enterprise_id, product_id, product_name, image_id, image_name, image_url, thumbnail_url, type_name, description, file_size, width, height, upload_user_id, status) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ product_image_id = db_manager.execute_insert(product_image_sql, ( enterprise_id, int(product_id), product_name, # ✅ 新增冗余字段:product_name image_id, image_name or new_filename, relative_path, thumb_relative_path, image_type_name, description, file_size, img_width, img_height, user_id, 'active' )) logger.info(f"[上传图片] 步骤(4) ai_product_images 入库成功: product_image_id={product_image_id}, product_id={product_id}, image_id={image_id}") except Exception as prod_error: logger.error(f"[上传图片] 步骤(4) ai_product_images 入库失败: {str(prod_error)}", exc_info=True) logger.info(f"[上传图片] 全部完成: image_id={image_id}, 标签数={len(tag_ids_map)}, 关系数={relation_count}, product_image_id={product_image_id}") # 构建返回数据 response_data = { 'id': image_id, 'url': IMAGE_BASE_URL_CDN1 + relative_path, 'relative_path': relative_path, 'image_thumb_url': IMAGE_BASE_URL_CDN1 + thumb_relative_path, 'thumb_relative_path': thumb_relative_path, 'tags_count': len(tag_ids_map), 'relations_count': relation_count, 'width': img_width, 'height': img_height, 'file_size': file_size } # 如果插入了产品图片关联,添加相关信息 if product_image_id: response_data['product_image_id'] = product_image_id response_data['product_id'] = int(product_id) return jsonify({ 'code': 200, 'message': '上传成功', 'data': response_data, 'timestamp': int(datetime.now().timestamp() * 1000) }) else: # 必须上传真实文件 return jsonify({ 'code': 400, 'message': '缺少上传文件,请使用 multipart/form-data 上传真实图片文件', 'data': None }), 400 except Exception as e: logger.error(f"[上传图片] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @image_bp.route('/batch-upload', methods=['POST']) @require_auth def batch_upload_images(): """批量上传图片""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 data = request.get_json() if not data or not data.get('images'): return jsonify({ 'code': 400, 'message': '请求参数错误', 'data': None }), 400 db_manager = get_db_manager() uploaded_ids = [] for img in data['images']: # 验证必需字段 if not img.get('image_url') or not img.get('image_type_name'): continue # 插入图片记录 sql = """ INSERT INTO ai_images (image_name, image_url, image_thumb_url, thumbnail_url, image_type_id, image_type_name, department, keywords, size_type, upload_user_id, status) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ image_id = db_manager.execute_insert(sql, ( img.get('image_name', ''), img['image_url'], img.get('image_thumb_url', ''), img.get('thumbnail_url', ''), img.get('image_type_id', 0), img['image_type_name'], img.get('department', ''), img.get('keywords', ''), img.get('size_type', 'medical'), current_user.get('user_id', 0), 'active' )) uploaded_ids.append(image_id) logger.info(f"批量上传图片成功: {len(uploaded_ids)}张") return jsonify({ 'code': 200, 'message': '批量上传成功', 'data': { 'uploaded_count': len(uploaded_ids), 'ids': uploaded_ids }, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[批量上传图片] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @image_bp.route('/', methods=['DELETE']) @require_auth def delete_image(image_id): """删除图片""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 db_manager = get_db_manager() # 检查图片是否存在 check_sql = "SELECT id FROM ai_images WHERE id = %s" existing = db_manager.execute_query(check_sql, (image_id,)) if not existing: return jsonify({ 'code': 404, 'message': '图片不存在', 'data': None }), 404 # 删除图片记录(软删除) sql = "UPDATE ai_images SET status = 'deleted' WHERE id = %s" db_manager.execute_update(sql, (image_id,)) logger.info(f"删除图片成功: ID {image_id}") return jsonify({ 'code': 200, 'message': '删除成功', 'data': None, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[删除图片] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 # ==================== 图片标签名称管理 (ai_image_tags_name) ==================== @image_bp.route('/tags/names/list', methods=['GET']) @require_auth def get_tag_names_list(): """获取标签名称列表""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 page = int(request.args.get('page', 1)) page_size = int(request.args.get('pageSize', 20)) keyword = request.args.get('keyword', '').strip() status = request.args.get('status', '').strip() where_conditions = ["enterprise_id = %s"] params = [enterprise_id] if keyword: where_conditions.append("(tag_name LIKE %s OR description LIKE %s)") keyword_pattern = f"%{keyword}%" params.extend([keyword_pattern, keyword_pattern]) if status: where_conditions.append("status = %s") params.append(status) where_clause = " AND ".join(where_conditions) offset = (page - 1) * page_size db_manager = get_db_manager() # 查询总数 count_sql = f"SELECT COUNT(*) as total FROM ai_image_tags_name WHERE {where_clause}" count_result = db_manager.execute_query(count_sql, params) total = count_result[0]['total'] # 查询列表 sql = f""" SELECT id, enterprise_id, tag_name, tag_category, department, description, status, created_at, updated_at FROM ai_image_tags_name WHERE {where_clause} ORDER BY created_at DESC LIMIT %s OFFSET %s """ params.extend([page_size, offset]) tags = db_manager.execute_query(sql, params) # 格式化时间字段 tags = format_datetime_fields(tags) return jsonify({'code': 200, 'message': 'success', 'data': {'total': total, 'list': tags}}) except Exception as e: logger.error(f"[获取标签名称列表] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 @image_bp.route('/tags/names/create', methods=['POST']) @require_auth def create_tag_name(): """创建标签名称""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 data = request.get_json() if not data or not data.get('tag_name'): return jsonify({'code': 400, 'message': '标签名称不能为空', 'data': None}), 400 db_manager = get_db_manager() # 检查标签名称是否已存在 check_sql = "SELECT id FROM ai_image_tags_name WHERE enterprise_id = %s AND tag_name = %s" existing = db_manager.execute_query(check_sql, (enterprise_id, data['tag_name'])) if existing: return jsonify({'code': 409, 'message': '标签名称已存在', 'data': None}), 409 # 插入标签名称 sql = """ INSERT INTO ai_image_tags_name (enterprise_id, tag_name, tag_category, department, description, status) VALUES (%s, %s, %s, %s, %s, %s) """ tag_id = db_manager.execute_insert(sql, ( enterprise_id, data['tag_name'], data.get('tag_category', ''), data.get('department', ''), data.get('description', ''), data.get('status', 'active') )) logger.info(f"创建标签名称成功: ID {tag_id}") return jsonify({'code': 200, 'message': '创建成功', 'data': {'id': tag_id}}) except Exception as e: logger.error(f"[创建标签名称] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 @image_bp.route('/tags/names/', methods=['PUT']) @require_auth def update_tag_name(tag_id): """更新标签名称""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 data = request.get_json() if not data: return jsonify({'code': 400, 'message': '请求参数错误', 'data': None}), 400 db_manager = get_db_manager() # 检查标签是否存在 check_sql = "SELECT id FROM ai_image_tags_name WHERE id = %s AND enterprise_id = %s" existing = db_manager.execute_query(check_sql, (tag_id, enterprise_id)) if not existing: return jsonify({'code': 404, 'message': '标签不存在', 'data': None}), 404 # 更新标签 update_fields = [] params = [] if 'tag_name' in data: update_fields.append("tag_name = %s") params.append(data['tag_name']) if 'tag_category' in data: update_fields.append("tag_category = %s") params.append(data['tag_category']) if 'department' in data: update_fields.append("department = %s") params.append(data['department']) if 'description' in data: update_fields.append("description = %s") params.append(data['description']) if 'status' in data: update_fields.append("status = %s") params.append(data['status']) if not update_fields: return jsonify({'code': 400, 'message': '没有可更新的字段', 'data': None}), 400 params.extend([tag_id, enterprise_id]) sql = f"UPDATE ai_image_tags_name SET {', '.join(update_fields)} WHERE id = %s AND enterprise_id = %s" db_manager.execute_update(sql, tuple(params)) logger.info(f"更新标签名称成功: ID {tag_id}") return jsonify({'code': 200, 'message': '更新成功', 'data': None}) except Exception as e: logger.error(f"[更新标签名称] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 @image_bp.route('/tags/names/', methods=['DELETE']) @require_auth def delete_tag_name(tag_id): """删除标签名称""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 db_manager = get_db_manager() # 检查标签是否存在 check_sql = "SELECT id FROM ai_image_tags_name WHERE id = %s AND enterprise_id = %s" existing = db_manager.execute_query(check_sql, (tag_id, enterprise_id)) if not existing: return jsonify({'code': 404, 'message': '标签不存在', 'data': None}), 404 # 删除标签(物理删除) sql = "DELETE FROM ai_image_tags_name WHERE id = %s AND enterprise_id = %s" db_manager.execute_update(sql, (tag_id, enterprise_id)) logger.info(f"删除标签名称成功: ID {tag_id}") return jsonify({'code': 200, 'message': '删除成功', 'data': None}) except Exception as e: logger.error(f"[删除标签名称] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 # ==================== 图片标签关系管理 (ai_image_tags) ==================== @image_bp.route('/tags/relations/list', methods=['GET']) @require_auth def get_tag_relations_list(): """获取图片标签关系列表""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 page = int(request.args.get('page', 1)) page_size = int(request.args.get('pageSize', 20)) image_id = request.args.get('image_id', '').strip() tag_id = request.args.get('tag_id', '').strip() where_conditions = ["enterprise_id = %s"] params = [enterprise_id] if image_id: where_conditions.append("image_id = %s") params.append(image_id) if tag_id: where_conditions.append("tag_id = %s") params.append(tag_id) where_clause = " AND ".join(where_conditions) offset = (page - 1) * page_size db_manager = get_db_manager() # 查询总数 count_sql = f"SELECT COUNT(*) as total FROM ai_image_tags WHERE {where_clause}" count_result = db_manager.execute_query(count_sql, params) total = count_result[0]['total'] # 查询列表 sql = f""" SELECT id, enterprise_id, image_id, tag_id, tag_name, created_at, updated_at FROM ai_image_tags WHERE {where_clause} ORDER BY created_at DESC LIMIT %s OFFSET %s """ params.extend([page_size, offset]) relations = db_manager.execute_query(sql, params) # 格式化时间字段 relations = format_datetime_fields(relations) return jsonify({'code': 200, 'message': 'success', 'data': {'total': total, 'list': relations}}) except Exception as e: logger.error(f"[获取图片标签关系列表] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 @image_bp.route('/tags/relations/create', methods=['POST']) @require_auth def create_tag_relation(): """创建图片标签关系""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') user_id = current_user.get('user_id', 0) if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 data = request.get_json() if not data or not data.get('image_id') or not data.get('tag_id'): return jsonify({'code': 400, 'message': '图片ID和标签ID不能为空', 'data': None}), 400 db_manager = get_db_manager() # 检查关系是否已存在 check_sql = "SELECT id FROM ai_image_tags WHERE enterprise_id = %s AND image_id = %s AND tag_id = %s" existing = db_manager.execute_query(check_sql, (enterprise_id, data['image_id'], data['tag_id'])) if existing: return jsonify({'code': 409, 'message': '该标签关系已存在', 'data': None}), 409 # 插入关系 sql = """ INSERT INTO ai_image_tags (enterprise_id, image_id, tag_id, tag_name) VALUES (%s, %s, %s, %s) """ relation_id = db_manager.execute_insert(sql, ( enterprise_id, data['image_id'], data['tag_id'], data.get('tag_name', '') )) logger.info(f"创建图片标签关系成功: ID {relation_id}") return jsonify({'code': 200, 'message': '创建成功', 'data': {'id': relation_id}}) except Exception as e: logger.error(f"[创建图片标签关系] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 @image_bp.route('/tags/relations/', methods=['DELETE']) @require_auth def delete_tag_relation(relation_id): """删除图片标签关系""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 db_manager = get_db_manager() # 检查关系是否存在 check_sql = "SELECT id FROM ai_image_tags WHERE id = %s AND enterprise_id = %s" existing = db_manager.execute_query(check_sql, (relation_id, enterprise_id)) if not existing: return jsonify({'code': 404, 'message': '标签关系不存在', 'data': None}), 404 # 删除关系 sql = "DELETE FROM ai_image_tags WHERE id = %s AND enterprise_id = %s" db_manager.execute_update(sql, (relation_id, enterprise_id)) logger.info(f"删除图片标签关系成功: ID {relation_id}") return jsonify({'code': 200, 'message': '删除成功', 'data': None}) except Exception as e: logger.error(f"[删除图片标签关系] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 # ==================== 图片类型管理 (ai_image_type) ==================== @image_bp.route('/types/list', methods=['GET']) @require_auth def get_image_types_list(): """获取图片类型列表""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') logger.info(f"[获取图片类型列表] current_user: {current_user}, enterprise_id: {enterprise_id}") if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 page = int(request.args.get('page', 1)) page_size = int(request.args.get('pageSize', 20)) keyword = request.args.get('keyword', '').strip() where_conditions = ["enterprise_id = %s"] params = [enterprise_id] if keyword: where_conditions.append("(type_name LIKE %s OR keywords_name LIKE %s)") keyword_pattern = f"%{keyword}%" params.extend([keyword_pattern, keyword_pattern]) where_clause = " AND ".join(where_conditions) offset = (page - 1) * page_size db_manager = get_db_manager() # 查询总数 count_sql = f"SELECT COUNT(*) as total FROM ai_image_type WHERE {where_clause}" count_result = db_manager.execute_query(count_sql, params) total = count_result[0]['total'] # 查询列表 sql = f""" SELECT id, enterprise_id, type_name, keywords_id, keywords_name, department_id, department_name, created_user_id, created_at, updated_at FROM ai_image_type WHERE {where_clause} ORDER BY created_at DESC LIMIT %s OFFSET %s """ params.extend([page_size, offset]) types = db_manager.execute_query(sql, params) # 格式化时间字段 types = format_datetime_fields(types) return jsonify({'code': 200, 'message': 'success', 'data': {'total': total, 'list': types}}) except Exception as e: logger.error(f"[获取图片类型列表] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 @image_bp.route('/types/create', methods=['POST']) @require_auth def create_image_type(): """创建图片类型""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') user_id = current_user.get('user_id', 0) if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 data = request.get_json() if not data or not data.get('type_name'): return jsonify({'code': 400, 'message': '类型名称不能为空', 'data': None}), 400 db_manager = get_db_manager() # 插入类型 sql = """ INSERT INTO ai_image_type (enterprise_id, type_name, keywords_id, keywords_name, department_id, department_name, created_user_id) VALUES (%s, %s, %s, %s, %s, %s, %s) """ type_id = db_manager.execute_insert(sql, ( enterprise_id, data['type_name'], data.get('keywords_id', 0), data.get('keywords_name', ''), data.get('department_id', 0), data.get('department_name', ''), user_id )) logger.info(f"创建图片类型成功: ID {type_id}") return jsonify({'code': 200, 'message': '创建成功', 'data': {'id': type_id}}) except Exception as e: logger.error(f"[创建图片类型] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 @image_bp.route('/types/', methods=['PUT']) @require_auth def update_image_type(type_id): """更新图片类型""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 data = request.get_json() if not data: return jsonify({'code': 400, 'message': '请求参数错误', 'data': None}), 400 db_manager = get_db_manager() # 检查类型是否存在 check_sql = "SELECT id FROM ai_image_type WHERE id = %s AND enterprise_id = %s" existing = db_manager.execute_query(check_sql, (type_id, enterprise_id)) if not existing: return jsonify({'code': 404, 'message': '图片类型不存在', 'data': None}), 404 # 更新类型 update_fields = [] params = [] if 'type_name' in data: update_fields.append("type_name = %s") params.append(data['type_name']) if 'keywords_id' in data: update_fields.append("keywords_id = %s") params.append(data['keywords_id']) if 'keywords_name' in data: update_fields.append("keywords_name = %s") params.append(data['keywords_name']) if 'department_id' in data: update_fields.append("department_id = %s") params.append(data['department_id']) if 'department_name' in data: update_fields.append("department_name = %s") params.append(data['department_name']) if not update_fields: return jsonify({'code': 400, 'message': '没有可更新的字段', 'data': None}), 400 params.extend([type_id, enterprise_id]) sql = f"UPDATE ai_image_type SET {', '.join(update_fields)} WHERE id = %s AND enterprise_id = %s" db_manager.execute_update(sql, tuple(params)) logger.info(f"更新图片类型成功: ID {type_id}") return jsonify({'code': 200, 'message': '更新成功', 'data': None}) except Exception as e: logger.error(f"[更新图片类型] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 @image_bp.route('/types/', methods=['DELETE']) @require_auth def delete_image_type(type_id): """删除图片类型""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 db_manager = get_db_manager() # 检查类型是否存在 check_sql = "SELECT id FROM ai_image_type WHERE id = %s AND enterprise_id = %s" existing = db_manager.execute_query(check_sql, (type_id, enterprise_id)) if not existing: return jsonify({'code': 404, 'message': '图片类型不存在', 'data': None}), 404 # 删除类型 sql = "DELETE FROM ai_image_type WHERE id = %s AND enterprise_id = %s" db_manager.execute_update(sql, (type_id, enterprise_id)) logger.info(f"删除图片类型成功: ID {type_id}") return jsonify({'code': 200, 'message': '删除成功', 'data': None}) except Exception as e: logger.error(f"[删除图片类型] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 # ==================== 图片标签管理 (ai_image_tags) ==================== @image_bp.route('/tags/list', methods=['GET']) @require_auth def get_image_tags_list(): """获取图片标签列表(综合表)""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 page = int(request.args.get('page', 1)) page_size = int(request.args.get('pageSize', 20)) image_id = request.args.get('image_id', '').strip() tag_id = request.args.get('tag_id', '').strip() department_id = request.args.get('department_id', '').strip() where_conditions = ["enterprise_id = %s"] params = [enterprise_id] if image_id: where_conditions.append("image_id = %s") params.append(image_id) if tag_id: where_conditions.append("tag_id = %s") params.append(tag_id) if department_id: where_conditions.append("department_id = %s") params.append(department_id) where_clause = " AND ".join(where_conditions) offset = (page - 1) * page_size db_manager = get_db_manager() # 查询总数 count_sql = f"SELECT COUNT(*) as total FROM ai_image_tags WHERE {where_clause}" count_result = db_manager.execute_query(count_sql, params) total = count_result[0]['total'] # 查询列表 sql = f""" SELECT id, enterprise_id, image_id, image_name, image_url, image_thumb_url, tag_id, tag_name, keywords_id, keywords_name, department_id, department_name, created_user_id, created_at, updated_at FROM ai_image_tags WHERE {where_clause} ORDER BY created_at DESC LIMIT %s OFFSET %s """ params.extend([page_size, offset]) tags = db_manager.execute_query(sql, params) # 为图片 URL 添加 CDN 前缀 for tag in tags: if tag.get('image_url') and not tag['image_url'].startswith('http'): tag['image_url'] = IMAGE_BASE_URL_CDN1 + tag['image_url'] if tag.get('image_thumb_url') and not tag['image_thumb_url'].startswith('http'): tag['image_thumb_url'] = IMAGE_BASE_URL_CDN1 + tag['image_thumb_url'] # 格式化时间字段 tags = format_datetime_fields(tags) return jsonify({'code': 200, 'message': 'success', 'data': {'total': total, 'list': tags}}) except Exception as e: logger.error(f"[获取图片标签列表] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 @image_bp.route('/tags/create', methods=['POST']) @require_auth def create_image_tag(): """创建图片标签""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') user_id = current_user.get('user_id', 0) if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 data = request.get_json() if not data or not data.get('image_id') or not data.get('tag_id'): return jsonify({'code': 400, 'message': '图片ID和标签ID不能为空', 'data': None}), 400 db_manager = get_db_manager() # 检查是否已存在 check_sql = "SELECT id FROM ai_image_tags WHERE enterprise_id = %s AND image_id = %s AND tag_id = %s" existing = db_manager.execute_query(check_sql, (enterprise_id, data['image_id'], data['tag_id'])) if existing: return jsonify({'code': 409, 'message': '该标签已存在', 'data': None}), 409 # 插入标签 sql = """ INSERT INTO ai_image_tags (enterprise_id, image_id, image_name, image_url, image_thumb_url, tag_id, tag_name, keywords_id, keywords_name, department_id, department_name, created_user_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ tag_record_id = db_manager.execute_insert(sql, ( enterprise_id, data['image_id'], data.get('image_name', ''), data.get('image_url', ''), data.get('image_thumb_url', ''), data['tag_id'], data.get('tag_name', ''), data.get('keywords_id', 0), data.get('keywords_name', ''), data.get('department_id', 0), data.get('department_name', ''), user_id )) logger.info(f"创建图片标签成功: ID {tag_record_id}") return jsonify({'code': 200, 'message': '创建成功', 'data': {'id': tag_record_id}}) except Exception as e: logger.error(f"[创建图片标签] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 @image_bp.route('/tags/', methods=['PUT']) @require_auth def update_image_tag(tag_record_id): """更新图片标签""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 data = request.get_json() if not data: return jsonify({'code': 400, 'message': '请求参数错误', 'data': None}), 400 db_manager = get_db_manager() # 检查标签是否存在 check_sql = "SELECT id FROM ai_image_tags WHERE id = %s AND enterprise_id = %s" existing = db_manager.execute_query(check_sql, (tag_record_id, enterprise_id)) if not existing: return jsonify({'code': 404, 'message': '标签不存在', 'data': None}), 404 # 更新标签 update_fields = [] params = [] if 'image_name' in data: update_fields.append("image_name = %s") params.append(data['image_name']) if 'tag_name' in data: update_fields.append("tag_name = %s") params.append(data['tag_name']) if 'keywords_name' in data: update_fields.append("keywords_name = %s") params.append(data['keywords_name']) if 'department_name' in data: update_fields.append("department_name = %s") params.append(data['department_name']) if not update_fields: return jsonify({'code': 400, 'message': '没有可更新的字段', 'data': None}), 400 params.extend([tag_record_id, enterprise_id]) sql = f"UPDATE ai_image_tags SET {', '.join(update_fields)} WHERE id = %s AND enterprise_id = %s" db_manager.execute_update(sql, tuple(params)) logger.info(f"更新图片标签成功: ID {tag_record_id}") return jsonify({'code': 200, 'message': '更新成功', 'data': None}) except Exception as e: logger.error(f"[更新图片标签] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 @image_bp.route('/tags/', methods=['DELETE']) @require_auth def delete_image_tag(tag_record_id): """删除图片标签""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 db_manager = get_db_manager() # 检查标签是否存在 check_sql = "SELECT id FROM ai_image_tags WHERE id = %s AND enterprise_id = %s" existing = db_manager.execute_query(check_sql, (tag_record_id, enterprise_id)) if not existing: return jsonify({'code': 404, 'message': '标签不存在', 'data': None}), 404 # 删除标签 sql = "DELETE FROM ai_image_tags WHERE id = %s AND enterprise_id = %s" db_manager.execute_update(sql, (tag_record_id, enterprise_id)) logger.info(f"删除图片标签成功: ID {tag_record_id}") return jsonify({'code': 200, 'message': '删除成功', 'data': None}) except Exception as e: logger.error(f"[删除图片标签] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500