#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Product management endpoints."""
import logging
from datetime import datetime

from flask import Blueprint, request, jsonify

from auth_utils import require_auth, AuthUtils
from database_config import get_db_manager, format_datetime_fields
from log_utils import log_create, log_update, log_delete, log_error, log_operation

logger = logging.getLogger('article_server')

# Blueprint for every route under /api/products.
product_bp = Blueprint('product', __name__, url_prefix='/api/products')


def _client_ip():
    """Return the caller address used in log lines.

    Reads X-Forwarded-For first, then REMOTE_ADDR, then falls back to the
    placeholder string.
    """
    environ = request.environ
    return environ.get('HTTP_X_FORWARDED_FOR', environ.get('REMOTE_ADDR', '未知'))


def _error(code, message):
    """Build the error envelope; the HTTP status mirrors the body's ``code``."""
    return jsonify({'code': code, 'message': message, 'data': None}), code


def _success(data=None, message='success'):
    """Build the success envelope with a millisecond timestamp."""
    return jsonify({
        'code': 200,
        'message': message,
        'data': data,
        'timestamp': int(datetime.now().timestamp() * 1000)
    })


def _json_object():
    """Return the JSON request body when it is a non-empty object, else None.

    ``silent=True`` keeps a malformed or non-JSON body from raising inside the
    handler's broad ``except`` (which would report it as a 500).
    """
    body = request.get_json(silent=True)
    if isinstance(body, dict) and body:
        return body
    return None


def _parse_pagination():
    """Return ``(page, page_size)`` from the query string, or None if invalid.

    Both values must parse as integers >= 1; anything else would produce a
    negative or meaningless LIMIT/OFFSET.
    """
    try:
        page = int(request.args.get('page', 1))
        page_size = int(request.args.get('pageSize', 20))
    except (TypeError, ValueError):
        return None
    if page < 1 or page_size < 1:
        return None
    return page, page_size


def _attach_tags(db_manager, products):
    """Set ``product['tags']`` on every row using a single query for the page."""
    if not products:
        return
    product_ids = [product['id'] for product in products]
    placeholders = ', '.join(['%s'] * len(product_ids))
    tag_sql = f"""
        SELECT id, product_id, tag_name
        FROM ai_product_tags
        WHERE product_id IN ({placeholders})
    """
    tags_by_product = {}
    for row in db_manager.execute_query(tag_sql, product_ids) or ():
        # Keep the per-tag shape the endpoint has always returned: id + tag_name.
        tags_by_product.setdefault(row['product_id'], []).append(
            {'id': row['id'], 'tag_name': row['tag_name']}
        )
    for product in products:
        product['tags'] = tags_by_product.get(product['id'], [])


@product_bp.route('/list_info', methods=['GET'])
@require_auth
def get_products_list_info():
    """Return the enterprise's draft/active products (summary fields only).

    Responds with ``data.list`` holding id, name, status, created_at and
    updated_at per product, newest first; 400 when the current user has no
    enterprise id, 500 on any unexpected error.
    """
    client_ip = _client_ip()
    logger.info(f"[获取产品列表简化版] 开始处理请求, IP: {client_ip}")
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        logger.info(f"[获取产品列表简化版] 用户信息 - 用户ID: {current_user.get('user_id')}, 企业ID: {enterprise_id}, IP: {client_ip}")
        if not enterprise_id:
            logger.warning(f"[获取产品列表简化版] 无法获取企业ID, IP: {client_ip}")
            return _error(400, '无法获取企业ID')

        db_manager = get_db_manager()
        # IN (...) is used instead of != 'deleted' so an index on status can help.
        sql = """
            SELECT id, name, status, created_at, updated_at
            FROM ai_products
            WHERE enterprise_id = %s AND status IN ('draft', 'active')
            ORDER BY created_at DESC
        """
        products = db_manager.execute_query(sql, (enterprise_id,))
        products = format_datetime_fields(products)

        logger.info(f"[获取产品列表简化版] 查询成功, 返回数量: {len(products)}, 企业ID: {enterprise_id}, IP: {client_ip}")
        return _success({'list': products})
    except Exception as e:
        logger.error(f"[获取产品列表简化版] 处理请求时发生错误: {str(e)}", exc_info=True)
        return _error(500, '服务器内部错误')


