Files
ai_wht_B/ver_25121816/product_routes.py
“shengyudong” 5a384b694e 2026-1-6
2026-01-06 14:18:39 +08:00

1243 lines
51 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
产品管理接口
"""
from flask import Blueprint, request, jsonify
import logging
from datetime import datetime
from auth_utils import require_auth, AuthUtils
from database_config import get_db_manager, format_datetime_fields
from log_utils import log_create, log_update, log_delete, log_error, log_operation
logger = logging.getLogger('article_server')
# 创建蓝图
product_bp = Blueprint('product', __name__, url_prefix='/api/products')
@product_bp.route('/list', methods=['GET'])
@require_auth
def get_products_list():
"""获取产品列表"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[获取产品列表] 开始处理请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
logger.info(f"[获取产品列表] 用户信息 - 用户ID: {current_user.get('user_id')}, 企业ID: {enterprise_id}, IP: {client_ip}")
if not enterprise_id:
logger.warning(f"[获取产品列表] 无法获取企业ID, IP: {client_ip}")
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
# 获取查询参数
page = int(request.args.get('page', 1))
page_size = int(request.args.get('pageSize', 20))
keyword = request.args.get('keyword', '').strip()
type_id = request.args.get('type', '').strip()
status = request.args.get('status', '').strip()
logger.info(f"[获取产品列表] 查询参数: page={page}, pageSize={page_size}, keyword={keyword}, type={type_id}, status={status}, 企业ID: {enterprise_id}, IP: {client_ip}")
# 构建查询条件
where_conditions = ["enterprise_id = %s", "status != 'deleted'"]
params = [enterprise_id]
if keyword:
where_conditions.append("name LIKE %s")
params.append(f"%{keyword}%")
if type_id:
where_conditions.append("type_name LIKE %s")
params.append(f"%{type_id}%")
if status:
where_conditions.append("status = %s")
params.append(status)
where_clause = " AND ".join(where_conditions)
# 计算偏移量
offset = (page - 1) * page_size
db_manager = get_db_manager()
# 查询总数
count_sql = f"SELECT COUNT(*) as total FROM ai_products WHERE {where_clause}"
count_result = db_manager.execute_query(count_sql, params)
total = count_result[0]['total']
# 查询产品列表
sql = f"""
SELECT p.id, p.name, p.type_name, p.knowledge, p.status,
p.articles_total, p.published_total,
p.image_url, p.image_thumbnail_url,
p.created_at, p.updated_at
FROM ai_products p
WHERE {where_clause}
ORDER BY p.created_at DESC
LIMIT %s OFFSET %s
"""
params.extend([page_size, offset])
products = db_manager.execute_query(sql, params)
# 查询产品标签
for product in products:
tag_sql = """
SELECT id, tag_name
FROM ai_product_tags
WHERE product_id = %s
"""
tags = db_manager.execute_query(tag_sql, (product['id'],))
product['tags'] = tags
# 格式化日期时间字段
products = format_datetime_fields(products)
logger.info(f"[获取产品列表] 查询成功, 总数: {total}, 当前页: {page}, 每页: {page_size}, 返回数量: {len(products)}, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 200,
'message': 'success',
'data': {
'total': total,
'list': products
},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取产品列表] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@product_bp.route('/create', methods=['POST'])
@require_auth
def create_product():
"""创建产品"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[创建产品] 开始处理创建产品请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
logger.info(f"[创建产品] 用户信息 - 用户ID: {current_user.get('user_id')}, 企业ID: {enterprise_id}, IP: {client_ip}")
if not enterprise_id:
logger.warning(f"[创建产品] 无法获取企业ID, IP: {client_ip}")
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
data = request.get_json()
if not data:
logger.warning(f"[创建产品] 请求参数为空, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 400,
'message': '请求参数错误',
'data': None
}), 400
logger.info(f"[创建产品] 收到创建请求, 产品名称: {data.get('name')}, 类型: {data.get('type_name') or data.get('type_id')}, 企业ID: {enterprise_id}, IP: {client_ip}")
# 验证必需字段
required_fields = ['name', 'type_id', 'knowledge', 'type_name']
for field in required_fields:
if not data.get(field):
logger.warning(f"[创建产品] 缺少必需字段: {field}, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({
'code': 400,
'message': f'缺少必需字段: {field}',
'data': None
}), 400
db_manager = get_db_manager()
# 产品名称判重:检查同一企业下是否已存在相同名称的产品
logger.info(f"[创建产品] 开始检查产品名称是否重复, 名称: {data['name']}, 企业ID: {enterprise_id}")
check_duplicate_sql = """
SELECT id, name FROM ai_products
WHERE enterprise_id = %s AND name = %s AND status != 'deleted'
"""
existing_product = db_manager.execute_query(check_duplicate_sql, (enterprise_id, data['name']))
if existing_product:
logger.warning(f"[创建产品] 产品名称已存在, 名称: {data['name']}, 已存在ID: {existing_product[0]['id']}, 企业ID: {enterprise_id}, IP: {client_ip}")
product_name = data['name']
return jsonify({
'code': 409,
'message': f'产品名称「{product_name}」已存在,请使用其他名称',
'data': None
}), 409
# 创建产品
logger.info(f"[创建产品] 开始插入产品数据, 名称: {data['name']}, 企业ID: {enterprise_id}")
sql = """
INSERT INTO ai_products
(enterprise_id, name, type_id, type_name, knowledge, status, articles_total, published_total, image_url, image_thumbnail_url)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
product_id = db_manager.execute_insert(sql, (
enterprise_id,
data['name'],
data.get('type_id', 0),
data.get('type_name', ''),
data['knowledge'],
'active',
0, 0, # articles_total, published_total
data.get('image_url', ''),
data.get('image_thumbnail_url', '')
))
logger.info(f"[创建产品] 产品插入成功, ID: {product_id}, 名称: {data['name']}, 企业ID: {enterprise_id}")
# 添加标签
if data.get('tags'):
logger.info(f"[创建产品] 开始添加产品标签, 产品ID: {product_id}, 标签数量: {len(data['tags'])}")
for tag_name in data['tags']:
tag_sql = "INSERT INTO ai_product_tags (enterprise_id, product_id, tag_name) VALUES (%s, %s, %s)"
db_manager.execute_insert(tag_sql, (enterprise_id, product_id, tag_name))
logger.info(f"[创建产品] 产品标签添加完成, 产品ID: {product_id}, 数量: {len(data['tags'])}")
# 更新企业产品总数
logger.info(f"[创建产品] 开始更新企业产品总数, 企业ID: {enterprise_id}")
update_sql = "UPDATE ai_enterprises SET products_total = products_total + 1 WHERE id = %s"
db_manager.execute_update(update_sql, (enterprise_id,))
logger.info(f"[创建产品] 产品创建全部完成, ID: {product_id}, 名称: {data['name']}, 企业ID: {enterprise_id}, IP: {client_ip}")
# 记录业务日志
log_create(
target_type='product',
target_id=product_id,
description=f"创建产品: {data['name']}",
request_data=data,
response_data={'id': product_id}
)
return jsonify({
'code': 200,
'message': '创建成功',
'data': {'id': product_id},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[创建产品] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@product_bp.route('/<int:product_id>', methods=['PUT'])
@require_auth
def update_product(product_id):
"""更新产品"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
data = request.get_json()
if not data:
return jsonify({
'code': 400,
'message': '请求参数错误',
'data': None
}), 400
db_manager = get_db_manager()
# 检查产品是否存在且属于当前企业
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
existing = db_manager.execute_query(check_sql, (product_id, enterprise_id))
if not existing:
return jsonify({
'code': 404,
'message': '产品不存在',
'data': None
}), 404
# 构建更新字段
update_fields = []
params = []
if 'name' in data:
update_fields.append("name = %s")
params.append(data['name'])
if 'type_name' in data or 'type_id' in data:
update_fields.append("type_name = %s")
params.append(data.get('type_name', data.get('type_id', '')))
if 'knowledge' in data:
update_fields.append("knowledge = %s")
params.append(data['knowledge'])
if update_fields:
params.append(product_id)
sql = f"UPDATE ai_products SET {', '.join(update_fields)}, updated_at = NOW() WHERE id = %s"
db_manager.execute_update(sql, params)
# 更新标签
if 'tags' in data:
# 删除旧标签
del_sql = "DELETE FROM ai_product_tags WHERE product_id = %s"
db_manager.execute_update(del_sql, (product_id,))
# 添加新标签
for tag_name in data['tags']:
tag_sql = "INSERT INTO ai_product_tags (enterprise_id, product_id, tag_name) VALUES (%s, %s, %s)"
db_manager.execute_insert(tag_sql, (enterprise_id, product_id, tag_name))
logger.info(f"更新产品成功: ID {product_id}")
# 记录业务日志
log_update(
target_type='product',
target_id=product_id,
description=f"更新产品信息",
request_data=data
)
return jsonify({
'code': 200,
'message': '更新成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[更新产品] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@product_bp.route('/<int:product_id>', methods=['DELETE'])
@require_auth
def delete_product(product_id):
"""删除产品"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
db_manager = get_db_manager()
# 检查产品是否存在且属于当前企业
check_sql = "SELECT id, name FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
existing = db_manager.execute_query(check_sql, (product_id, enterprise_id))
if not existing:
return jsonify({
'code': 404,
'message': '产品不存在',
'data': None
}), 404
# 软删除产品
sql = "UPDATE ai_products SET status = 'deleted', updated_at = NOW() WHERE id = %s"
db_manager.execute_update(sql, (product_id,))
# 更新企业产品总数
update_sql = "UPDATE ai_enterprises SET products_total = products_total - 1 WHERE id = %s"
db_manager.execute_update(update_sql, (enterprise_id,))
logger.info(f"删除产品成功: {existing[0]['name']}")
# 记录业务日志
log_delete(
target_type='product',
target_id=product_id,
description=f"删除产品: {existing[0]['name']}"
)
return jsonify({
'code': 200,
'message': '删除成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[删除产品] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@product_bp.route('/<int:product_id>', methods=['GET'])
@require_auth
def get_product_detail(product_id):
"""获取产品详情"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
db_manager = get_db_manager()
# 查询产品详情
sql = """
SELECT p.id, p.name, p.type_name, p.knowledge, p.status,
p.articles_total, p.published_total,
p.image_url, p.image_thumbnail_url,
p.created_at, p.updated_at
FROM ai_products p
WHERE p.id = %s AND p.enterprise_id = %s AND p.status != 'deleted'
"""
result = db_manager.execute_query(sql, (product_id, enterprise_id))
if not result:
return jsonify({
'code': 404,
'message': '产品不存在',
'data': None
}), 404
product = result[0]
# 查询产品标签直接从ai_product_tags查询
tag_sql = """
SELECT id, tag_name
FROM ai_product_tags
WHERE product_id = %s
"""
tags = db_manager.execute_query(tag_sql, (product_id,))
product['tags'] = tags
# 暂不查询产品图片(图片关联表结构不同)
product['images'] = []
logger.info(f"获取产品详情成功: ID {product_id}")
return jsonify({
'code': 200,
'message': 'success',
'data': product,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取产品详情] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
# ============================================================================
# 产品图片管理接口 (ai_product_images)
# ============================================================================
@product_bp.route('/<int:product_id>/images', methods=['GET'])
@require_auth
def get_product_images(product_id):
"""获取产品图片列表"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[获取产品图片] 产品ID: {product_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
db_manager = get_db_manager()
# 验证产品归属
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404
# 获取分页参数
page = int(request.args.get('page', 1))
page_size = int(request.args.get('pageSize', 20))
status = request.args.get('status', 'active')
offset = (page - 1) * page_size
# 查询总数
count_sql = "SELECT COUNT(*) as total FROM ai_product_images WHERE product_id = %s AND status = %s"
total = db_manager.execute_query(count_sql, (product_id, status))[0]['total']
# 查询图片列表
sql = """
SELECT id, image_id, image_name, image_url, thumbnail_url, type_name,
description, file_size, width, height, upload_user_id, status,
created_at, updated_at
FROM ai_product_images
WHERE product_id = %s AND status = %s
ORDER BY created_at DESC
LIMIT %s OFFSET %s
"""
images = db_manager.execute_query(sql, (product_id, status, page_size, offset))
logger.info(f"[获取产品图片] 成功, 产品ID: {product_id}, 总数: {total}, 返回: {len(images)}")
return jsonify({
'code': 200,
'message': 'success',
'data': {'total': total, 'list': images},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取产品图片] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/<int:product_id>/images', methods=['POST'])
@require_auth
def add_product_image(product_id):
"""添加产品图片"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[添加产品图片] 产品ID: {product_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
user_id = current_user.get('user_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
data = request.get_json()
if not data:
return jsonify({'code': 400, 'message': '请求参数错误', 'data': None}), 400
# 验证必需字段
required = ['image_id', 'image_name', 'image_url']
for field in required:
if not data.get(field):
return jsonify({'code': 400, 'message': f'缺少必需字段: {field}', 'data': None}), 400
db_manager = get_db_manager()
# 验证产品归属
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404
# 插入图片
sql = """
INSERT INTO ai_product_images
(enterprise_id, product_id, image_id, image_name, image_url, thumbnail_url,
type_name, description, file_size, width, height, upload_user_id, status)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
image_id = db_manager.execute_insert(sql, (
enterprise_id, product_id,
data['image_id'], data['image_name'], data['image_url'],
data.get('thumbnail_url', ''), data.get('type_name', ''),
data.get('description', ''), data.get('file_size'),
data.get('width'), data.get('height'),
user_id, 'active'
))
logger.info(f"[添加产品图片] 成功, ID: {image_id}, 产品ID: {product_id}")
log_create('product_image', image_id, f"添加产品图片: {data['image_name']}", data, {'id': image_id})
return jsonify({
'code': 200,
'message': '添加成功',
'data': {'id': image_id},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[添加产品图片] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/<int:product_id>/images/<int:image_id>', methods=['PUT'])
@require_auth
def update_product_image(product_id, image_id):
"""更新产品图片"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
data = request.get_json()
if not data:
return jsonify({'code': 400, 'message': '请求参数错误', 'data': None}), 400
db_manager = get_db_manager()
# 验证图片归属
check_sql = "SELECT id FROM ai_product_images WHERE id = %s AND product_id = %s AND enterprise_id = %s"
if not db_manager.execute_query(check_sql, (image_id, product_id, enterprise_id)):
return jsonify({'code': 404, 'message': '图片不存在', 'data': None}), 404
# 构建更新
update_fields = []
params = []
field_mapping = {
'image_name': 'image_name', 'image_url': 'image_url',
'thumbnail_url': 'thumbnail_url', 'type_name': 'type_name',
'description': 'description', 'file_size': 'file_size',
'width': 'width', 'height': 'height'
}
for key, db_field in field_mapping.items():
if key in data:
update_fields.append(f"{db_field} = %s")
params.append(data[key])
if update_fields:
params.append(image_id)
sql = f"UPDATE ai_product_images SET {', '.join(update_fields)}, updated_at = NOW() WHERE id = %s"
db_manager.execute_update(sql, params)
logger.info(f"[更新产品图片] 成功, ID: {image_id}")
log_update('product_image', image_id, "更新产品图片", data)
return jsonify({
'code': 200,
'message': '更新成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[更新产品图片] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/<int:product_id>/images/<int:image_id>', methods=['DELETE'])
@require_auth
def delete_product_image(product_id, image_id):
"""删除产品图片"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
db_manager = get_db_manager()
# 验证图片归属
check_sql = "SELECT image_name FROM ai_product_images WHERE id = %s AND product_id = %s AND enterprise_id = %s"
result = db_manager.execute_query(check_sql, (image_id, product_id, enterprise_id))
if not result:
return jsonify({'code': 404, 'message': '图片不存在', 'data': None}), 404
# 软删除
sql = "UPDATE ai_product_images SET status = 'deleted', updated_at = NOW() WHERE id = %s"
db_manager.execute_update(sql, (image_id,))
logger.info(f"[删除产品图片] 成功, ID: {image_id}")
log_delete('product_image', image_id, f"删除产品图片: {result[0]['image_name']}")
return jsonify({
'code': 200,
'message': '删除成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[删除产品图片] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
# ============================================================================
# 产品标签管理接口 (ai_product_tags)
# ============================================================================
@product_bp.route('/<int:product_id>/tags', methods=['GET'])
@require_auth
def get_product_tags(product_id):
"""获取产品标签列表"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[获取产品标签] 产品ID: {product_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
db_manager = get_db_manager()
# 验证产品归属
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404
# 查询标签
sql = """
SELECT id, tag_name, created_at
FROM ai_product_tags
WHERE product_id = %s AND enterprise_id = %s
ORDER BY created_at DESC
"""
tags = db_manager.execute_query(sql, (product_id, enterprise_id))
logger.info(f"[获取产品标签] 成功, 产品ID: {product_id}, 数量: {len(tags)}")
return jsonify({
'code': 200,
'message': 'success',
'data': {'list': tags},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取产品标签] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/<int:product_id>/tags', methods=['POST'])
@require_auth
def add_product_tag(product_id):
"""添加产品标签"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[添加产品标签] 产品ID: {product_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
data = request.get_json()
if not data or not data.get('tag_name'):
return jsonify({'code': 400, 'message': '标签名称不能为空', 'data': None}), 400
db_manager = get_db_manager()
# 验证产品归属
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404
# 检查标签是否已存在
exists_sql = "SELECT id FROM ai_product_tags WHERE product_id = %s AND enterprise_id = %s AND tag_name = %s"
if db_manager.execute_query(exists_sql, (product_id, enterprise_id, data['tag_name'])):
return jsonify({'code': 400, 'message': '标签已存在', 'data': None}), 400
# 插入标签
sql = "INSERT INTO ai_product_tags (enterprise_id, product_id, tag_name) VALUES (%s, %s, %s)"
tag_id = db_manager.execute_insert(sql, (enterprise_id, product_id, data['tag_name']))
logger.info(f"[添加产品标签] 成功, ID: {tag_id}, 标签: {data['tag_name']}")
log_create('product_tag', tag_id, f"添加产品标签: {data['tag_name']}", data, {'id': tag_id})
return jsonify({
'code': 200,
'message': '添加成功',
'data': {'id': tag_id},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[添加产品标签] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/<int:product_id>/tags/<int:tag_id>', methods=['DELETE'])
@require_auth
def delete_product_tag(product_id, tag_id):
"""删除产品标签"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
db_manager = get_db_manager()
# 验证标签归属
check_sql = "SELECT tag_name FROM ai_product_tags WHERE id = %s AND product_id = %s AND enterprise_id = %s"
result = db_manager.execute_query(check_sql, (tag_id, product_id, enterprise_id))
if not result:
return jsonify({'code': 404, 'message': '标签不存在', 'data': None}), 404
# 删除标签
sql = "DELETE FROM ai_product_tags WHERE id = %s"
db_manager.execute_update(sql, (tag_id,))
logger.info(f"[删除产品标签] 成功, ID: {tag_id}")
log_delete('product_tag', tag_id, f"删除产品标签: {result[0]['tag_name']}")
return jsonify({
'code': 200,
'message': '删除成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[删除产品标签] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
# ============================================================================
# 产品类型管理接口 (ai_product_types)
# ============================================================================
@product_bp.route('/<int:product_id>/types', methods=['GET'])
@require_auth
def get_product_types(product_id):
"""获取产品类型列表"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[获取产品类型] 产品ID: {product_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
db_manager = get_db_manager()
# 验证产品归属
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404
# 查询类型
sql = """
SELECT id, type_name, created_at
FROM ai_product_types
WHERE product_id = %s AND enterprise_id = %s
ORDER BY created_at DESC
"""
types = db_manager.execute_query(sql, (product_id, enterprise_id))
logger.info(f"[获取产品类型] 成功, 产品ID: {product_id}, 数量: {len(types)}")
return jsonify({
'code': 200,
'message': 'success',
'data': {'list': types},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取产品类型] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/<int:product_id>/types', methods=['POST'])
@require_auth
def add_product_type(product_id):
"""添加产品类型"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[添加产品类型] 产品ID: {product_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
data = request.get_json()
if not data or not data.get('type_name'):
return jsonify({'code': 400, 'message': '类型名称不能为空', 'data': None}), 400
db_manager = get_db_manager()
# 验证产品归属
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404
# 检查类型是否已存在
exists_sql = "SELECT id FROM ai_product_types WHERE product_id = %s AND enterprise_id = %s AND type_name = %s"
if db_manager.execute_query(exists_sql, (product_id, enterprise_id, data['type_name'])):
return jsonify({'code': 400, 'message': '类型已存在', 'data': None}), 400
# 插入类型
sql = "INSERT INTO ai_product_types (enterprise_id, product_id, type_name) VALUES (%s, %s, %s)"
type_id = db_manager.execute_insert(sql, (enterprise_id, product_id, data['type_name']))
logger.info(f"[添加产品类型] 成功, ID: {type_id}, 类型: {data['type_name']}")
log_create('product_type', type_id, f"添加产品类型: {data['type_name']}", data, {'id': type_id})
return jsonify({
'code': 200,
'message': '添加成功',
'data': {'id': type_id},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[添加产品类型] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/<int:product_id>/types/<int:type_id>', methods=['DELETE'])
@require_auth
def delete_product_type(product_id, type_id):
"""删除产品类型"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
db_manager = get_db_manager()
# 验证类型归属
check_sql = "SELECT type_name FROM ai_product_types WHERE id = %s AND product_id = %s AND enterprise_id = %s"
result = db_manager.execute_query(check_sql, (type_id, product_id, enterprise_id))
if not result:
return jsonify({'code': 404, 'message': '类型不存在', 'data': None}), 404
# 删除类型
sql = "DELETE FROM ai_product_types WHERE id = %s"
db_manager.execute_update(sql, (type_id,))
logger.info(f"[删除产品类型] 成功, ID: {type_id}")
log_delete('product_type', type_id, f"删除产品类型: {result[0]['type_name']}")
return jsonify({
'code': 200,
'message': '删除成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[删除产品类型] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
# ============================================================================
# 企业产品类型管理接口 (基于enterprise_id的独立管理)
# ============================================================================
@product_bp.route('/types/list', methods=['GET'])
@require_auth
def get_enterprise_product_types():
"""获取企业所有产品类型列表(独立接口)"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[获取企业产品类型列表] 开始处理请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
logger.warning(f"[获取企业产品类型列表] 无法获取企业ID, IP: {client_ip}")
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
logger.info(f"[获取企业产品类型列表] 企业ID: {enterprise_id}, IP: {client_ip}")
# 获取分页参数
page = int(request.args.get('page', 1))
page_size = int(request.args.get('pageSize', 20))
keyword = request.args.get('keyword', '').strip()
offset = (page - 1) * page_size
db_manager = get_db_manager()
# 构建查询条件
where_conditions = ["enterprise_id = %s"]
params = [enterprise_id]
if keyword:
where_conditions.append("type_name LIKE %s")
params.append(f"%{keyword}%")
where_clause = " AND ".join(where_conditions)
# 查询总数
count_sql = f"SELECT COUNT(*) as total FROM ai_product_types WHERE {where_clause}"
total = db_manager.execute_query(count_sql, params)[0]['total']
# 查询类型列表
sql = f"""
SELECT id, type_name, enterprise_id, product_id, created_at
FROM ai_product_types
WHERE {where_clause}
ORDER BY created_at DESC
LIMIT %s OFFSET %s
"""
params.extend([page_size, offset])
types = db_manager.execute_query(sql, params)
logger.info(f"[获取企业产品类型列表] 查询成功, 企业ID: {enterprise_id}, 总数: {total}, 返回: {len(types)}, IP: {client_ip}")
return jsonify({
'code': 200,
'message': 'success',
'data': {'total': total, 'list': types},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取企业产品类型列表] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/types/create', methods=['POST'])
@require_auth
def create_enterprise_product_type():
"""创建企业产品类型(独立接口)"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[创建企业产品类型] 开始处理请求, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
logger.warning(f"[创建企业产品类型] 无法获取企业ID, IP: {client_ip}")
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
data = request.get_json()
if not data or not data.get('type_name'):
logger.warning(f"[创建企业产品类型] 类型名称不能为空, 企业ID: {enterprise_id}, IP: {client_ip}")
return jsonify({'code': 400, 'message': '类型名称不能为空', 'data': None}), 400
product_id = data.get('product_id', 0) # 默认0表示不关联产品
type_name = data['type_name']
logger.info(f"[创建企业产品类型] 类型名称: {type_name}, 产品ID: {product_id}, 企业ID: {enterprise_id}, IP: {client_ip}")
db_manager = get_db_manager()
# 如果指定了产品ID验证产品归属
if product_id > 0:
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
if not db_manager.execute_query(check_sql, (product_id, enterprise_id)):
logger.warning(f"[创建企业产品类型] 产品不存在或不属于当前企业, 产品ID: {product_id}, 企业ID: {enterprise_id}")
return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404
# 检查类型是否已存在
exists_sql = "SELECT id FROM ai_product_types WHERE enterprise_id = %s AND product_id = %s AND type_name = %s"
if db_manager.execute_query(exists_sql, (enterprise_id, product_id, type_name)):
logger.warning(f"[创建企业产品类型] 类型已存在, 类型: {type_name}, 企业ID: {enterprise_id}")
return jsonify({'code': 400, 'message': '该类型已存在', 'data': None}), 400
# 插入类型
sql = "INSERT INTO ai_product_types (enterprise_id, product_id, type_name) VALUES (%s, %s, %s)"
type_id = db_manager.execute_insert(sql, (enterprise_id, product_id, type_name))
logger.info(f"[创建企业产品类型] 创建成功, ID: {type_id}, 类型: {type_name}, 企业ID: {enterprise_id}, IP: {client_ip}")
log_create('enterprise_product_type', type_id, f"创建产品类型: {type_name}", data, {'id': type_id})
return jsonify({
'code': 200,
'message': '创建成功',
'data': {'id': type_id},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[创建企业产品类型] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/types/<int:type_id>', methods=['PUT'])
@require_auth
def update_enterprise_product_type(type_id):
"""更新企业产品类型(独立接口)"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[更新企业产品类型] 类型ID: {type_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
data = request.get_json()
if not data:
return jsonify({'code': 400, 'message': '请求参数错误', 'data': None}), 400
db_manager = get_db_manager()
# 验证类型归属
check_sql = "SELECT type_name, product_id FROM ai_product_types WHERE id = %s AND enterprise_id = %s"
result = db_manager.execute_query(check_sql, (type_id, enterprise_id))
if not result:
logger.warning(f"[更新企业产品类型] 类型不存在, ID: {type_id}, 企业ID: {enterprise_id}")
return jsonify({'code': 404, 'message': '类型不存在', 'data': None}), 404
old_data = result[0]
# 构建更新
update_fields = []
params = []
if 'type_name' in data:
# 检查新名称是否与其他类型重复
exists_sql = "SELECT id FROM ai_product_types WHERE enterprise_id = %s AND product_id = %s AND type_name = %s AND id != %s"
if db_manager.execute_query(exists_sql, (enterprise_id, old_data['product_id'], data['type_name'], type_id)):
return jsonify({'code': 400, 'message': '类型名称已存在', 'data': None}), 400
update_fields.append("type_name = %s")
params.append(data['type_name'])
if 'product_id' in data:
# 验证新产品归属
if data['product_id'] > 0:
check_product_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
if not db_manager.execute_query(check_product_sql, (data['product_id'], enterprise_id)):
return jsonify({'code': 404, 'message': '产品不存在', 'data': None}), 404
update_fields.append("product_id = %s")
params.append(data['product_id'])
if update_fields:
params.append(type_id)
sql = f"UPDATE ai_product_types SET {', '.join(update_fields)} WHERE id = %s"
db_manager.execute_update(sql, params)
logger.info(f"[更新企业产品类型] 更新成功, ID: {type_id}, 企业ID: {enterprise_id}")
log_update('enterprise_product_type', type_id, "更新产品类型", data)
return jsonify({
'code': 200,
'message': '更新成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[更新企业产品类型] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/types/<int:type_id>', methods=['DELETE'])
@require_auth
def delete_enterprise_product_type(type_id):
"""删除企业产品类型(独立接口)"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[删除企业产品类型] 类型ID: {type_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
db_manager = get_db_manager()
# 验证类型归属
check_sql = "SELECT type_name FROM ai_product_types WHERE id = %s AND enterprise_id = %s"
result = db_manager.execute_query(check_sql, (type_id, enterprise_id))
if not result:
logger.warning(f"[删除企业产品类型] 类型不存在, ID: {type_id}, 企业ID: {enterprise_id}")
return jsonify({'code': 404, 'message': '类型不存在', 'data': None}), 404
type_name = result[0]['type_name']
# 删除类型
sql = "DELETE FROM ai_product_types WHERE id = %s"
db_manager.execute_update(sql, (type_id,))
logger.info(f"[删除企业产品类型] 删除成功, ID: {type_id}, 类型: {type_name}, 企业ID: {enterprise_id}, IP: {client_ip}")
log_delete('enterprise_product_type', type_id, f"删除产品类型: {type_name}")
return jsonify({
'code': 200,
'message': '删除成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[删除企业产品类型] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500
@product_bp.route('/types/<int:type_id>', methods=['GET'])
@require_auth
def get_enterprise_product_type_detail(type_id):
"""获取企业产品类型详情(独立接口)"""
client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
logger.info(f"[获取企业产品类型详情] 类型ID: {type_id}, IP: {client_ip}")
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({'code': 400, 'message': '无法获取企业ID', 'data': None}), 400
db_manager = get_db_manager()
# 查询类型详情
sql = """
SELECT id, type_name, enterprise_id, product_id, created_at
FROM ai_product_types
WHERE id = %s AND enterprise_id = %s
"""
result = db_manager.execute_query(sql, (type_id, enterprise_id))
if not result:
logger.warning(f"[获取企业产品类型详情] 类型不存在, ID: {type_id}, 企业ID: {enterprise_id}")
return jsonify({'code': 404, 'message': '类型不存在', 'data': None}), 404
type_data = result[0]
logger.info(f"[获取企业产品类型详情] 查询成功, ID: {type_id}, 类型: {type_data['type_name']}, 企业ID: {enterprise_id}")
return jsonify({
'code': 200,
'message': 'success',
'data': type_data,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取企业产品类型详情] 错误: {str(e)}", exc_info=True)
return jsonify({'code': 500, 'message': '服务器内部错误', 'data': None}), 500