Files
ai_wht_B/release/22/product_routes.py

419 lines
14 KiB
Python
Raw Permalink Normal View History

2026-01-06 14:18:39 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
产品管理接口
"""
from flask import Blueprint, request, jsonify
import logging
from datetime import datetime
from auth_utils import require_auth, AuthUtils
from database_config import get_db_manager
from log_utils import log_create, log_update, log_delete, log_error, log_operation
logger = logging.getLogger(__name__)
# 创建蓝图
product_bp = Blueprint('product', __name__, url_prefix='/api/products')
@product_bp.route('/list', methods=['GET'])
@require_auth
def get_products_list():
"""获取产品列表"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
# 获取查询参数
page = int(request.args.get('page', 1))
page_size = int(request.args.get('pageSize', 20))
keyword = request.args.get('keyword', '').strip()
type_id = request.args.get('type', '').strip()
status = request.args.get('status', '').strip()
# 构建查询条件
where_conditions = ["enterprise_id = %s", "status != 'deleted'"]
params = [enterprise_id]
if keyword:
where_conditions.append("name LIKE %s")
params.append(f"%{keyword}%")
if type_id:
where_conditions.append("type_name LIKE %s")
params.append(f"%{type_id}%")
if status:
where_conditions.append("status = %s")
params.append(status)
where_clause = " AND ".join(where_conditions)
# 计算偏移量
offset = (page - 1) * page_size
db_manager = get_db_manager()
# 查询总数
count_sql = f"SELECT COUNT(*) as total FROM ai_products WHERE {where_clause}"
count_result = db_manager.execute_query(count_sql, params)
total = count_result[0]['total']
# 查询产品列表
sql = f"""
SELECT p.id, p.name, p.type_name, p.knowledge, p.status,
p.articles_total, p.published_total,
p.image_url, p.image_thumbnail_url,
p.created_at, p.updated_at
FROM ai_products p
WHERE {where_clause}
ORDER BY p.created_at DESC
LIMIT %s OFFSET %s
"""
params.extend([page_size, offset])
products = db_manager.execute_query(sql, params)
# 查询产品标签
for product in products:
tag_sql = """
SELECT id, tag_name
FROM ai_product_tags
WHERE product_id = %s
"""
tags = db_manager.execute_query(tag_sql, (product['id'],))
product['tags'] = tags
logger.info(f"获取产品列表成功,总数: {total}")
return jsonify({
'code': 200,
'message': 'success',
'data': {
'total': total,
'list': products
},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取产品列表] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@product_bp.route('/create', methods=['POST'])
@require_auth
def create_product():
"""创建产品"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
data = request.get_json()
if not data:
return jsonify({
'code': 400,
'message': '请求参数错误',
'data': None
}), 400
# 验证必需字段
required_fields = ['name', 'type_id', 'knowledge']
for field in required_fields:
if not data.get(field):
return jsonify({
'code': 400,
'message': f'缺少必需字段: {field}',
'data': None
}), 400
db_manager = get_db_manager()
# 创建产品
sql = """
INSERT INTO ai_products
(enterprise_id, name, type_name, knowledge, status, articles_total, published_total, image_url, image_thumbnail_url)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
product_id = db_manager.execute_insert(sql, (
enterprise_id,
data['name'],
data.get('type_name', data.get('type_id', '')), # 兼容type_name和type_id
data['knowledge'],
'active',
0, 0, # articles_total, published_total
data.get('image_url', ''),
data.get('image_thumbnail_url', '')
))
# 添加标签
if data.get('tags'):
for tag_name in data['tags']:
tag_sql = "INSERT INTO ai_product_tags (enterprise_id, product_id, tag_name) VALUES (%s, %s, %s)"
db_manager.execute_insert(tag_sql, (enterprise_id, product_id, tag_name))
# 更新企业产品总数
update_sql = "UPDATE ai_enterprises SET products_total = products_total + 1 WHERE id = %s"
db_manager.execute_update(update_sql, (enterprise_id,))
logger.info(f"创建产品成功: {data['name']}, ID: {product_id}")
# 记录业务日志
log_create(
target_type='product',
target_id=product_id,
description=f"创建产品: {data['name']}",
request_data=data,
response_data={'id': product_id}
)
return jsonify({
'code': 200,
'message': '创建成功',
'data': {'id': product_id},
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[创建产品] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@product_bp.route('/<int:product_id>', methods=['PUT'])
@require_auth
def update_product(product_id):
"""更新产品"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
data = request.get_json()
if not data:
return jsonify({
'code': 400,
'message': '请求参数错误',
'data': None
}), 400
db_manager = get_db_manager()
# 检查产品是否存在且属于当前企业
check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
existing = db_manager.execute_query(check_sql, (product_id, enterprise_id))
if not existing:
return jsonify({
'code': 404,
'message': '产品不存在',
'data': None
}), 404
# 构建更新字段
update_fields = []
params = []
if 'name' in data:
update_fields.append("name = %s")
params.append(data['name'])
if 'type_name' in data or 'type_id' in data:
update_fields.append("type_name = %s")
params.append(data.get('type_name', data.get('type_id', '')))
if 'knowledge' in data:
update_fields.append("knowledge = %s")
params.append(data['knowledge'])
if update_fields:
params.append(product_id)
sql = f"UPDATE ai_products SET {', '.join(update_fields)}, updated_at = NOW() WHERE id = %s"
db_manager.execute_update(sql, params)
# 更新标签
if 'tags' in data:
# 删除旧标签
del_sql = "DELETE FROM ai_product_tags WHERE product_id = %s"
db_manager.execute_update(del_sql, (product_id,))
# 添加新标签
for tag_name in data['tags']:
tag_sql = "INSERT INTO ai_product_tags (enterprise_id, product_id, tag_name) VALUES (%s, %s, %s)"
db_manager.execute_insert(tag_sql, (enterprise_id, product_id, tag_name))
logger.info(f"更新产品成功: ID {product_id}")
# 记录业务日志
log_update(
target_type='product',
target_id=product_id,
description=f"更新产品信息",
request_data=data
)
return jsonify({
'code': 200,
'message': '更新成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[更新产品] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@product_bp.route('/<int:product_id>', methods=['DELETE'])
@require_auth
def delete_product(product_id):
"""删除产品"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
db_manager = get_db_manager()
# 检查产品是否存在且属于当前企业
check_sql = "SELECT id, name FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
existing = db_manager.execute_query(check_sql, (product_id, enterprise_id))
if not existing:
return jsonify({
'code': 404,
'message': '产品不存在',
'data': None
}), 404
# 软删除产品
sql = "UPDATE ai_products SET status = 'deleted', updated_at = NOW() WHERE id = %s"
db_manager.execute_update(sql, (product_id,))
# 更新企业产品总数
update_sql = "UPDATE ai_enterprises SET products_total = products_total - 1 WHERE id = %s"
db_manager.execute_update(update_sql, (enterprise_id,))
logger.info(f"删除产品成功: {existing[0]['name']}")
# 记录业务日志
log_delete(
target_type='product',
target_id=product_id,
description=f"删除产品: {existing[0]['name']}"
)
return jsonify({
'code': 200,
'message': '删除成功',
'data': None,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[删除产品] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500
@product_bp.route('/<int:product_id>', methods=['GET'])
@require_auth
def get_product_detail(product_id):
"""获取产品详情"""
try:
current_user = AuthUtils.get_current_user()
enterprise_id = current_user.get('enterprise_id')
if not enterprise_id:
return jsonify({
'code': 400,
'message': '无法获取企业ID',
'data': None
}), 400
db_manager = get_db_manager()
# 查询产品详情
sql = """
SELECT p.id, p.name, p.type_name, p.knowledge, p.status,
p.articles_total, p.published_total,
p.image_url, p.image_thumbnail_url,
p.created_at, p.updated_at
FROM ai_products p
WHERE p.id = %s AND p.enterprise_id = %s AND p.status != 'deleted'
"""
result = db_manager.execute_query(sql, (product_id, enterprise_id))
if not result:
return jsonify({
'code': 404,
'message': '产品不存在',
'data': None
}), 404
product = result[0]
# 查询产品标签直接从ai_product_tags查询
tag_sql = """
SELECT id, tag_name
FROM ai_product_tags
WHERE product_id = %s
"""
tags = db_manager.execute_query(tag_sql, (product_id,))
product['tags'] = tags
# 暂不查询产品图片(图片关联表结构不同)
product['images'] = []
logger.info(f"获取产品详情成功: ID {product_id}")
return jsonify({
'code': 200,
'message': 'success',
'data': product,
'timestamp': int(datetime.now().timestamp() * 1000)
})
except Exception as e:
logger.error(f"[获取产品详情] 处理请求时发生错误: {str(e)}", exc_info=True)
return jsonify({
'code': 500,
'message': '服务器内部错误',
'data': None
}), 500