@product_bp.route('/list', methods=['GET'])
@require_auth
def get_products_list():
    """Return one page of the enterprise's non-deleted products with their tags.

    Query parameters: ``page`` and ``pageSize`` (integers >= 1), plus optional
    ``keyword`` (name substring), ``type`` (type_name substring) and ``status``
    (exact match). Responds with ``data.total`` and ``data.list``; 400 for a
    missing enterprise id or invalid paging values, 500 on unexpected errors.
    """
    client_ip = _client_ip()
    logger.info(f"[获取产品列表] 开始处理请求, IP: {client_ip}")
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        logger.info(f"[获取产品列表] 用户信息 - 用户ID: {current_user.get('user_id')}, 企业ID: {enterprise_id}, IP: {client_ip}")
        if not enterprise_id:
            logger.warning(f"[获取产品列表] 无法获取企业ID, IP: {client_ip}")
            return _error(400, '无法获取企业ID')

        paging = _parse_pagination()
        if paging is None:
            return _error(400, '分页参数错误')
        page, page_size = paging
        keyword = request.args.get('keyword', '').strip()
        type_id = request.args.get('type', '').strip()
        status = request.args.get('status', '').strip()
        logger.info(f"[获取产品列表] 查询参数: page={page}, pageSize={page_size}, keyword={keyword}, type={type_id}, status={status}, 企业ID: {enterprise_id}, IP: {client_ip}")

        # Build the WHERE clause; every user value travels as a bound parameter.
        where_conditions = ["enterprise_id = %s", "status != 'deleted'"]
        params = [enterprise_id]
        if keyword:
            where_conditions.append("name LIKE %s")
            params.append(f"%{keyword}%")
        if type_id:
            where_conditions.append("type_name LIKE %s")
            params.append(f"%{type_id}%")
        if status:
            where_conditions.append("status = %s")
            params.append(status)
        where_clause = " AND ".join(where_conditions)
        offset = (page - 1) * page_size

        db_manager = get_db_manager()
        count_sql = f"SELECT COUNT(*) as total FROM ai_products WHERE {where_clause}"
        total = db_manager.execute_query(count_sql, params)[0]['total']

        sql = f"""
            SELECT p.id, p.name, p.type_name, p.knowledge, p.status,
                   p.articles_total, p.published_total, p.image_url,
                   p.image_thumbnail_url, p.created_at, p.updated_at
            FROM ai_products p
            WHERE {where_clause}
            ORDER BY p.created_at DESC
            LIMIT %s OFFSET %s
        """
        products = db_manager.execute_query(sql, params + [page_size, offset])

        # One tag query for the whole page instead of one per product.
        _attach_tags(db_manager, products)
        products = format_datetime_fields(products)

        logger.info(f"[获取产品列表] 查询成功, 总数: {total}, 当前页: {page}, 每页: {page_size}, 返回数量: {len(products)}, 企业ID: {enterprise_id}, IP: {client_ip}")
        return _success({'total': total, 'list': products})
    except Exception as e:
        logger.error(f"[获取产品列表] 处理请求时发生错误: {str(e)}", exc_info=True)
        return _error(500, '服务器内部错误')


