Files
ai_wht_B/ver_25121621/product_routes.py
“shengyudong” 5a384b694e 2026-1-6
2026-01-06 14:18:39 +08:00

419 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Product management API endpoints.
"""
from flask import Blueprint, request, jsonify
import logging
from datetime import datetime
from auth_utils import require_auth, AuthUtils
from database_config import get_db_manager
from log_utils import log_create, log_update, log_delete, log_error, log_operation
logger = logging.getLogger(__name__)
# Blueprint that mounts every product route under the /api/products URL prefix
product_bp = Blueprint('product', __name__, url_prefix='/api/products')
def _parse_positive_int(raw_value, default):
    """Parse a query-string value as an integer >= 1.

    Args:
        raw_value: Raw string taken from the query string, or None when the
            parameter is absent.
        default: Value returned when ``raw_value`` is None or blank.

    Returns:
        The parsed integer, or ``default``.

    Raises:
        ValueError: If the value is not an integer or is smaller than 1.
    """
    if raw_value is None or not raw_value.strip():
        return default
    parsed = int(raw_value)
    if parsed < 1:
        raise ValueError(f"expected a positive integer, got {parsed}")
    return parsed


@product_bp.route('/list', methods=['GET'])
@require_auth
def get_products_list():
    """Return one page of the current enterprise's products.

    Query parameters:
        page: 1-based page number (default 1).
        pageSize: Number of rows per page (default 20).
        keyword: Optional substring matched against the product name.
        type: Optional substring matched against ``type_name``.
        status: Optional exact status filter.

    Products whose status is 'deleted' are always excluded. Every product in
    the response carries a ``tags`` list of ``{id, tag_name}`` entries.

    Returns:
        A JSON response whose ``data`` holds ``total`` (matching row count)
        and ``list`` (the requested page). HTTP 400 is returned when the
        enterprise ID cannot be determined or a pagination parameter is not a
        positive integer; HTTP 500 on any unexpected error.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({
                'code': 400,
                'message': '无法获取企业ID',
                'data': None
            }), 400

        # Validate pagination up front: a non-numeric value used to surface as
        # a 500, and page <= 0 produced a negative OFFSET that the database
        # rejects.
        try:
            page = _parse_positive_int(request.args.get('page'), 1)
            page_size = _parse_positive_int(request.args.get('pageSize'), 20)
        except ValueError:
            return jsonify({
                'code': 400,
                'message': '分页参数错误',
                'data': None
            }), 400

        keyword = request.args.get('keyword', '').strip()
        type_id = request.args.get('type', '').strip()
        status = request.args.get('status', '').strip()

        # Build the WHERE clause; user input only ever travels as bound
        # parameters, never as SQL text.
        where_conditions = ["enterprise_id = %s", "status != 'deleted'"]
        params = [enterprise_id]
        if keyword:
            where_conditions.append("name LIKE %s")
            params.append(f"%{keyword}%")
        if type_id:
            where_conditions.append("type_name LIKE %s")
            params.append(f"%{type_id}%")
        if status:
            where_conditions.append("status = %s")
            params.append(status)
        where_clause = " AND ".join(where_conditions)

        offset = (page - 1) * page_size
        db_manager = get_db_manager()

        # Total number of rows matching the filters (ignores pagination).
        count_sql = f"SELECT COUNT(*) as total FROM ai_products WHERE {where_clause}"
        count_result = db_manager.execute_query(count_sql, params)
        total = count_result[0]['total']

        # Requested page, newest products first.
        sql = f"""
            SELECT p.id, p.name, p.type_name, p.knowledge, p.status,
            p.articles_total, p.published_total,
            p.image_url, p.image_thumbnail_url,
            p.created_at, p.updated_at
            FROM ai_products p
            WHERE {where_clause}
            ORDER BY p.created_at DESC
            LIMIT %s OFFSET %s
        """
        products = db_manager.execute_query(sql, params + [page_size, offset])

        # Load the tags of every product on the page with one query instead
        # of one query per product.
        if products:
            product_ids = [product['id'] for product in products]
            placeholders = ", ".join(["%s"] * len(product_ids))
            tag_sql = f"""
                SELECT id, product_id, tag_name
                FROM ai_product_tags
                WHERE product_id IN ({placeholders})
            """
            tag_rows = db_manager.execute_query(tag_sql, product_ids) or []
            tags_by_product = {}
            for row in tag_rows:
                tags_by_product.setdefault(row['product_id'], []).append({
                    'id': row['id'],
                    'tag_name': row['tag_name']
                })
            for product in products:
                product['tags'] = tags_by_product.get(product['id'], [])

        logger.info(f"获取产品列表成功,总数: {total}")
        return jsonify({
            'code': 200,
            'message': 'success',
            'data': {
                'total': total,
                'list': products
            },
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[获取产品列表] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@product_bp.route('/create', methods=['POST'])
@require_auth
def create_product():
    """Create a product for the current enterprise.

    Expects a JSON object with:
        name: Product name (required).
        type_name / type_id: Product type; at least one must be non-empty.
            ``type_name`` wins when both are given.
        knowledge: Product knowledge text (required).
        tags: Optional list of tag names.
        image_url, image_thumbnail_url: Optional, default to ''.

    The product is inserted with status 'active' and zeroed article
    counters, its tags are inserted, and the enterprise's ``products_total``
    counter is incremented.

    Returns:
        A JSON response with ``data.id`` set to the new product ID. HTTP 400
        is returned for a missing enterprise ID, an unparsable or non-object
        body, a missing required field, or a ``tags`` value that is not a
        list; HTTP 500 on any unexpected error.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({
                'code': 400,
                'message': '无法获取企业ID',
                'data': None
            }), 400

        # silent=True makes a malformed body come back as None so it is
        # answered with the 400 below; without it the parse error is raised
        # inside this try block and reported as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({
                'code': 400,
                'message': '请求参数错误',
                'data': None
            }), 400

        # Either key may carry the type; take the first non-empty one so an
        # empty type_name does not shadow a valid type_id.
        type_name = data.get('type_name') or data.get('type_id')

        # Validate required fields (same order and messages as before).
        required_values = (
            ('name', data.get('name')),
            ('type_id', type_name),
            ('knowledge', data.get('knowledge')),
        )
        for field, value in required_values:
            if not value:
                return jsonify({
                    'code': 400,
                    'message': f'缺少必需字段: {field}',
                    'data': None
                }), 400

        # Reject a non-list tags value before anything is written; iterating
        # a string would insert one tag per character.
        tags = data.get('tags') or []
        if not isinstance(tags, list):
            return jsonify({
                'code': 400,
                'message': '请求参数错误',
                'data': None
            }), 400

        db_manager = get_db_manager()

        # Insert the product row.
        sql = """
            INSERT INTO ai_products
            (enterprise_id, name, type_name, knowledge, status, articles_total, published_total, image_url, image_thumbnail_url)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        product_id = db_manager.execute_insert(sql, (
            enterprise_id,
            data['name'],
            type_name,
            data['knowledge'],
            'active',
            0, 0,  # articles_total, published_total
            data.get('image_url', ''),
            data.get('image_thumbnail_url', '')
        ))

        # Insert the tags.
        tag_sql = "INSERT INTO ai_product_tags (enterprise_id, product_id, tag_name) VALUES (%s, %s, %s)"
        for tag_name in tags:
            db_manager.execute_insert(tag_sql, (enterprise_id, product_id, tag_name))

        # Keep the enterprise's product counter in step.
        update_sql = "UPDATE ai_enterprises SET products_total = products_total + 1 WHERE id = %s"
        db_manager.execute_update(update_sql, (enterprise_id,))

        logger.info(f"创建产品成功: {data['name']}, ID: {product_id}")

        # Record the business audit log entry.
        log_create(
            target_type='product',
            target_id=product_id,
            description=f"创建产品: {data['name']}",
            request_data=data,
            response_data={'id': product_id}
        )
        return jsonify({
            'code': 200,
            'message': '创建成功',
            'data': {'id': product_id},
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[创建产品] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@product_bp.route('/<int:product_id>', methods=['PUT'])
@require_auth
def update_product(product_id):
    """Update a product that belongs to the current enterprise.

    Expects a JSON object; only the keys that are present are applied:
        name, knowledge: New column values.
        type_name / type_id: New ``type_name`` value (``type_name`` wins when
            both keys are present).
        tags: List of tag names that replaces the existing tag set.

    Args:
        product_id: ID of the product, taken from the URL.

    Returns:
        A JSON success response. HTTP 400 is returned for a missing
        enterprise ID, an unparsable or non-object body, or a ``tags`` value
        that is not a list; HTTP 404 when the product does not exist, is
        deleted, or belongs to another enterprise; HTTP 500 on any
        unexpected error.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({
                'code': 400,
                'message': '无法获取企业ID',
                'data': None
            }), 400

        # silent=True makes a malformed body come back as None so it is
        # answered with the 400 below; without it the parse error is raised
        # inside this try block and reported as a 500.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({
                'code': 400,
                'message': '请求参数错误',
                'data': None
            }), 400

        # Validate tags before touching the database: the old tags are
        # deleted before the new ones are inserted, so a non-iterable value
        # (e.g. null) would otherwise wipe the tags and then fail.
        if 'tags' in data and not isinstance(data['tags'], list):
            return jsonify({
                'code': 400,
                'message': '请求参数错误',
                'data': None
            }), 400

        db_manager = get_db_manager()

        # Make sure the product exists and belongs to the current enterprise.
        check_sql = "SELECT id FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
        existing = db_manager.execute_query(check_sql, (product_id, enterprise_id))
        if not existing:
            return jsonify({
                'code': 404,
                'message': '产品不存在',
                'data': None
            }), 404

        # Collect the columns to update.
        update_fields = []
        params = []
        if 'name' in data:
            update_fields.append("name = %s")
            params.append(data['name'])
        if 'type_name' in data or 'type_id' in data:
            update_fields.append("type_name = %s")
            params.append(data.get('type_name', data.get('type_id', '')))
        if 'knowledge' in data:
            update_fields.append("knowledge = %s")
            params.append(data['knowledge'])

        if update_fields:
            # Scope the write by enterprise as well, so it can never reach
            # another tenant's row even if the check above is bypassed.
            params.extend([product_id, enterprise_id])
            sql = (
                f"UPDATE ai_products SET {', '.join(update_fields)}, updated_at = NOW() "
                "WHERE id = %s AND enterprise_id = %s"
            )
            db_manager.execute_update(sql, params)

        # Replace the tag set when the caller supplied one.
        if 'tags' in data:
            del_sql = "DELETE FROM ai_product_tags WHERE product_id = %s"
            db_manager.execute_update(del_sql, (product_id,))
            tag_sql = "INSERT INTO ai_product_tags (enterprise_id, product_id, tag_name) VALUES (%s, %s, %s)"
            for tag_name in data['tags']:
                db_manager.execute_insert(tag_sql, (enterprise_id, product_id, tag_name))

        logger.info(f"更新产品成功: ID {product_id}")

        # Record the business audit log entry.
        log_update(
            target_type='product',
            target_id=product_id,
            description="更新产品信息",
            request_data=data
        )
        return jsonify({
            'code': 200,
            'message': '更新成功',
            'data': None,
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[更新产品] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@product_bp.route('/<int:product_id>', methods=['DELETE'])
@require_auth
def delete_product(product_id):
    """Soft-delete a product that belongs to the current enterprise.

    The product row is kept and its status is set to 'deleted'; the
    enterprise's ``products_total`` counter is then decremented.

    Args:
        product_id: ID of the product, taken from the URL.

    Returns:
        A JSON success response. HTTP 400 is returned when the enterprise ID
        cannot be determined; HTTP 404 when the product does not exist, is
        already deleted, or belongs to another enterprise; HTTP 500 on any
        unexpected error.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({
                'code': 400,
                'message': '无法获取企业ID',
                'data': None
            }), 400

        db_manager = get_db_manager()

        # Make sure the product exists and belongs to the current enterprise.
        check_sql = "SELECT id, name FROM ai_products WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
        existing = db_manager.execute_query(check_sql, (product_id, enterprise_id))
        if not existing:
            return jsonify({
                'code': 404,
                'message': '产品不存在',
                'data': None
            }), 404
        product_name = existing[0]['name']

        # Soft delete. The write repeats the ownership and status conditions
        # of the check above so it can only touch the row that was verified.
        sql = (
            "UPDATE ai_products SET status = 'deleted', updated_at = NOW() "
            "WHERE id = %s AND enterprise_id = %s AND status != 'deleted'"
        )
        db_manager.execute_update(sql, (product_id, enterprise_id))

        # Decrement the enterprise's product counter, but never below zero.
        # NOTE(review): two concurrent deletes of the same product can still
        # decrement twice; closing that gap needs the affected-row count of
        # the UPDATE above, which this block cannot see.
        update_sql = (
            "UPDATE ai_enterprises SET products_total = products_total - 1 "
            "WHERE id = %s AND products_total > 0"
        )
        db_manager.execute_update(update_sql, (enterprise_id,))

        logger.info(f"删除产品成功: {product_name}")

        # Record the business audit log entry.
        log_delete(
            target_type='product',
            target_id=product_id,
            description=f"删除产品: {product_name}"
        )
        return jsonify({
            'code': 200,
            'message': '删除成功',
            'data': None,
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[删除产品] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@product_bp.route('/<int:product_id>', methods=['GET'])
@require_auth
def get_product_detail(product_id):
    """Fetch one product of the current enterprise together with its tags.

    Args:
        product_id: ID of the product, taken from the URL.

    Returns:
        A JSON response whose ``data`` is the product row extended with
        ``tags`` (rows of ``id`` / ``tag_name``) and ``images`` (always an
        empty list). HTTP 400 is returned when the enterprise ID cannot be
        determined; HTTP 404 when no non-deleted product with this ID exists
        for the enterprise; HTTP 500 on any unexpected error.
    """
    try:
        enterprise_id = AuthUtils.get_current_user().get('enterprise_id')
        if not enterprise_id:
            missing_enterprise = {
                'code': 400,
                'message': '无法获取企业ID',
                'data': None
            }
            return jsonify(missing_enterprise), 400

        db = get_db_manager()

        # Product row, restricted to the caller's enterprise and to
        # products that are not soft-deleted.
        detail_sql = """
            SELECT p.id, p.name, p.type_name, p.knowledge, p.status,
            p.articles_total, p.published_total,
            p.image_url, p.image_thumbnail_url,
            p.created_at, p.updated_at
            FROM ai_products p
            WHERE p.id = %s AND p.enterprise_id = %s AND p.status != 'deleted'
        """
        rows = db.execute_query(detail_sql, (product_id, enterprise_id))
        if not rows:
            not_found = {
                'code': 404,
                'message': '产品不存在',
                'data': None
            }
            return jsonify(not_found), 404

        detail = rows[0]

        # Tags are read straight from ai_product_tags.
        tags_sql = """
            SELECT id, tag_name
            FROM ai_product_tags
            WHERE product_id = %s
        """
        detail['tags'] = db.execute_query(tags_sql, (product_id,))

        # Product images are not queried for now (the image association
        # table has a different structure).
        detail['images'] = []

        logger.info(f"获取产品详情成功: ID {product_id}")

        now_ms = int(datetime.now().timestamp() * 1000)
        payload = {
            'code': 200,
            'message': 'success',
            'data': detail,
            'timestamp': now_ms
        }
        return jsonify(payload)
    except Exception as exc:
        logger.error(f"[获取产品详情] 处理请求时发生错误: {str(exc)}", exc_info=True)
        server_error = {
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }
        return jsonify(server_error), 500