#!/usr/bin/env python # -*- coding: utf-8 -*- """ 产品管理接口 """ from flask import Blueprint, request, jsonify import logging from datetime import datetime from auth_utils import require_auth, AuthUtils from database_config import get_db_manager from log_utils import log_create, log_update, log_delete, log_error, log_operation logger = logging.getLogger(__name__) # 创建蓝图 product_bp = Blueprint('product', __name__, url_prefix='/api/products') @product_bp.route('/list', methods=['GET']) @require_auth def get_products_list(): """获取产品列表""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 # 获取查询参数 page = int(request.args.get('page', 1)) page_size = int(request.args.get('pageSize', 20)) keyword = request.args.get('keyword', '').strip() type_id = request.args.get('type', '').strip() status = request.args.get('status', '').strip() # 构建查询条件 where_conditions = ["enterprise_id = %s", "status != 'deleted'"] params = [enterprise_id] if keyword: where_conditions.append("name LIKE %s") params.append(f"%{keyword}%") if type_id: where_conditions.append("type_name LIKE %s") params.append(f"%{type_id}%") if status: where_conditions.append("status = %s") params.append(status) where_clause = " AND ".join(where_conditions) # 计算偏移量 offset = (page - 1) * page_size db_manager = get_db_manager() # 查询总数 count_sql = f"SELECT COUNT(*) as total FROM ai_products WHERE {where_clause}" count_result = db_manager.execute_query(count_sql, params) total = count_result[0]['total'] # 查询产品列表 sql = f""" SELECT p.id, p.name, p.type_name, p.knowledge, p.status, p.articles_total, p.published_total, p.image_url, p.image_thumbnail_url, p.created_at, p.updated_at FROM ai_products p WHERE {where_clause} ORDER BY p.created_at DESC LIMIT %s OFFSET %s """ params.extend([page_size, offset]) products = db_manager.execute_query(sql, params) # 查询产品标签 for product in products: tag_sql = """ SELECT id, tag_name FROM ai_product_tags WHERE product_id = %s """ tags = db_manager.execute_query(tag_sql, (product['id'],)) product['tags'] = tags logger.info(f"获取产品列表成功,总数: {total}") return jsonify({ 'code': 200, 'message': 'success', 'data': { 'total': total, 'list': products }, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[获取产品列表] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @product_bp.route('/create', methods=['POST']) @require_auth def create_product(): """创建产品""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 data = request.get_json() if not data: return jsonify({ 'code': 400, 'message': '请求参数错误', 'data': None }), 400 # 验证必需字段 required_fields = ['name', 'type_id', 'knowledge'] for field in required_fields: if not data.get(field): return jsonify({ 'code': 400, 'message': f'缺少必需字段: {field}', 'data': None }), 400 db_manager = get_db_manager() # 创建产品 sql = """ INSERT INTO ai_products (enterprise_id, name, type_name, knowledge, status, articles_total, published_total, image_url, image_thumbnail_url) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """ product_id = db_manager.execute_insert(sql, ( enterprise_id, data['name'], data.get('type_name', data.get('type_id', '')), # 兼容type_name和type_id data['knowledge'], 'active', 0, 0, # articles_total, published_total data.get('image_url', ''), data.get('image_thumbnail_url', '') )) # 添加标签 if data.get('tags'): for tag_name in data['tags']: tag_sql = "INSERT INTO ai_product_tags (enterprise_id, product_id, tag_name) VALUES (%s, %s, %s)" db_manager.execute_insert(tag_sql, (enterprise_id, product_id, tag_name)) # 更新企业产品总数 update_sql = "UPDATE ai_enterprises SET products_total = products_total + 1 WHERE id = %s" db_manager.execute_update(update_sql, (enterprise_id,)) logger.info(f"创建产品成功: {data['name']}, ID: {product_id}") # 记录业务日志 log_create( target_type='product', target_id=product_id, description=f"创建产品: {data['name']}", request_data=data, response_data={'id': product_id} ) return jsonify({ 'code': 200, 'message': '创建成功', 'data': {'id': product_id}, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[创建产品] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @product_bp.route('/', methods=['PUT']) @require_auth def update_product(product_id): """更新产品""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 data = request.get_json() if not data: return jsonify({ 'code': 400, 'message': '请求参数错误', 'data': None }), 400 db_manager = get_db_manager() # 检查产品是否存在且属于当前企业 check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'" existing = db_manager.execute_query(check_sql, (product_id, enterprise_id)) if not existing: return jsonify({ 'code': 404, 'message': '产品不存在', 'data': None }), 404 # 构建更新字段 update_fields = [] params = [] if 'name' in data: update_fields.append("name = %s") params.append(data['name']) if 'type_name' in data or 'type_id' in data: update_fields.append("type_name = %s") params.append(data.get('type_name', data.get('type_id', ''))) if 'knowledge' in data: update_fields.append("knowledge = %s") params.append(data['knowledge']) if update_fields: params.append(product_id) sql = f"UPDATE ai_products SET {', '.join(update_fields)}, updated_at = NOW() WHERE id = %s" db_manager.execute_update(sql, params) # 更新标签 if 'tags' in data: # 删除旧标签 del_sql = "DELETE FROM ai_product_tags WHERE product_id = %s" db_manager.execute_update(del_sql, (product_id,)) # 添加新标签 for tag_name in data['tags']: tag_sql = "INSERT INTO ai_product_tags (enterprise_id, product_id, tag_name) VALUES (%s, %s, %s)" db_manager.execute_insert(tag_sql, (enterprise_id, product_id, tag_name)) logger.info(f"更新产品成功: ID {product_id}") # 记录业务日志 log_update( target_type='product', target_id=product_id, description=f"更新产品信息", request_data=data ) return jsonify({ 'code': 200, 'message': '更新成功', 'data': None, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[更新产品] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @product_bp.route('/', methods=['DELETE']) @require_auth def delete_product(product_id): """删除产品""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 db_manager = get_db_manager() # 检查产品是否存在且属于当前企业 check_sql = "SELECT id, name FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'" existing = db_manager.execute_query(check_sql, (product_id, enterprise_id)) if not existing: return jsonify({ 'code': 404, 'message': '产品不存在', 'data': None }), 404 # 软删除产品 sql = "UPDATE ai_products SET status = 'deleted', updated_at = NOW() WHERE id = %s" db_manager.execute_update(sql, (product_id,)) # 更新企业产品总数 update_sql = "UPDATE ai_enterprises SET products_total = products_total - 1 WHERE id = %s" db_manager.execute_update(update_sql, (enterprise_id,)) logger.info(f"删除产品成功: {existing[0]['name']}") # 记录业务日志 log_delete( target_type='product', target_id=product_id, description=f"删除产品: {existing[0]['name']}" ) return jsonify({ 'code': 200, 'message': '删除成功', 'data': None, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[删除产品] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @product_bp.route('/', methods=['GET']) @require_auth def get_product_detail(product_id): """获取产品详情""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 db_manager = get_db_manager() # 查询产品详情 sql = """ SELECT p.id, p.name, p.type_name, p.knowledge, p.status, p.articles_total, p.published_total, p.image_url, p.image_thumbnail_url, p.created_at, p.updated_at FROM ai_products p WHERE p.id = %s AND p.enterprise_id = %s AND p.status != 'deleted' """ result = db_manager.execute_query(sql, (product_id, enterprise_id)) if not result: return jsonify({ 'code': 404, 'message': '产品不存在', 'data': None }), 404 product = result[0] # 查询产品标签(直接从ai_product_tags查询) tag_sql = """ SELECT id, tag_name FROM ai_product_tags WHERE product_id = %s """ tags = db_manager.execute_query(tag_sql, (product_id,)) product['tags'] = tags # 暂不查询产品图片(图片关联表结构不同) product['images'] = [] logger.info(f"获取产品详情成功: ID {product_id}") return jsonify({ 'code': 200, 'message': 'success', 'data': product, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[获取产品详情] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500