@product_bp.route('/create', methods=['POST'])
@require_auth
def create_product():
    """Create a product for the current enterprise.

    Requires ``name``, ``type_id``, ``knowledge`` and ``type_name`` in the JSON
    body; ``tags`` (a list of names), ``image_url`` and ``image_thumbnail_url``
    are optional. Responds with the new id; 400 for invalid input, 409 when a
    non-deleted product of the same name already exists, 500 on unexpected
    errors.
    """
    client_ip = _client_ip()
    logger.info(f"[创建产品] 开始处理创建产品请求, IP: {client_ip}")
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        logger.info(f"[创建产品] 用户信息 - 用户ID: {current_user.get('user_id')}, 企业ID: {enterprise_id}, IP: {client_ip}")
        if not enterprise_id:
            logger.warning(f"[创建产品] 无法获取企业ID, IP: {client_ip}")
            return _error(400, '无法获取企业ID')

        data = _json_object()
        if data is None:
            logger.warning(f"[创建产品] 请求参数为空, 企业ID: {enterprise_id}, IP: {client_ip}")
            return _error(400, '请求参数错误')
        logger.info(f"[创建产品] 收到创建请求, 产品名称: {data.get('name')}, 类型: {data.get('type_name') or data.get('type_id')}, 企业ID: {enterprise_id}, IP: {client_ip}")

        for field in ('name', 'type_id', 'knowledge', 'type_name'):
            if not data.get(field):
                logger.warning(f"[创建产品] 缺少必需字段: {field}, 企业ID: {enterprise_id}, IP: {client_ip}")
                return _error(400, f'缺少必需字段: {field}')

        # Validate tags before writing anything: iterating a string would
        # silently insert one tag per character.
        tags = data.get('tags') or []
        if not isinstance(tags, list):
            return _error(400, '标签格式错误')

        db_manager = get_db_manager()

        # Names must be unique among the enterprise's non-deleted products.
        logger.info(f"[创建产品] 开始检查产品名称是否重复, 名称: {data['name']}, 企业ID: {enterprise_id}")
        check_duplicate_sql = """
            SELECT id, name
            FROM ai_products
            WHERE enterprise_id = %s AND name = %s AND status != 'deleted'
        """
        existing_product = db_manager.execute_query(check_duplicate_sql, (enterprise_id, data['name']))
        if existing_product:
            logger.warning(f"[创建产品] 产品名称已存在, 名称: {data['name']}, 已存在ID: {existing_product[0]['id']}, 企业ID: {enterprise_id}, IP: {client_ip}")
            product_name = data['name']
            return _error(409, f'产品名称「{product_name}」已存在,请使用其他名称')

        # NOTE(review): the statements below are not visibly wrapped in a
        # transaction; confirm how the db manager handles partial failure.
        logger.info(f"[创建产品] 开始插入产品数据, 名称: {data['name']}, 企业ID: {enterprise_id}")
        sql = """
            INSERT INTO ai_products
                (enterprise_id, name, type_id, type_name, knowledge, status,
                 articles_total, published_total, image_url, image_thumbnail_url)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        product_id = db_manager.execute_insert(sql, (
            enterprise_id,
            data['name'],
            data.get('type_id', 0),
            data.get('type_name', ''),
            data['knowledge'],
            'active',
            0, 0,  # articles_total, published_total
            data.get('image_url', ''),
            data.get('image_thumbnail_url', '')
        ))
        logger.info(f"[创建产品] 产品插入成功, ID: {product_id}, 名称: {data['name']}, 企业ID: {enterprise_id}")

        if tags:
            logger.info(f"[创建产品] 开始添加产品标签, 产品ID: {product_id}, 标签数量: {len(tags)}")
            tag_sql = "INSERT INTO ai_product_tags (enterprise_id, product_id, tag_name) VALUES (%s, %s, %s)"
            for tag_name in tags:
                db_manager.execute_insert(tag_sql, (enterprise_id, product_id, tag_name))
            logger.info(f"[创建产品] 产品标签添加完成, 产品ID: {product_id}, 数量: {len(tags)}")

        # Keep the enterprise's product counter in step.
        logger.info(f"[创建产品] 开始更新企业产品总数, 企业ID: {enterprise_id}")
        update_sql = "UPDATE ai_enterprises SET products_total = products_total + 1 WHERE id = %s"
        db_manager.execute_update(update_sql, (enterprise_id,))

        logger.info(f"[创建产品] 产品创建全部完成, ID: {product_id}, 名称: {data['name']}, 企业ID: {enterprise_id}, IP: {client_ip}")

        # Business audit log.
        log_create(
            target_type='product',
            target_id=product_id,
            description=f"创建产品: {data['name']}",
            request_data=data,
            response_data={'id': product_id}
        )
        return _success({'id': product_id}, '创建成功')
    except Exception as e:
        logger.error(f"[创建产品] 处理请求时发生错误: {str(e)}", exc_info=True)
        return _error(500, '服务器内部错误')


@product_bp.route('/<int:product_id>', methods=['PUT'])
@require_auth
def update_product(product_id):
    """Update a product's name, type, knowledge and/or tags.

    Only keys present in the JSON body are changed. When ``tags`` is present it
    must be a list and replaces the product's whole tag set. Responds 400 for
    invalid input, 404 when the product is missing or belongs to another
    enterprise, 409 when the new name collides with another product, 500 on
    unexpected errors.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return _error(400, '无法获取企业ID')

        data = _json_object()
        if data is None:
            return _error(400, '请求参数错误')

        # Validate tags up front: failing after the DELETE below would leave
        # the product with no tags at all.
        if 'tags' in data and not isinstance(data['tags'], list):
            return _error(400, '标签格式错误')

        db_manager = get_db_manager()

        # The product must exist and belong to the current enterprise.
        check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
        if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
            return _error(404, '产品不存在')

        if 'name' in data:
            # Same rules as creation: non-empty and unique per enterprise.
            product_name = data['name']
            if not product_name:
                return _error(400, '产品名称不能为空')
            duplicate_sql = """
                SELECT id
                FROM ai_products
                WHERE enterprise_id = %s AND name = %s
                  AND status != 'deleted' AND id != %s
            """
            if db_manager.execute_query(duplicate_sql, (enterprise_id, product_name, product_id)):
                return _error(409, f'产品名称「{product_name}」已存在,请使用其他名称')

        # Each key maps to its own column; type_id is no longer written into
        # type_name when type_name is absent.
        columns = [col for col in ('name', 'type_id', 'type_name', 'knowledge') if col in data]
        if columns:
            assignments = ', '.join(f"{col} = %s" for col in columns)
            params = [data[col] for col in columns]
            params.append(product_id)
            sql = f"UPDATE ai_products SET {assignments}, updated_at = NOW() WHERE id = %s"
            db_manager.execute_update(sql, params)

        if 'tags' in data:
            # Replace the tag set: drop the old rows, then insert the new ones.
            del_sql = "DELETE FROM ai_product_tags WHERE product_id = %s"
            db_manager.execute_update(del_sql, (product_id,))
            tag_sql = "INSERT INTO ai_product_tags (enterprise_id, product_id, tag_name) VALUES (%s, %s, %s)"
            for tag_name in data['tags']:
                db_manager.execute_insert(tag_sql, (enterprise_id, product_id, tag_name))

        logger.info(f"更新产品成功: ID {product_id}")

        # Business audit log.
        log_update(
            target_type='product',
            target_id=product_id,
            description="更新产品信息",
            request_data=data
        )
        return _success(None, '更新成功')
    except Exception as e:
        logger.error(f"[更新产品] 处理请求时发生错误: {str(e)}", exc_info=True)
        return _error(500, '服务器内部错误')


@product_bp.route('/<int:product_id>', methods=['DELETE'])
@require_auth
def delete_product(product_id):
    """Soft-delete a product and decrement the enterprise's product counter.

    Responds 404 when the product is missing, already deleted or belongs to
    another enterprise; 500 on unexpected errors.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return _error(400, '无法获取企业ID')

        db_manager = get_db_manager()

        # The product must exist and belong to the current enterprise.
        check_sql = "SELECT id, name FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
        existing = db_manager.execute_query(check_sql, (product_id, enterprise_id))
        if not existing:
            return _error(404, '产品不存在')
        product_name = existing[0]['name']

        # Soft delete: the row stays, only its status changes.
        sql = "UPDATE ai_products SET status = 'deleted', updated_at = NOW() WHERE id = %s"
        db_manager.execute_update(sql, (product_id,))

        # The > 0 guard keeps the counter from going negative (or overflowing
        # an unsigned column) if it has drifted.
        update_sql = (
            "UPDATE ai_enterprises SET products_total = products_total - 1 "
            "WHERE id = %s AND products_total > 0"
        )
        db_manager.execute_update(update_sql, (enterprise_id,))

        logger.info(f"删除产品成功: {product_name}")

        # Business audit log.
        log_delete(
            target_type='product',
            target_id=product_id,
            description=f"删除产品: {product_name}"
        )
        return _success(None, '删除成功')
    except Exception as e:
        logger.error(f"[删除产品] 处理请求时发生错误: {str(e)}", exc_info=True)
        return _error(500, '服务器内部错误')


@product_bp.route('/<int:product_id>', methods=['GET'])
@require_auth
def get_product_detail(product_id):
    """Return one product with its tags and an (always empty) ``images`` list.

    Responds 404 when the product is missing, deleted or belongs to another
    enterprise; 500 on unexpected errors.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return _error(400, '无法获取企业ID')

        db_manager = get_db_manager()
        sql = """
            SELECT p.id, p.name, p.type_name, p.knowledge, p.status,
                   p.articles_total, p.published_total, p.image_url,
                   p.image_thumbnail_url, p.created_at, p.updated_at
            FROM ai_products p
            WHERE p.id = %s AND p.enterprise_id = %s AND p.status != 'deleted'
        """
        result = db_manager.execute_query(sql, (product_id, enterprise_id))
        if not result:
            return _error(404, '产品不存在')

        # Format created_at/updated_at the same way the list endpoints do.
        # NOTE(review): assumes format_datetime_fields returns an indexable
        # list of rows, as its use on list results suggests -- confirm.
        product = format_datetime_fields(result)[0]

        # Tags come straight from ai_product_tags.
        tag_sql = """
            SELECT id, tag_name
            FROM ai_product_tags
            WHERE product_id = %s
        """
        product['tags'] = db_manager.execute_query(tag_sql, (product_id,))

        # Images are not loaded here (the image table has a different shape).
        product['images'] = []

        logger.info(f"获取产品详情成功: ID {product_id}")
        return _success(product)
    except Exception as e:
        logger.error(f"[获取产品详情] 处理请求时发生错误: {str(e)}", exc_info=True)
        return _error(500, '服务器内部错误')


