Files
ai_wht_B/product_routes.py

1298 lines
53 KiB
Python
Raw Permalink Normal View History

2026-01-06 14:18:39 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
产品管理接口
"""
from flask import Blueprint, request, jsonify
import logging
from datetime import datetime
from auth_utils import require_auth, AuthUtils
from database_config import get_db_manager, format_datetime_fields
from log_utils import log_create, log_update, log_delete, log_error, log_operation
logger = logging.getLogger('article_server')
# 创建蓝图
product_bp = Blueprint('product', __name__, url_prefix='/api/products')
@product_bp.route('/list_info', methods=['GET'])
@require_auth
def get_products_list_info():
"""获取产品列表简化版仅返回5个字段"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[获取产品列表简化版] 开始处理请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
logger.info(f"[获取产品列表简化版] 用户信息 - 用户ID: {current_user.get('user_id')}, 企业ID: {enterprise_id}, IP: {client_ip}")
if not enterprise_id:
logger.warning(f"[获取产品列表简化版] 无法获取企业ID, IP: {client_ip}")
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
db_manager = get_db_manager()
# 查询产品列表仅返回5个字段
# 使用 IN 替代 != 以优化索引使用
sql = """
SELECT id, name, status, created_at, updated_at
FROM ai_products
WHERE enterprise_id = %s AND status IN ('draft', 'active')
ORDER BY created_at DESC
"""
products = db_manager.execute_query(sql, (enterprise_id,))
# 格式化日期时间字段
products = format_datetime_fields(products)
logger.info(f"[获取产品列表简化版] 查询成功, 返回数量: {len(products)}, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 200,
'message': 'success',
'data': {
'list': products
},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取产品列表简化版] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@product_bp.route('/list', methods=['GET'])
@require_auth
def get_products_list():
"""获取产品列表"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[获取产品列表] 开始处理请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
logger.info(f"[获取产品列表] 用户信息 - 用户ID: {current_user.get('user_id')}, 企业ID: {enterprise_id}, IP: {client_ip}")
if not enterprise_id:
logger.warning(f"[获取产品列表] 无法获取企业ID, IP: {client_ip}")
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
# 获取查询参数
page = int(request.args.get('page', 1))
page_size = int(request.args.get('pageSize', 20))
keyword = request.args.get('keyword', '').strip()
type_id = request.args.get('type', '').strip()
status = request.args.get('status', '').strip()
logger.info(f"[获取产品列表] 查询参数: page={page}, pageSize={page_size}, keyword={keyword}, type={type_id}, status={status}, 企业ID: {enterprise_id}, IP: {client_ip}")
# 构建查询条件
where_conditions = ["enterprise_id = %s", "status != 'deleted'"]
params = [enterprise_id]
if keyword:
where_conditions.append("name LIKE %s")
params.append(f"%{keyword}%")
if type_id:
where_conditions.append("type_name LIKE %s")
params.append(f"%{type_id}%")
if status:
where_conditions.append("status = %s")
params.append(status)
where_clause = " AND ".join(where_conditions)
# 计算偏移量
offset = (page - 1) * page_size
db_manager = get_db_manager()
# 查询总数
count_sql = f"SELECT COUNT(*) as total FROM ai_products WHERE {where_clause}"
count_result = db_manager.execute_query(count_sql, params)
total = count_result[0]['total']
# 查询产品列表
sql = f"""
SELECT p.id, p.name, p.type_name, p.knowledge, p.status,
p.articles_total, p.published_total,
p.image_url, p.image_thumbnail_url,
p.created_at, p.updated_at
FROM ai_products p
WHERE {where_clause}
ORDER BY p.created_at DESC
LIMIT %s OFFSET %s
"""
params.extend([page_size, offset])
products = db_manager.execute_query(sql, params)
# 查询产品标签
for product in products:
tag_sql = """
SELECT id, tag_name
FROM ai_product_tags
WHERE product_id = %s
"""
tags = db_manager.execute_query(tag_sql, (product['id'],))
product['tags'] = tags
# 格式化日期时间字段
products = format_datetime_fields(products)
logger.info(f"[获取产品列表] 查询成功, 总数: {total}, 当前页: {page}, 每页: {page_size}, 返回数量: {len(products)}, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 200,
'message': 'success',
'data': {
'total': total,
'list': products
},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取产品列表] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@product_bp.route('/create', methods=['POST'])
@require_auth
def create_product():
"""创建产品"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[创建产品] 开始处理创建产品请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
logger.info(f"[创建产品] 用户信息 - 用户ID: {current_user.get('user_id')}, 企业ID: {enterprise_id}, IP: {client_ip}")
if not enterprise_id:
logger.warning(f"[创建产品] 无法获取企业ID, IP: {client_ip}")
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
data = request.get_json()
if not data:
logger.warning(f"[创建产品] 请求参数为空, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 400,
'message': '请求参数错误',
'data': None
}), 400
logger.info(f"[创建产品] 收到创建请求, 产品名称: {data.get('name')}, 类型: {data.get('type_name') or data.get('type_id')}, 企业ID: {enterprise_id}, IP: {client_ip}")
# 验证必需字段
required_fields = ['name', 'type_id', 'knowledge', 'type_name']
for field in required_fields:
if not data.get(field):
logger.warning(f"[创建产品] 缺少必需字段: {field}, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 400,
'message': f'缺少必需字段: {field}',
'data': None
}), 400
db_manager = get_db_manager()
# 产品名称判重:检查同一企业下是否已存在相同名称的产品
logger.info(f"[创建产品] 开始检查产品名称是否重复, 名称: {data['name']}, 企业ID: {enterprise_id}")
check_duplicate_sql = """
SELECT id, name FROM ai_products
WHERE enterprise_id = %s AND name = %s AND status != 'deleted'
"""
existing_product = db_manager.execute_query(check_duplicate_sql, (enterprise_id, data['name']))
if existing_product:
logger.warning(f"[创建产品] 产品名称已存在, 名称: {data['name']}, 已存在ID: {existing_product[0]['id']}, 企业ID: {enterprise_id}, IP: {client_ip}")
product_name = data['name']
return jsonify({
'code': 409,
'message': f'产品名称「{product_name}」已存在,请使用其他名称',
'data': None
}), 409
# 创建产品
logger.info(f"[创建产品] 开始插入产品数据, 名称: {data['name']}, 企业ID: {enterprise_id}")
sql = """
INSERT INTO ai_products
(enterprise_id, name, type_id, type_name, knowledge, status, articles_total, published_total, image_url, image_thumbnail_url)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
product_id = db_manager.execute_insert(sql, (
enterprise_id,
data['name'],
data.get('type_id', 0),
data.get('type_name', ''),
data['knowledge'],
'active',
0, 0, # articles_total, published_total
data.get('image_url', ''),
data.get('image_thumbnail_url', '')
))
logger.info(f"[创建产品] 产品插入成功, ID: {product_id}, 名称: {data['name']}, 企业ID: {enterprise_id}")
# 添加标签
if data.get('tags'):
logger.info(f"[创建产品] 开始添加产品标签, 产品ID: {product_id}, 标签数量: {len(data['tags'])}")
for tag_name in data['tags']:
tag_sql = "INSERT INTO ai_product_tags (enterprise_id, product_id, tag_name) VALUES (%s, %s, %s)"
db_manager.execute_insert(tag_sql, (enterprise_id, product_id, tag_name))
logger.info(f"[创建产品] 产品标签添加完成, 产品ID: {product_id}, 数量: {len(data['tags'])}")
# 更新企业产品总数
logger.info(f"[创建产品] 开始更新企业产品总数, 企业ID: {enterprise_id}")
update_sql = "UPDATE ai_enterprises SET products_total = products_total + 1 WHERE id = %s"
db_manager.execute_update(update_sql, (enterprise_id,))
logger.info(f"[创建产品] 产品创建全部完成, ID: {product_id}, 名称: {data['name']}, 企业ID: {enterprise_id}, IP: {client_ip}")
# 记录业务日志
log_create(
target_type='product',
target_id=product_id,
description=f"创建产品: {data['name']}",
request_data=data,
response_data={'id': product_id}
)
return jsonify({
'code': 200,
'message': '创建成功',
'data': {'id': product_id},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[创建产品] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@product_bp.route('/<int:product_id>', methods=['PUT'])
@require_auth
def update_product(product_id):
"""更新产品"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
data = request.get_json()
if not data:
return jsonify({
'code': 400,
'message': '请求参数错误',
'data': None
}), 400
db_manager = get_db_manager()
# 检查产品是否存在且属于当前企业
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
existing = db_manager.execute_query(check_sql, (product_id, enterprise_id))
if not existing:
return jsonify({
'code': 404,
'message': '产品不存在',
'data': None
}), 404
# 构建更新字段
update_fields = []
params = []
if 'name' in data:
update_fields.append("name = %s")
params.append(data['name'])
if 'type_name' in data or 'type_id' in data:
update_fields.append("type_name = %s")
params.append(data.get('type_name', data.get('type_id', '')))
if 'knowledge' in data:
update_fields.append("knowledge = %s")
params.append(data['knowledge'])
if update_fields:
params.append(product_id)
sql = f"UPDATE ai_products SET {', '.join(update_fields)}, updated_at = NOW() WHERE id = %s"
db_manager.execute_update(sql, params)
# 更新标签
if 'tags' in data:
# 删除旧标签
del_sql = "DELETE FROM ai_product_tags WHERE product_id = %s"
db_manager.execute_update(del_sql, (product_id,))
# 添加新标签
for tag_name in data['tags']:
tag_sql = "INSERT INTO ai_product_tags (enterprise_id, product_id, tag_name) VALUES (%s, %s, %s)"
db_manager.execute_insert(tag_sql, (enterprise_id, product_id, tag_name))
logger.info(f"更新产品成功: ID {product_id}")
# 记录业务日志
log_update(
target_type='product',
target_id=product_id,
description=f"更新产品信息",
request_data=data
)
return jsonify({
'code': 200,
'message': '更新成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[更新产品] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@product_bp.route('/<int:product_id>', methods=['DELETE'])
@require_auth
def delete_product(product_id):
"""删除产品"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
db_manager = get_db_manager()
# 检查产品是否存在且属于当前企业
check_sql = "SELECT id, name FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
existing = db_manager.execute_query(check_sql, (product_id, enterprise_id))
if not existing:
return jsonify({
'code': 404,
'message': '产品不存在',
'data': None
}), 404
# 软删除产品
sql = "UPDATE ai_products SET status = 'deleted', updated_at = NOW() WHERE id = %s"
db_manager.execute_update(sql, (product_id,))
# 更新企业产品总数
update_sql = "UPDATE ai_enterprises SET products_total = products_total - 1 WHERE id = %s"
db_manager.execute_update(update_sql, (enterprise_id,))
logger.info(f"删除产品成功: {existing[0]['name']}")
# 记录业务日志
log_delete(
target_type='product',
target_id=product_id,
description=f"删除产品: {existing[0]['name']}"
)
return jsonify({
'code': 200,
'message': '删除成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[删除产品] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@product_bp.route('/<int:product_id>', methods=['GET'])
@require_auth
def get_product_detail(product_id):
"""获取产品详情"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
db_manager = get_db_manager()
# 查询产品详情
sql = """
SELECT p.id, p.name, p.type_name, p.knowledge, p.status,
p.articles_total, p.published_total,
p.image_url, p.image_thumbnail_url,
p.created_at, p.updated_at
FROM ai_products p
WHERE p.id = %s AND p.enterprise_id = %s AND p.status != 'deleted'
"""
result = db_manager.execute_query(sql, (product_id, enterprise_id))
if not result:
return jsonify({
'code': 404,
'message': '产品不存在',
'data': None
}), 404
product = result[0]
# 查询产品标签直接从ai_product_tags查询
tag_sql = """
SELECT id, tag_name
FROM ai_product_tags
WHERE product_id = %s
"""
tags = db_manager.execute_query(tag_sql, (product_id,))
product['tags'] = tags
# 暂不查询产品图片(图片关联表结构不同)
product['images'] = []
logger.info(f"获取产品详情成功: ID {product_id}")
return jsonify({
'code': 200,
'message': 'success',
'data': product,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取产品详情] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
# ============================================================================
# 产品图片管理接口 (ai_product_images)
# ============================================================================
@product_bp.route('/<int:product_id>/images', methods=['GET'])
@require_auth
def get_product_images(product_id):
"""获取产品图片列表"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[获取产品图片] 产品ID: {product_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
db_manager = get_db_manager()
# 验证产品归属
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404
# 获取分页参数
page = int(request.args.get('page', 1))
page_size = int(request.args.get('pageSize', 20))
status = request.args.get('status', 'active')
offset = (page - 1) * page_size
# 查询总数
count_sql = "SELECT COUNT(*) as total FROM ai_product_images WHERE product_id = %s AND status = %s"
total = db_manager.execute_query(count_sql, (product_id, status))[0]['total']
# 查询图片列表
sql = """
SELECT id, image_id, image_name, image_url, thumbnail_url, type_name,
description, file_size, width, height, upload_user_id, status,
created_at, updated_at
FROM ai_product_images
WHERE product_id = %s AND status = %s
ORDER BY created_at DESC
LIMIT %s OFFSET %s
"""
images = db_manager.execute_query(sql, (product_id, status, page_size, offset))
logger.info(f"[获取产品图片] 成功, 产品ID: {product_id}, 总数: {total}, 返回: {len(images)}")
return jsonify({
'code': 200,
'message': 'success',
'data': {'total': total, 'list': images},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取产品图片] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/<int:product_id>/images', methods=['POST'])
@require_auth
def add_product_image(product_id):
"""添加产品图片"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[添加产品图片] 产品ID: {product_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
user_id = current_user.get('user_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
data = request.get_json()
if not data:
return jsonify({'code': 400, 'message': '请求参数错误', 'data': None}), 400
# 验证必需字段
required = ['image_id', 'image_name', 'image_url']
for field in required:
if not data.get(field):
return jsonify({'code': 400, 'message': f'缺少必需字段: {field}', 'data': None}), 400
db_manager = get_db_manager()
# 验证产品归属
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404
# 插入图片
sql = """
INSERT INTO ai_product_images
(enterprise_id, product_id, image_id, image_name, image_url, thumbnail_url,
type_name, description, file_size, width, height, upload_user_id, status)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
image_id = db_manager.execute_insert(sql, (
enterprise_id, product_id,
data['image_id'], data['image_name'], data['image_url'],
data.get('thumbnail_url', ''), data.get('type_name', ''),
data.get('description', ''), data.get('file_size'),
data.get('width'), data.get('height'),
user_id, 'active'
))
logger.info(f"[添加产品图片] 成功, ID: {image_id}, 产品ID: {product_id}")
log_create('product_image', image_id, f"添加产品图片: {data['image_name']}", data, {'id': image_id})
return jsonify({
'code': 200,
'message': '添加成功',
'data': {'id': image_id},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[添加产品图片] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/<int:product_id>/images/<int:image_id>', methods=['PUT'])
@require_auth
def update_product_image(product_id, image_id):
"""更新产品图片"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
data = request.get_json()
if not data:
return jsonify({'code': 400, 'message': '请求参数错误', 'data': None}), 400
db_manager = get_db_manager()
# 验证图片归属
check_sql = "SELECT id FROM ai_product_images WHERE id = %s AND product_id = %s AND enterprise_id = %s"
if not db_manager.execute_query(check_sql, (image_id, product_id, enterprise_id)):
return jsonify({'code': 404, 'message': '图片不存在', 'data': None}), 404
# 构建更新
update_fields = []
params = []
field_mapping = {
'image_name': 'image_name', 'image_url': 'image_url',
'thumbnail_url': 'thumbnail_url', 'type_name': 'type_name',
'description': 'description', 'file_size': 'file_size',
'width': 'width', 'height': 'height'
}
for key, db_field in field_mapping.items():
if key in data:
update_fields.append(f"{db_field} = %s")
params.append(data[key])
if update_fields:
params.append(image_id)
sql = f"UPDATE ai_product_images SET {', '.join(update_fields)}, updated_at = NOW() WHERE id = %s"
db_manager.execute_update(sql, params)
logger.info(f"[更新产品图片] 成功, ID: {image_id}")
log_update('product_image', image_id, "更新产品图片", data)
return jsonify({
'code': 200,
'message': '更新成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[更新产品图片] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/<int:product_id>/images/<int:image_id>', methods=['DELETE'])
@require_auth
def delete_product_image(product_id, image_id):
"""删除产品图片"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
db_manager = get_db_manager()
# 验证图片归属
check_sql = "SELECT image_name FROM ai_product_images WHERE id = %s AND product_id = %s AND enterprise_id = %s"
result = db_manager.execute_query(check_sql, (image_id, product_id, enterprise_id))
if not result:
return jsonify({'code': 404, 'message': '图片不存在', 'data': None}), 404
# 软删除
sql = "UPDATE ai_product_images SET status = 'deleted', updated_at = NOW() WHERE id = %s"
db_manager.execute_update(sql, (image_id,))
logger.info(f"[删除产品图片] 成功, ID: {image_id}")
log_delete('product_image', image_id, f"删除产品图片: {result[0]['image_name']}")
return jsonify({
'code': 200,
'message': '删除成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[删除产品图片] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
# ============================================================================
# 产品标签管理接口 (ai_product_tags)
# ============================================================================
@product_bp.route('/<int:product_id>/tags', methods=['GET'])
@require_auth
def get_product_tags(product_id):
"""获取产品标签列表"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[获取产品标签] 产品ID: {product_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
db_manager = get_db_manager()
# 验证产品归属
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404
# 查询标签
sql = """
SELECT id, tag_name, created_at
FROM ai_product_tags
WHERE product_id = %s AND enterprise_id = %s
ORDER BY created_at DESC
"""
tags = db_manager.execute_query(sql, (product_id, enterprise_id))
logger.info(f"[获取产品标签] 成功, 产品ID: {product_id}, 数量: {len(tags)}")
return jsonify({
'code': 200,
'message': 'success',
'data': {'list': tags},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取产品标签] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/<int:product_id>/tags', methods=['POST'])
@require_auth
def add_product_tag(product_id):
"""添加产品标签"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[添加产品标签] 产品ID: {product_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
data = request.get_json()
if not data or not data.get('tag_name'):
return jsonify({'code': 400, 'message': '标签名称不能为空', 'data': None}), 400
db_manager = get_db_manager()
# 验证产品归属
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404
# 检查标签是否已存在
exists_sql = "SELECT id FROM ai_product_tags WHERE product_id = %s AND enterprise_id = %s AND tag_name = %s"
if db_manager.execute_query(exists_sql, (product_id, enterprise_id, data['tag_name'])):
return jsonify({'code': 400, 'message': '标签已存在', 'data': None}), 400
# 插入标签
sql = "INSERT INTO ai_product_tags (enterprise_id, product_id, tag_name) VALUES (%s, %s, %s)"
tag_id = db_manager.execute_insert(sql, (enterprise_id, product_id, data['tag_name']))
logger.info(f"[添加产品标签] 成功, ID: {tag_id}, 标签: {data['tag_name']}")
log_create('product_tag', tag_id, f"添加产品标签: {data['tag_name']}", data, {'id': tag_id})
return jsonify({
'code': 200,
'message': '添加成功',
'data': {'id': tag_id},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[添加产品标签] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/<int:product_id>/tags/<int:tag_id>', methods=['DELETE'])
@require_auth
def delete_product_tag(product_id, tag_id):
"""删除产品标签"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
db_manager = get_db_manager()
# 验证标签归属
check_sql = "SELECT tag_name FROM ai_product_tags WHERE id = %s AND product_id = %s AND enterprise_id = %s"
result = db_manager.execute_query(check_sql, (tag_id, product_id, enterprise_id))
if not result:
return jsonify({'code': 404, 'message': '标签不存在', 'data': None}), 404
# 删除标签
sql = "DELETE FROM ai_product_tags WHERE id = %s"
db_manager.execute_update(sql, (tag_id,))
logger.info(f"[删除产品标签] 成功, ID: {tag_id}")
log_delete('product_tag', tag_id, f"删除产品标签: {result[0]['tag_name']}")
return jsonify({
'code': 200,
'message': '删除成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[删除产品标签] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
# ============================================================================
# 产品类型管理接口 (ai_product_types)
# ============================================================================
@product_bp.route('/<int:product_id>/types', methods=['GET'])
@require_auth
def get_product_types(product_id):
"""获取产品类型列表"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[获取产品类型] 产品ID: {product_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
db_manager = get_db_manager()
# 验证产品归属
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404
# 查询类型
sql = """
SELECT id, type_name, created_at
FROM ai_product_types
WHERE product_id = %s AND enterprise_id = %s
ORDER BY created_at DESC
"""
types = db_manager.execute_query(sql, (product_id, enterprise_id))
logger.info(f"[获取产品类型] 成功, 产品ID: {product_id}, 数量: {len(types)}")
return jsonify({
'code': 200,
'message': 'success',
'data': {'list': types},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取产品类型] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/<int:product_id>/types', methods=['POST'])
@require_auth
def add_product_type(product_id):
"""添加产品类型"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[添加产品类型] 产品ID: {product_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
data = request.get_json()
if not data or not data.get('type_name'):
return jsonify({'code': 400, 'message': '类型名称不能为空', 'data': None}), 400
db_manager = get_db_manager()
# 验证产品归属
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404
# 检查类型是否已存在
exists_sql = "SELECT id FROM ai_product_types WHERE product_id = %s AND enterprise_id = %s AND type_name = %s"
if db_manager.execute_query(exists_sql, (product_id, enterprise_id, data['type_name'])):
return jsonify({'code': 400, 'message': '类型已存在', 'data': None}), 400
# 插入类型
sql = "INSERT INTO ai_product_types (enterprise_id, product_id, type_name) VALUES (%s, %s, %s)"
type_id = db_manager.execute_insert(sql, (enterprise_id, product_id, data['type_name']))
logger.info(f"[添加产品类型] 成功, ID: {type_id}, 类型: {data['type_name']}")
log_create('product_type', type_id, f"添加产品类型: {data['type_name']}", data, {'id': type_id})
return jsonify({
'code': 200,
'message': '添加成功',
'data': {'id': type_id},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[添加产品类型] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/<int:product_id>/types/<int:type_id>', methods=['DELETE'])
@require_auth
def delete_product_type(product_id, type_id):
"""删除产品类型"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
db_manager = get_db_manager()
# 验证类型归属
check_sql = "SELECT type_name FROM ai_product_types WHERE id = %s AND product_id = %s AND enterprise_id = %s"
result = db_manager.execute_query(check_sql, (type_id, product_id, enterprise_id))
if not result:
return jsonify({'code': 404, 'message': '类型不存在', 'data': None}), 404
# 删除类型
sql = "DELETE FROM ai_product_types WHERE id = %s"
db_manager.execute_update(sql, (type_id,))
logger.info(f"[删除产品类型] 成功, ID: {type_id}")
log_delete('product_type', type_id, f"删除产品类型: {result[0]['type_name']}")
return jsonify({
'code': 200,
'message': '删除成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[删除产品类型] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
# ============================================================================
# 企业产品类型管理接口 (基于enterprise_id的独立管理)
# ============================================================================
@product_bp.route('/types/list', methods=['GET'])
@require_auth
def get_enterprise_product_types():
"""获取企业所有产品类型列表(独立接口)"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[获取企业产品类型列表] 开始处理请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
logger.warning(f"[获取企业产品类型列表] 无法获取企业ID, IP: {client_ip}")
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
logger.info(f"[获取企业产品类型列表] 企业ID: {enterprise_id}, IP: {client_ip}")
# 获取分页参数
page = int(request.args.get('page', 1))
page_size = int(request.args.get('pageSize', 20))
keyword = request.args.get('keyword', '').strip()
offset = (page - 1) * page_size
db_manager = get_db_manager()
# 构建查询条件
where_conditions = ["enterprise_id = %s"]
params = [enterprise_id]
if keyword:
where_conditions.append("type_name LIKE %s")
params.append(f"%{keyword}%")
where_clause = " AND ".join(where_conditions)
# 查询总数
count_sql = f"SELECT COUNT(*) as total FROM ai_product_types WHERE {where_clause}"
total = db_manager.execute_query(count_sql, params)[0]['total']
# 查询类型列表
sql = f"""
SELECT id, type_name, enterprise_id, product_id, created_at
FROM ai_product_types
WHERE {where_clause}
ORDER BY created_at DESC
LIMIT %s OFFSET %s
"""
params.extend([page_size, offset])
types = db_manager.execute_query(sql, params)
logger.info(f"[获取企业产品类型列表] 查询成功, 企业ID: {enterprise_id}, 总数: {total}, 返回: {len(types)}, IP: {client_ip}")
return jsonify({
'code': 200,
'message': 'success',
'data': {'total': total, 'list': types},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取企业产品类型列表] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/types/create', methods=['POST'])
@require_auth
def create_enterprise_product_type():
"""创建企业产品类型(独立接口)"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[创建企业产品类型] 开始处理请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
logger.warning(f"[创建企业产品类型] 无法获取企业ID, IP: {client_ip}")
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
data = request.get_json()
if not data or not data.get('type_name'):
logger.warning(f"[创建企业产品类型] 类型名称不能为空, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({'code': 400, 'message': '类型名称不能为空', 'data': None}), 400
product_id = data.get('product_id', 0) # 默认0表示不关联产品
type_name = data['type_name']
logger.info(f"[创建企业产品类型] 类型名称: {type_name}, 产品ID: {product_id}, 企业ID: {enterprise_id}, IP: {client_ip}")
db_manager = get_db_manager()
# 如果指定了产品ID验证产品归属
if product_id > 0:
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
logger.warning(f"[创建企业产品类型] 产品不存在或不属于当前企业, 产品ID: {product_id}, 企业ID: {enterprise_id}")
return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404
# 检查类型是否已存在
exists_sql = "SELECT id FROM ai_product_types WHERE enterprise_id = %s AND product_id = %s AND type_name = %s"
if db_manager.execute_query(exists_sql, (enterprise_id, product_id, type_name)):
logger.warning(f"[创建企业产品类型] 类型已存在, 类型: {type_name}, 企业ID: {enterprise_id}")
return jsonify({'code': 400, 'message': '该类型已存在', 'data': None}), 400
# 插入类型
sql = "INSERT INTO ai_product_types (enterprise_id, product_id, type_name) VALUES (%s, %s, %s)"
type_id = db_manager.execute_insert(sql, (enterprise_id, product_id, type_name))
logger.info(f"[创建企业产品类型] 创建成功, ID: {type_id}, 类型: {type_name}, 企业ID: {enterprise_id}, IP: {client_ip}")
log_create('enterprise_product_type', type_id, f"创建产品类型: {type_name}", data, {'id': type_id})
return jsonify({
'code': 200,
'message': '创建成功',
'data': {'id': type_id},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[创建企业产品类型] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/types/<int:type_id>', methods=['PUT'])
@require_auth
def update_enterprise_product_type(type_id):
"""更新企业产品类型(独立接口)"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[更新企业产品类型] 类型ID: {type_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
data = request.get_json()
if not data:
return jsonify({'code': 400, 'message': '请求参数错误', 'data': None}), 400
db_manager = get_db_manager()
# 验证类型归属
check_sql = "SELECT type_name, product_id FROM ai_product_types WHERE id = %s AND enterprise_id = %s"
result = db_manager.execute_query(check_sql, (type_id, enterprise_id))
if not result:
logger.warning(f"[更新企业产品类型] 类型不存在, ID: {type_id}, 企业ID: {enterprise_id}")
return jsonify({'code': 404, 'message': '类型不存在', 'data': None}), 404
old_data = result[0]
# 构建更新
update_fields = []
params = []
if 'type_name' in data:
# 检查新名称是否与其他类型重复
exists_sql = "SELECT id FROM ai_product_types WHERE enterprise_id = %s AND product_id = %s AND type_name = %s AND id != %s"
if db_manager.execute_query(exists_sql, (enterprise_id, old_data['product_id'], data['type_name'], type_id)):
return jsonify({'code': 400, 'message': '类型名称已存在', 'data': None}), 400
update_fields.append("type_name = %s")
params.append(data['type_name'])
if 'product_id' in data:
# 验证新产品归属
if data['product_id'] > 0:
check_product_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
if not db_manager.execute_query(check_product_sql, (data['product_id'], enterprise_id)):
return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404
update_fields.append("product_id = %s")
params.append(data['product_id'])
if update_fields:
params.append(type_id)
sql = f"UPDATE ai_product_types SET {', '.join(update_fields)} WHERE id = %s"
db_manager.execute_update(sql, params)
logger.info(f"[更新企业产品类型] 更新成功, ID: {type_id}, 企业ID: {enterprise_id}")
log_update('enterprise_product_type', type_id, "更新产品类型", data)
return jsonify({
'code': 200,
'message': '更新成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[更新企业产品类型] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/types/<int:type_id>', methods=['DELETE'])
@require_auth
def delete_enterprise_product_type(type_id):
"""删除企业产品类型(独立接口)"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[删除企业产品类型] 类型ID: {type_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
db_manager = get_db_manager()
# 验证类型归属
check_sql = "SELECT type_name FROM ai_product_types WHERE id = %s AND enterprise_id = %s"
result = db_manager.execute_query(check_sql, (type_id, enterprise_id))
if not result:
logger.warning(f"[删除企业产品类型] 类型不存在, ID: {type_id}, 企业ID: {enterprise_id}")
return jsonify({'code': 404, 'message': '类型不存在', 'data': None}), 404
type_name = result[0]['type_name']
# 删除类型
sql = "DELETE FROM ai_product_types WHERE id = %s"
db_manager.execute_update(sql, (type_id,))
logger.info(f"[删除企业产品类型] 删除成功, ID: {type_id}, 类型: {type_name}, 企业ID: {enterprise_id}, IP: {client_ip}")
log_delete('enterprise_product_type', type_id, f"删除产品类型: {type_name}")
return jsonify({
'code': 200,
'message': '删除成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[删除企业产品类型] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/types/<int:type_id>', methods=['GET'])
@require_auth
def get_enterprise_product_type_detail(type_id):
"""获取企业产品类型详情(独立接口)"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[获取企业产品类型详情] 类型ID: {type_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
db_manager = get_db_manager()
# 查询类型详情
sql = """
SELECT id, type_name, enterprise_id, product_id, created_at
FROM ai_product_types
WHERE id = %s AND enterprise_id = %s
"""
result = db_manager.execute_query(sql, (type_id, enterprise_id))
if not result:
logger.warning(f"[获取企业产品类型详情] 类型不存在, ID: {type_id}, 企业ID: {enterprise_id}")
return jsonify({'code': 404, 'message': '类型不存在', 'data': None}), 404
type_data = result[0]
logger.info(f"[获取企业产品类型详情] 查询成功, ID: {type_id}, 类型: {type_data['type_name']}, 企业ID: {enterprise_id}")
return jsonify({
'code': 200,
'message': 'success',
'data': type_data,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取企业产品类型详情] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500