# ============================================================================
# Product image management endpoints (ai_product_images)
# ============================================================================

@product_bp.route('/<int:product_id>/images', methods=['GET'])
@require_auth
def get_product_images(product_id):
    """Return one page of a product's images filtered by status.

    Query parameters: ``page``, ``pageSize`` (integers >= 1) and ``status``
    (defaults to 'active'). Responds with ``data.total`` and ``data.list``;
    400 for invalid paging, 404 when the product is not the enterprise's.
    """
    client_ip = _client_ip()
    logger.info(f"[获取产品图片] 产品ID: {product_id}, IP: {client_ip}")
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return _error(400, '无法获取企业ID')

        paging = _parse_pagination()
        if paging is None:
            return _error(400, '分页参数错误')
        page, page_size = paging
        status = request.args.get('status', 'active')
        offset = (page - 1) * page_size

        db_manager = get_db_manager()

        # The product must belong to the current enterprise.
        check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
        if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
            return _error(404, '产品不存在')

        count_sql = "SELECT COUNT(*) as total FROM ai_product_images WHERE product_id = %s AND status = %s"
        total = db_manager.execute_query(count_sql, (product_id, status))[0]['total']

        sql = """
            SELECT id, image_id, image_name, image_url, thumbnail_url, type_name,
                   description, file_size, width, height, upload_user_id, status,
                   created_at, updated_at
            FROM ai_product_images
            WHERE product_id = %s AND status = %s
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
        """
        images = db_manager.execute_query(sql, (product_id, status, page_size, offset))

        logger.info(f"[获取产品图片] 成功, 产品ID: {product_id}, 总数: {total}, 返回: {len(images)}")
        return _success({'total': total, 'list': images})
    except Exception as e:
        logger.error(f"[获取产品图片] 错误: {str(e)}", exc_info=True)
        return _error(500, '服务器内部错误')


@product_bp.route('/<int:product_id>/images', methods=['POST'])
@require_auth
def add_product_image(product_id):
    """Attach an image record to a product.

    Requires ``image_id``, ``image_name`` and ``image_url`` in the JSON body;
    the remaining image columns are optional. Responds with the new row id;
    400 for invalid input, 404 when the product is not the enterprise's.
    """
    client_ip = _client_ip()
    logger.info(f"[添加产品图片] 产品ID: {product_id}, IP: {client_ip}")
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        user_id = current_user.get('user_id')
        if not enterprise_id:
            return _error(400, '无法获取企业ID')

        data = _json_object()
        if data is None:
            return _error(400, '请求参数错误')

        for field in ('image_id', 'image_name', 'image_url'):
            if not data.get(field):
                return _error(400, f'缺少必需字段: {field}')

        db_manager = get_db_manager()

        # The product must belong to the current enterprise.
        check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
        if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
            return _error(404, '产品不存在')

        sql = """
            INSERT INTO ai_product_images
                (enterprise_id, product_id, image_id, image_name, image_url,
                 thumbnail_url, type_name, description, file_size, width, height,
                 upload_user_id, status)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        # image_id below is the new row's id; data['image_id'] is the
        # caller-supplied identifier stored in the image_id column.
        image_id = db_manager.execute_insert(sql, (
            enterprise_id,
            product_id,
            data['image_id'],
            data['image_name'],
            data['image_url'],
            data.get('thumbnail_url', ''),
            data.get('type_name', ''),
            data.get('description', ''),
            data.get('file_size'),
            data.get('width'),
            data.get('height'),
            user_id,
            'active'
        ))

        logger.info(f"[添加产品图片] 成功, ID: {image_id}, 产品ID: {product_id}")
        log_create('product_image', image_id, f"添加产品图片: {data['image_name']}", data, {'id': image_id})
        return _success({'id': image_id}, '添加成功')
    except Exception as e:
        logger.error(f"[添加产品图片] 错误: {str(e)}", exc_info=True)
        return _error(500, '服务器内部错误')


@product_bp.route('/<int:product_id>/images/<int:image_id>', methods=['PUT'])
@require_auth
def update_product_image(product_id, image_id):
    """Update the editable columns of one product image.

    Only whitelisted keys present in the JSON body are written. Responds 400
    for an empty/invalid body, 404 when the image does not belong to the given
    product and enterprise.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return _error(400, '无法获取企业ID')

        data = _json_object()
        if data is None:
            return _error(400, '请求参数错误')

        db_manager = get_db_manager()

        # The image must belong to this product and enterprise.
        check_sql = "SELECT id FROM ai_product_images WHERE id = %s AND product_id = %s AND enterprise_id = %s"
        if not db_manager.execute_query(check_sql, (image_id, product_id, enterprise_id)):
            return _error(404, '图片不存在')

        # Whitelist of updatable columns; column names never come from input.
        editable = (
            'image_name', 'image_url', 'thumbnail_url', 'type_name',
            'description', 'file_size', 'width', 'height',
        )
        columns = [col for col in editable if col in data]
        if columns:
            assignments = ', '.join(f"{col} = %s" for col in columns)
            params = [data[col] for col in columns]
            params.append(image_id)
            sql = f"UPDATE ai_product_images SET {assignments}, updated_at = NOW() WHERE id = %s"
            db_manager.execute_update(sql, params)

        logger.info(f"[更新产品图片] 成功, ID: {image_id}")
        log_update('product_image', image_id, "更新产品图片", data)
        return _success(None, '更新成功')
    except Exception as e:
        logger.error(f"[更新产品图片] 错误: {str(e)}", exc_info=True)
        return _error(500, '服务器内部错误')


@product_bp.route('/<int:product_id>/images/<int:image_id>', methods=['DELETE'])
@require_auth
def delete_product_image(product_id, image_id):
    """Soft-delete one product image (status becomes 'deleted').

    Responds 404 when the image does not belong to the given product and
    enterprise; 500 on unexpected errors.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return _error(400, '无法获取企业ID')

        db_manager = get_db_manager()

        # The image must belong to this product and enterprise.
        check_sql = "SELECT image_name FROM ai_product_images WHERE id = %s AND product_id = %s AND enterprise_id = %s"
        result = db_manager.execute_query(check_sql, (image_id, product_id, enterprise_id))
        if not result:
            return _error(404, '图片不存在')

        sql = "UPDATE ai_product_images SET status = 'deleted', updated_at = NOW() WHERE id = %s"
        db_manager.execute_update(sql, (image_id,))

        logger.info(f"[删除产品图片] 成功, ID: {image_id}")
        log_delete('product_image', image_id, f"删除产品图片: {result[0]['image_name']}")
        return _success(None, '删除成功')
    except Exception as e:
        logger.error(f"[删除产品图片] 错误: {str(e)}", exc_info=True)
        return _error(500, '服务器内部错误')


# ============================================================================
# Product tag management endpoints (ai_product_tags)
============================================================================ @product_bp.route('//tags', methods=['GET']) @require_auth def get_product_tags(product_id): """获取产品标签列表""" client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知')) logger.info(f"[获取产品标签] 产品ID: {product_id}, IP: {client_ip}") try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 db_manager = get_db_manager() # 验证产品归属 check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'" if not db_manager.execute_query(check_sql, (product_id, enterprise_id)): return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404 # 查询标签 sql = """ SELECT id, tag_name, created_at FROM ai_product_tags WHERE product_id = %s AND enterprise_id = %s ORDER BY created_at DESC """ tags = db_manager.execute_query(sql, (product_id, enterprise_id)) logger.info(f"[获取产品标签] 成功, 产品ID: {product_id}, 数量: {len(tags)}") return jsonify({ 'code': 200, 'message': 'success', 'data': {'list': tags}, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[获取产品标签] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 @product_bp.route('//tags', methods=['POST']) @require_auth def add_product_tag(product_id): """添加产品标签""" client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知')) logger.info(f"[添加产品标签] 产品ID: {product_id}, IP: {client_ip}") try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 data = request.get_json() if not data or not data.get('tag_name'): return jsonify({'code': 400, 'message': '标签名称不能为空', 'data': None}), 400 db_manager = 
get_db_manager() # 验证产品归属 check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'" if not db_manager.execute_query(check_sql, (product_id, enterprise_id)): return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404 # 检查标签是否已存在 exists_sql = "SELECT id FROM ai_product_tags WHERE product_id = %s AND enterprise_id = %s AND tag_name = %s" if db_manager.execute_query(exists_sql, (product_id, enterprise_id, data['tag_name'])): return jsonify({'code': 400, 'message': '标签已存在', 'data': None}), 400 # 插入标签 sql = "INSERT INTO ai_product_tags (enterprise_id, product_id, tag_name) VALUES (%s, %s, %s)" tag_id = db_manager.execute_insert(sql, (enterprise_id, product_id, data['tag_name'])) logger.info(f"[添加产品标签] 成功, ID: {tag_id}, 标签: {data['tag_name']}") log_create('product_tag', tag_id, f"添加产品标签: {data['tag_name']}", data, {'id': tag_id}) return jsonify({ 'code': 200, 'message': '添加成功', 'data': {'id': tag_id}, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[添加产品标签] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 @product_bp.route('//tags/', methods=['DELETE']) @require_auth def delete_product_tag(product_id, tag_id): """删除产品标签""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 db_manager = get_db_manager() # 验证标签归属 check_sql = "SELECT tag_name FROM ai_product_tags WHERE id = %s AND product_id = %s AND enterprise_id = %s" result = db_manager.execute_query(check_sql, (tag_id, product_id, enterprise_id)) if not result: return jsonify({'code': 404, 'message': '标签不存在', 'data': None}), 404 # 删除标签 sql = "DELETE FROM ai_product_tags WHERE id = %s" db_manager.execute_update(sql, (tag_id,)) logger.info(f"[删除产品标签] 成功, ID: {tag_id}") log_delete('product_tag', tag_id, f"删除产品标签: 
{result[0]['tag_name']}") return jsonify({ 'code': 200, 'message': '删除成功', 'data': None, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[删除产品标签] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 # ============================================================================ # 产品类型管理接口 (ai_product_types) # ============================================================================ @product_bp.route('//types', methods=['GET']) @require_auth def get_product_types(product_id): """获取产品类型列表""" client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知')) logger.info(f"[获取产品类型] 产品ID: {product_id}, IP: {client_ip}") try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 db_manager = get_db_manager() # 验证产品归属 check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'" if not db_manager.execute_query(check_sql, (product_id, enterprise_id)): return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404 # 查询类型 sql = """ SELECT id, type_name, created_at FROM ai_product_types WHERE product_id = %s AND enterprise_id = %s ORDER BY created_at DESC """ types = db_manager.execute_query(sql, (product_id, enterprise_id)) logger.info(f"[获取产品类型] 成功, 产品ID: {product_id}, 数量: {len(types)}") return jsonify({ 'code': 200, 'message': 'success', 'data': {'list': types}, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[获取产品类型] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 @product_bp.route('//types', methods=['POST']) @require_auth def add_product_type(product_id): """添加产品类型""" client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知')) 
logger.info(f"[添加产品类型] 产品ID: {product_id}, IP: {client_ip}") try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 data = request.get_json() if not data or not data.get('type_name'): return jsonify({'code': 400, 'message': '类型名称不能为空', 'data': None}), 400 db_manager = get_db_manager() # 验证产品归属 check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'" if not db_manager.execute_query(check_sql, (product_id, enterprise_id)): return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404 # 检查类型是否已存在 exists_sql = "SELECT id FROM ai_product_types WHERE product_id = %s AND enterprise_id = %s AND type_name = %s" if db_manager.execute_query(exists_sql, (product_id, enterprise_id, data['type_name'])): return jsonify({'code': 400, 'message': '类型已存在', 'data': None}), 400 # 插入类型 sql = "INSERT INTO ai_product_types (enterprise_id, product_id, type_name) VALUES (%s, %s, %s)" type_id = db_manager.execute_insert(sql, (enterprise_id, product_id, data['type_name'])) logger.info(f"[添加产品类型] 成功, ID: {type_id}, 类型: {data['type_name']}") log_create('product_type', type_id, f"添加产品类型: {data['type_name']}", data, {'id': type_id}) return jsonify({ 'code': 200, 'message': '添加成功', 'data': {'id': type_id}, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[添加产品类型] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 @product_bp.route('//types/', methods=['DELETE']) @require_auth def delete_product_type(product_id, type_id): """删除产品类型""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 db_manager = get_db_manager() # 验证类型归属 check_sql = "SELECT type_name FROM ai_product_types WHERE 
id = %s AND product_id = %s AND enterprise_id = %s" result = db_manager.execute_query(check_sql, (type_id, product_id, enterprise_id)) if not result: return jsonify({'code': 404, 'message': '类型不存在', 'data': None}), 404 # 删除类型 sql = "DELETE FROM ai_product_types WHERE id = %s" db_manager.execute_update(sql, (type_id,)) logger.info(f"[删除产品类型] 成功, ID: {type_id}") log_delete('product_type', type_id, f"删除产品类型: {result[0]['type_name']}") return jsonify({ 'code': 200, 'message': '删除成功', 'data': None, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[删除产品类型] 错误: {str(e)}", exc_info=True) return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500 # ============================================================================ # 企业产品类型管理接口 (基于enterprise_id的独立管理) # ============================================================================ @product_bp.route('/types/list', methods=['GET']) @require_auth def get_enterprise_product_types(): """获取企业所有产品类型列表(独立接口)""" client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知')) logger.info(f"[获取企业产品类型列表] 开始处理请求, IP: {client_ip}") try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: logger.warning(f"[获取企业产品类型列表] 无法获取企业ID, IP: {client_ip}") return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400 logger.info(f"[获取企业产品类型列表] 企业ID: {enterprise_id}, IP: {client_ip}") # 获取分页参数 page = int(request.args.get('page', 1)) page_size = int(request.args.get('pageSize', 20)) keyword = request.args.get('keyword', '').strip() offset = (page - 1) * page_size db_manager = get_db_manager() # 构建查询条件 where_conditions = ["enterprise_id = %s"] params = [enterprise_id] if keyword: where_conditions.append("type_name LIKE %s") params.append(f"%{keyword}%") where_clause = " AND ".join(where_conditions) # 查询总数 count_sql = f"SELECT COUNT(*) as total FROM ai_product_types WHERE 
def _type_api_error(status, message):
    """Build the standard error payload used by the product-type endpoints.

    Args:
        status: HTTP status code; also echoed in the JSON ``code`` field.
        message: User-facing error message.

    Returns:
        A ``(response, status)`` tuple suitable for returning from a view.
    """
    return jsonify({'code': status, 'message': message, 'data': None}), status


def _normalize_type_name(value):
    """Return *value* stripped of surrounding whitespace, or None if unusable.

    A usable type name is a string that is non-empty after stripping.
    """
    if not isinstance(value, str):
        return None
    return value.strip() or None


def _normalize_product_id(value):
    """Return a non-negative integer product id, or None if *value* is invalid.

    ``None`` (missing key / JSON null) maps to 0, which these endpoints use to
    mean "not linked to any product".
    """
    if value is None:
        return 0
    # bool is a subclass of int, so it has to be rejected explicitly.
    if isinstance(value, bool) or not isinstance(value, int) or value < 0:
        return None
    return value


@product_bp.route('/types/create', methods=['POST'])
@require_auth
def create_enterprise_product_type():
    """Create a product type for the current user's enterprise.

    Expects a JSON object with ``type_name`` (required, non-blank string) and
    ``product_id`` (optional non-negative integer, 0 = not linked to a product).

    Returns:
        200 with ``{'id': <new id>}`` on success;
        400 when the enterprise id is missing, the payload is invalid, or the
        same type already exists for this enterprise/product;
        404 when ``product_id`` does not refer to a non-deleted product of the
        enterprise;
        500 on unexpected errors.
    """
    client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
    logger.info(f"[创建企业产品类型] 开始处理请求, IP: {client_ip}")

    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            logger.warning(f"[创建企业产品类型] 无法获取企业ID, IP: {client_ip}")
            return _type_api_error(400, '无法获取企业ID')

        # silent=True: a malformed or non-JSON body yields None instead of
        # raising, so it is answered with 400 below rather than being caught by
        # the blanket handler and reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        type_name = _normalize_type_name(data.get('type_name'))
        if type_name is None:
            logger.warning(f"[创建企业产品类型] 类型名称不能为空, 企业ID: {enterprise_id}, IP: {client_ip}")
            return _type_api_error(400, '类型名称不能为空')

        product_id = _normalize_product_id(data.get('product_id'))
        if product_id is None:
            logger.warning(f"[创建企业产品类型] 产品ID无效, 产品ID: {data.get('product_id')!r}, 企业ID: {enterprise_id}, IP: {client_ip}")
            return _type_api_error(400, '产品ID无效')

        logger.info(f"[创建企业产品类型] 类型名称: {type_name}, 产品ID: {product_id}, 企业ID: {enterprise_id}, IP: {client_ip}")

        db_manager = get_db_manager()

        # When a product is specified, make sure it belongs to this enterprise.
        if product_id > 0:
            check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
            if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
                logger.warning(f"[创建企业产品类型] 产品不存在或不属于当前企业, 产品ID: {product_id}, 企业ID: {enterprise_id}")
                return _type_api_error(404, '产品不存在')

        # Reject duplicates within the same enterprise/product scope.
        exists_sql = "SELECT id FROM ai_product_types WHERE enterprise_id = %s AND product_id = %s AND type_name = %s"
        if db_manager.execute_query(exists_sql, (enterprise_id, product_id, type_name)):
            logger.warning(f"[创建企业产品类型] 类型已存在, 类型: {type_name}, 企业ID: {enterprise_id}")
            return _type_api_error(400, '该类型已存在')

        sql = "INSERT INTO ai_product_types (enterprise_id, product_id, type_name) VALUES (%s, %s, %s)"
        type_id = db_manager.execute_insert(sql, (enterprise_id, product_id, type_name))

        logger.info(f"[创建企业产品类型] 创建成功, ID: {type_id}, 类型: {type_name}, 企业ID: {enterprise_id}, IP: {client_ip}")
        log_create('enterprise_product_type', type_id, f"创建产品类型: {type_name}", data, {'id': type_id})

        return jsonify({
            'code': 200,
            'message': '创建成功',
            'data': {'id': type_id},
            'timestamp': int(datetime.now().timestamp() * 1000)
        })

    except Exception as e:
        logger.error(f"[创建企业产品类型] 错误: {str(e)}", exc_info=True)
        return _type_api_error(500, '服务器内部错误')


@product_bp.route('/types/<int:type_id>', methods=['PUT'])
@require_auth
def update_enterprise_product_type(type_id):
    """Update the name and/or linked product of an enterprise product type.

    Args:
        type_id: Primary key of the ``ai_product_types`` row, taken from the URL.

    Expects a non-empty JSON object that may contain ``type_name`` and/or
    ``product_id``. Unknown keys are ignored; if neither key is present the
    request succeeds without touching the database.

    Returns:
        200 on success;
        400 for a missing enterprise id, an invalid payload, or a name that
        would collide with another type in the resulting enterprise/product
        scope;
        404 when the type or the new product does not exist for the enterprise;
        500 on unexpected errors.
    """
    client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
    logger.info(f"[更新企业产品类型] 类型ID: {type_id}, IP: {client_ip}")

    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return _type_api_error(400, '无法获取企业ID')

        # silent=True turns unparsable bodies into None (-> 400) instead of an
        # exception that the blanket handler would report as 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return _type_api_error(400, '请求参数错误')

        db_manager = get_db_manager()

        # The type must exist and belong to the caller's enterprise.
        check_sql = "SELECT type_name, product_id FROM ai_product_types WHERE id = %s AND enterprise_id = %s"
        result = db_manager.execute_query(check_sql, (type_id, enterprise_id))
        if not result:
            logger.warning(f"[更新企业产品类型] 类型不存在, ID: {type_id}, 企业ID: {enterprise_id}")
            return _type_api_error(404, '类型不存在')

        old_data = result[0]

        # Track the values the row will hold after the update so the duplicate
        # check below runs against the final (product_id, type_name) pair.
        new_name = old_data['type_name']
        new_product_id = old_data['product_id']
        update_fields = []
        params = []

        if 'type_name' in data:
            new_name = _normalize_type_name(data['type_name'])
            if new_name is None:
                return _type_api_error(400, '类型名称不能为空')
            update_fields.append("type_name = %s")
            params.append(new_name)

        if 'product_id' in data:
            new_product_id = _normalize_product_id(data['product_id'])
            if new_product_id is None:
                return _type_api_error(400, '产品ID无效')
            # A non-zero product must belong to this enterprise.
            if new_product_id > 0:
                check_product_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
                if not db_manager.execute_query(check_product_sql, (new_product_id, enterprise_id)):
                    return _type_api_error(404, '产品不存在')
            update_fields.append("product_id = %s")
            params.append(new_product_id)

        if update_fields:
            # Either field changing can create a collision with another row, so
            # the check runs for any update, not just a rename.
            exists_sql = "SELECT id FROM ai_product_types WHERE enterprise_id = %s AND product_id = %s AND type_name = %s AND id != %s"
            if db_manager.execute_query(exists_sql, (enterprise_id, new_product_id, new_name, type_id)):
                return _type_api_error(400, '类型名称已存在')

            # update_fields only ever contains the fixed fragments above, so
            # joining them into the statement is safe. The enterprise filter
            # keeps the write scoped to the caller's tenant.
            params.extend([type_id, enterprise_id])
            sql = f"UPDATE ai_product_types SET {', '.join(update_fields)} WHERE id = %s AND enterprise_id = %s"
            db_manager.execute_update(sql, params)

        logger.info(f"[更新企业产品类型] 更新成功, ID: {type_id}, 企业ID: {enterprise_id}")
        log_update('enterprise_product_type', type_id, "更新产品类型", data)

        return jsonify({
            'code': 200,
            'message': '更新成功',
            'data': None,
            'timestamp': int(datetime.now().timestamp() * 1000)
        })

    except Exception as e:
        logger.error(f"[更新企业产品类型] 错误: {str(e)}", exc_info=True)
        return _type_api_error(500, '服务器内部错误')


@product_bp.route('/types/<int:type_id>', methods=['DELETE'])
@require_auth
def delete_enterprise_product_type(type_id):
    """Delete a product type owned by the current user's enterprise.

    Args:
        type_id: Primary key of the ``ai_product_types`` row, taken from the URL.

    Returns:
        200 on success; 400 when the enterprise id is missing; 404 when the
        type does not exist for the enterprise; 500 on unexpected errors.
    """
    client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
    logger.info(f"[删除企业产品类型] 类型ID: {type_id}, IP: {client_ip}")

    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return _type_api_error(400, '无法获取企业ID')

        db_manager = get_db_manager()

        # Look the row up first: confirms ownership and provides the name for
        # the audit log written after deletion.
        check_sql = "SELECT type_name FROM ai_product_types WHERE id = %s AND enterprise_id = %s"
        result = db_manager.execute_query(check_sql, (type_id, enterprise_id))
        if not result:
            logger.warning(f"[删除企业产品类型] 类型不存在, ID: {type_id}, 企业ID: {enterprise_id}")
            return _type_api_error(404, '类型不存在')

        type_name = result[0]['type_name']

        # Scope the delete by enterprise as well, so the write can never reach
        # another tenant's row.
        sql = "DELETE FROM ai_product_types WHERE id = %s AND enterprise_id = %s"
        db_manager.execute_update(sql, (type_id, enterprise_id))

        logger.info(f"[删除企业产品类型] 删除成功, ID: {type_id}, 类型: {type_name}, 企业ID: {enterprise_id}, IP: {client_ip}")
        log_delete('enterprise_product_type', type_id, f"删除产品类型: {type_name}")

        return jsonify({
            'code': 200,
            'message': '删除成功',
            'data': None,
            'timestamp': int(datetime.now().timestamp() * 1000)
        })

    except Exception as e:
        logger.error(f"[删除企业产品类型] 错误: {str(e)}", exc_info=True)
        return _type_api_error(500, '服务器内部错误')


@product_bp.route('/types/<int:type_id>', methods=['GET'])
@require_auth
def get_enterprise_product_type_detail(type_id):
    """Return a single product type owned by the current user's enterprise.

    Args:
        type_id: Primary key of the ``ai_product_types`` row, taken from the URL.

    Returns:
        200 with the row (``id``, ``type_name``, ``enterprise_id``,
        ``product_id``, ``created_at``) on success; 400 when the enterprise id
        is missing; 404 when the type does not exist for the enterprise; 500 on
        unexpected errors.
    """
    client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
    logger.info(f"[获取企业产品类型详情] 类型ID: {type_id}, IP: {client_ip}")

    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return _type_api_error(400, '无法获取企业ID')

        db_manager = get_db_manager()

        sql = """
            SELECT id, type_name, enterprise_id, product_id, created_at
            FROM ai_product_types
            WHERE id = %s AND enterprise_id = %s
        """
        result = db_manager.execute_query(sql, (type_id, enterprise_id))
        if not result:
            logger.warning(f"[获取企业产品类型详情] 类型不存在, ID: {type_id}, 企业ID: {enterprise_id}")
            return _type_api_error(404, '类型不存在')

        type_data = result[0]
        logger.info(f"[获取企业产品类型详情] 查询成功, ID: {type_id}, 类型: {type_data['type_name']}, 企业ID: {enterprise_id}")

        return jsonify({
            'code': 200,
            'message': 'success',
            'data': type_data,
            'timestamp': int(datetime.now().timestamp() * 1000)
        })

    except Exception as e:
        logger.error(f"[获取企业产品类型详情] 错误: {str(e)}", exc_info=True)
        return _type_api_error(500, '服务器内部错误')