Files
ai_wht_B/ver_25121821/enterprise_routes.py
“shengyudong” 5a384b694e 2026-1-6
2026-01-06 14:18:39 +08:00

690 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
企业管理接口
"""
from flask import Blueprint, request, jsonify
import logging
from datetime import datetime
from auth_utils import require_auth, require_role, AuthUtils
from database_config import get_db_manager, format_datetime_fields
from log_utils import log_create, log_update, log_delete, log_error
import uuid
# Module logger; it uses the fixed 'article_server' logger name rather than __name__.
logger = logging.getLogger('article_server')
# Blueprint that groups every enterprise-management endpoint under /api/enterprises.
enterprise_bp = Blueprint('enterprise', __name__, url_prefix='/api/enterprises')
@enterprise_bp.route('/list', methods=['GET'])
@require_auth
@require_role('admin', 'enterprise')
def get_enterprises_list():
    """Return one page of enterprises, optionally filtered by keyword and status.

    Query parameters:
        page: 1-based page number. Missing or non-integer values fall back
            to 1; values below 1 are clamped to 1.
        pageSize: rows per page. Missing or non-integer values fall back to
            20; values below 1 are clamped to 1.
        keyword: optional substring matched (SQL LIKE) against ``name``,
            ``short_name`` and ``phone``.
        status: optional exact match on the ``status`` column.

    Returns:
        A JSON body ``{code, message, data: {total, list}, timestamp}`` on
        success, or ``{code: 500, ...}`` with HTTP 500 when anything raises.

    NOTE(review): the endpoint was described as platform-administrator only,
    but the role decorator also admits 'enterprise' and the query is not
    scoped to the caller's enterprise -- confirm this is intended.
    """
    client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
    logger.info(f"[获取企业列表] 开始处理请求, IP: {client_ip}")
    try:
        # type=int makes Werkzeug return the default when the value is not an
        # integer instead of raising ValueError (which used to surface as a
        # 500). Clamping to 1 keeps LIMIT/OFFSET non-negative.
        page = max(request.args.get('page', 1, type=int), 1)
        page_size = max(request.args.get('pageSize', 20, type=int), 1)
        keyword = request.args.get('keyword', '').strip()
        status = request.args.get('status', '').strip()
        logger.info(f"[获取企业列表] 查询参数: page={page}, pageSize={page_size}, keyword={keyword}, status={status}, IP: {client_ip}")

        # Build the WHERE clause; "1=1" lets every filter be appended with AND.
        where_conditions = ["1=1"]
        params = []
        if keyword:
            where_conditions.append("(name LIKE %s OR short_name LIKE %s OR phone LIKE %s)")
            keyword_pattern = f"%{keyword}%"
            params.extend([keyword_pattern, keyword_pattern, keyword_pattern])
        if status:
            where_conditions.append("status = %s")
            params.append(status)
        where_clause = " AND ".join(where_conditions)

        offset = (page - 1) * page_size
        db_manager = get_db_manager()

        # Total number of matching rows (before pagination).
        count_sql = f"SELECT COUNT(*) as total FROM ai_enterprises WHERE {where_clause}"
        count_result = db_manager.execute_query(count_sql, params)
        total = count_result[0]['total']

        # Requested page, newest first. Only fixed SQL fragments are
        # interpolated into the statement; user input travels via params.
        sql = f"""
            SELECT id, enterprise_ID, name, short_name, phone, email, status,
            users_total, products_total, articles_total, published_total,
            created_at, updated_at
            FROM ai_enterprises
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
        """
        enterprises = db_manager.execute_query(sql, params + [page_size, offset])
        enterprises = format_datetime_fields(enterprises)

        logger.info(f"[获取企业列表] 查询成功, 总数: {total}, 当前页: {page}, 每页: {page_size}, 返回数量: {len(enterprises)}, IP: {client_ip}")
        return jsonify({
            'code': 200,
            'message': 'success',
            'data': {
                'total': total,
                'list': enterprises
            },
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[获取企业列表] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@enterprise_bp.route('/create', methods=['POST'])
@require_auth
@require_role('admin', 'enterprise')
def create_enterprise():
    """Create an enterprise and its administrator user in one request.

    Expects a JSON object with the required keys ``name``, ``short_name``,
    ``phone`` and ``password``; ``icon``, ``email``, ``website``, ``address``,
    ``real_name`` and ``department`` are optional.

    Steps:
        1. Reject the request when the phone already exists in
           ``ai_enterprises`` or ``ai_users`` (HTTP 409).
        2. Insert the enterprise row with status 'active' and zeroed counters.
        3. Insert an ``ai_users`` row with role 'enterprise', using the phone
           as username and the hashed password.

    Returns:
        JSON with ``id``, ``enterprise_ID``, ``name``, ``user_id`` and
        ``username`` on success; 400 for a missing/invalid body or field,
        409 for a duplicate phone, 500 on any unexpected error.

    NOTE(review): the endpoint was described as platform-administrator only,
    but the role decorator also admits 'enterprise' -- confirm.
    """
    client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
    logger.info(f"[创建企业] 开始处理创建企业请求, IP: {client_ip}")
    try:
        # silent=True: a malformed or non-JSON body yields None (-> 400 below)
        # instead of an exception that the broad handler would turn into 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data:
            logger.warning(f"[创建企业] 请求参数为空, IP: {client_ip}")
            return jsonify({
                'code': 400,
                'message': '请求参数错误',
                'data': None
            }), 400
        logger.info(f"[创建企业] 收到创建请求, 企业名称: {data.get('name')}, 简称: {data.get('short_name')}, 手机号: {data.get('phone')}, IP: {client_ip}")

        # Validate required fields.
        required_fields = ['name', 'short_name', 'phone', 'password']
        for field in required_fields:
            if not data.get(field):
                logger.warning(f"[创建企业] 缺少必需字段: {field}, IP: {client_ip}")
                return jsonify({
                    'code': 400,
                    'message': f'缺少必需字段: {field}',
                    'data': None
                }), 400

        db_manager = get_db_manager()

        # The phone must be unused in ai_enterprises ...
        logger.info(f"[创建企业] 检查ai_enterprises表手机号是否已存在: {data['phone']}")
        check_enterprise_sql = "SELECT id FROM ai_enterprises WHERE phone = %s"
        existing_enterprise = db_manager.execute_query(check_enterprise_sql, (data['phone'],))
        if existing_enterprise:
            logger.warning(f"[创建企业] ai_enterprises表中手机号已存在: {data['phone']}, IP: {client_ip}")
            return jsonify({
                'code': 409,
                'message': '手机号已被使用',
                'data': None
            }), 409

        # ... and in ai_users, because the phone doubles as the admin username.
        logger.info(f"[创建企业] 检查ai_users表手机号是否已存在: {data['phone']}")
        check_user_sql = "SELECT id FROM ai_users WHERE phone = %s"
        existing_user = db_manager.execute_query(check_user_sql, (data['phone'],))
        if existing_user:
            logger.warning(f"[创建企业] ai_users表中手机号已存在: {data['phone']}, IP: {client_ip}")
            return jsonify({
                'code': 409,
                'message': '手机号已被使用',
                'data': None
            }), 409

        # Public enterprise identifier: ENT-<yyyymmdd>-<first 8 hex chars of a UUID4>.
        enterprise_id_str = f"ENT-{datetime.now().strftime('%Y%m%d')}-{str(uuid.uuid4())[:8].upper()}"
        logger.info(f"[创建企业] 生成企业ID: {enterprise_id_str}, 企业名称: {data['name']}")

        # Hash the password before it is stored.
        hashed_password = AuthUtils.hash_password(data['password'])

        # Step 1: insert the enterprise row.
        logger.info(f"[创建企业] 开始插入ai_enterprises表, 企业ID: {enterprise_id_str}, 名称: {data['name']}")
        enterprise_sql = """
            INSERT INTO ai_enterprises
            (enterprise_ID, name, short_name, icon, phone, email, website, address, status,
            users_total, products_total, published_total, articles_total, released_month_total, linked_to_xhs_num)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        enterprise_db_id = db_manager.execute_insert(enterprise_sql, (
            enterprise_id_str,
            data['name'],
            data['short_name'],
            data.get('icon', ''),
            data['phone'],
            data.get('email', ''),
            data.get('website', ''),
            data.get('address', ''),
            'active',
            0, 0, 0, 0, 0, 0
        ))
        logger.info(f"[创建企业] ai_enterprises表插入成功, 数据库ID: {enterprise_db_id}, 企业ID: {enterprise_id_str}")

        # Step 2: insert the enterprise administrator user.
        logger.info(f"[创建企业] 开始创建企业管理员用户, 手机号: {data['phone']}, 角色: enterprise")
        user_sql = """
            INSERT INTO ai_users
            (enterprise_id, enterprise_name, username, password, real_name, email, phone,
            department, role, status)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        # The phone number is used as the username.
        username = data['phone']
        try:
            user_db_id = db_manager.execute_insert(user_sql, (
                enterprise_db_id,                           # enterprise_id
                data['name'],                               # enterprise_name
                username,                                   # username (the phone number)
                hashed_password,                            # password (hash of the supplied one)
                data.get('real_name', data['short_name']),  # real_name
                data.get('email', ''),                      # email
                data['phone'],                              # phone
                data.get('department', ''),                 # department
                'enterprise',                               # role
                'active'                                    # status
            ))
        except Exception:
            # The two inserts are separate calls, so a failure here would
            # leave an enterprise without an administrator and with its phone
            # permanently taken. Remove the enterprise row (best effort) and
            # let the outer handler answer with 500.
            # NOTE(review): if db_manager offers real transactions, prefer those.
            logger.error(f"[创建企业] 创建企业管理员用户失败, 回滚企业记录, 企业DB_ID: {enterprise_db_id}", exc_info=True)
            try:
                db_manager.execute_update("DELETE FROM ai_enterprises WHERE id = %s", (enterprise_db_id,))
            except Exception:
                logger.error(f"[创建企业] 回滚企业记录失败, 企业DB_ID: {enterprise_db_id}", exc_info=True)
            raise
        logger.info(f"[创建企业] ai_users表插入成功, 用户ID: {user_db_id}, 用户名: {username}, 角色: enterprise")
        logger.info(f"[创建企业] 企业创建成功, 企业DB_ID: {enterprise_db_id}, 企业ID: {enterprise_id_str}, 名称: {data['name']}, 管理员用户ID: {user_db_id}, IP: {client_ip}")

        # Business audit log; the password is masked.
        log_create(
            target_type='enterprise',
            target_id=enterprise_db_id,
            description=f"创建企业: {data['name']}, 同时创建企业管理员用户",
            request_data={**data, 'password': '***'},
            response_data={'enterprise_id': enterprise_db_id, 'enterprise_ID': enterprise_id_str, 'user_id': user_db_id}
        )
        return jsonify({
            'code': 200,
            'message': '创建成功',
            'data': {
                'id': enterprise_db_id,
                'enterprise_ID': enterprise_id_str,
                'name': data['name'],
                'user_id': user_db_id,
                'username': username
            },
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[创建企业] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@enterprise_bp.route('/<int:enterprise_id>', methods=['PUT'])
@require_auth
@require_role('admin', 'enterprise')
def update_enterprise(enterprise_id):
    """Update selected columns of the enterprise identified by ``enterprise_id``.

    Accepts a JSON object; only the keys ``name``, ``short_name``, ``email``
    and ``password`` are honoured (the password is hashed before storage).

    Args:
        enterprise_id: primary key of the ``ai_enterprises`` row, taken from
            the URL.

    Returns:
        ``code: 200`` on success; 400 for a missing/invalid body or when no
        updatable key is present; 404 when the enterprise does not exist;
        500 on any unexpected error.

    NOTE(review): the role decorator admits 'enterprise' and nothing checks
    that the caller belongs to ``enterprise_id`` -- confirm this is intended.
    NOTE(review): create_enterprise stores the password in ai_users, not in
    ai_enterprises; verify that ai_enterprises.password is the right target.
    """
    try:
        # silent=True: malformed JSON becomes None (-> 400) rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data:
            return jsonify({
                'code': 400,
                'message': '请求参数错误',
                'data': None
            }), 400

        # Make sure the enterprise exists.
        db_manager = get_db_manager()
        check_sql = "SELECT id FROM ai_enterprises WHERE id = %s"
        existing = db_manager.execute_query(check_sql, (enterprise_id,))
        if not existing:
            return jsonify({
                'code': 404,
                'message': '企业不存在',
                'data': None
            }), 404

        # Collect "column = %s" fragments. Column names come from this fixed
        # tuple, never from the request, so the f-string below stays injection-safe.
        update_fields = []
        params = []
        for column in ('name', 'short_name', 'email'):
            if column in data:
                update_fields.append(f"{column} = %s")
                params.append(data[column])
        if 'password' in data:
            update_fields.append("password = %s")
            params.append(AuthUtils.hash_password(data['password']))
        if not update_fields:
            return jsonify({
                'code': 400,
                'message': '没有要更新的字段',
                'data': None
            }), 400

        params.append(enterprise_id)
        sql = f"UPDATE ai_enterprises SET {', '.join(update_fields)}, updated_at = NOW() WHERE id = %s"
        db_manager.execute_update(sql, params)
        logger.info(f"更新企业成功: ID {enterprise_id}")

        # Business audit log. Never persist the plaintext password: mask it
        # the same way create_enterprise and change_password do.
        logged_data = {**data, 'password': '***'} if 'password' in data else data
        log_update(
            target_type='enterprise',
            target_id=enterprise_id,
            description="更新企业信息",
            request_data=logged_data
        )
        return jsonify({
            'code': 200,
            'message': '更新成功',
            'data': None,
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[更新企业] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@enterprise_bp.route('/<int:enterprise_id>/status', methods=['PUT'])
@require_auth
@require_role('admin', 'enterprise')
def toggle_enterprise_status(enterprise_id):
    """Set the status of an enterprise to 'active' or 'disabled'.

    Expects a JSON object ``{"status": "active" | "disabled"}``.

    Args:
        enterprise_id: primary key of the ``ai_enterprises`` row, taken from
            the URL.

    Returns:
        ``code: 200`` on success; 400 when ``status`` is missing or not one
        of the two accepted values; 404 when the enterprise does not exist;
        500 on any unexpected error.

    NOTE(review): the role decorator admits 'enterprise' and nothing checks
    that the caller belongs to ``enterprise_id`` -- confirm this is intended.
    """
    try:
        # silent=True: malformed JSON becomes None (-> 400) rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'status' not in data:
            return jsonify({
                'code': 400,
                'message': '缺少status参数',
                'data': None
            }), 400
        status = data['status']
        if status not in ('active', 'disabled'):
            return jsonify({
                'code': 400,
                'message': '状态值无效',
                'data': None
            }), 400

        db_manager = get_db_manager()

        # Check existence explicitly. The UPDATE's affected-row count is not a
        # reliable signal: MySQL reports 0 when the row already holds the same
        # values (same status and NOW() within the same second), which used
        # to be answered with a misleading 404.
        existing = db_manager.execute_query(
            "SELECT id FROM ai_enterprises WHERE id = %s", (enterprise_id,)
        )
        if not existing:
            return jsonify({
                'code': 404,
                'message': '企业不存在',
                'data': None
            }), 404

        sql = "UPDATE ai_enterprises SET status = %s, updated_at = NOW() WHERE id = %s"
        db_manager.execute_update(sql, (status, enterprise_id))
        logger.info(f"切换企业状态成功: ID {enterprise_id}, 状态: {status}")

        # Business audit log.
        log_update(
            target_type='enterprise',
            target_id=enterprise_id,
            description=f"切换企业状态: {status}",
            request_data=data
        )
        return jsonify({
            'code': 200,
            'message': '操作成功',
            'data': None,
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[切换企业状态] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@enterprise_bp.route('/<int:enterprise_id>', methods=['DELETE'])
@require_auth
@require_role('admin', 'enterprise')
def delete_enterprise(enterprise_id):
    """Hard-delete the enterprise identified by ``enterprise_id``.

    Args:
        enterprise_id: primary key of the ``ai_enterprises`` row, taken from
            the URL.

    Returns:
        ``code: 200`` once the row is deleted, 404 when no such enterprise
        exists, 500 on any unexpected error.

    NOTE(review): only the ai_enterprises row is removed here; the ai_users
    row that create_enterprise inserts is left in place -- confirm whether
    it should be cleaned up as well.
    """
    try:
        manager = get_db_manager()

        # Look the row up first so a missing enterprise yields 404 and the
        # name is available for the log messages below.
        rows = manager.execute_query(
            "SELECT id, name FROM ai_enterprises WHERE id = %s",
            (enterprise_id,),
        )
        if not rows:
            not_found = {'code': 404, 'message': '企业不存在', 'data': None}
            return jsonify(not_found), 404

        # Physical (not soft) delete, as required by the API documentation.
        manager.execute_update(
            "DELETE FROM ai_enterprises WHERE id = %s",
            (enterprise_id,),
        )
        logger.info(f"删除企业成功: {rows[0]['name']}")

        # Business audit log.
        log_delete(
            target_type='enterprise',
            target_id=enterprise_id,
            description=f"删除企业: {rows[0]['name']}",
        )

        now_ms = int(datetime.now().timestamp() * 1000)
        payload = {
            'code': 200,
            'message': '删除成功',
            'data': None,
            'timestamp': now_ms,
        }
        return jsonify(payload)
    except Exception as e:
        logger.error(f"[删除企业] 处理请求时发生错误: {str(e)}", exc_info=True)
        failure = {'code': 500, 'message': '服务器内部错误', 'data': None}
        return jsonify(failure), 500
@enterprise_bp.route('/stats', methods=['GET'])
@require_auth
@require_role('admin', 'enterprise')
def get_enterprises_stats():
    """Return aggregate counters over all enterprises whose status is not 'deleted'.

    The response ``data`` object carries ``total_count``, ``active_count``,
    ``disabled_count``, ``total_users``, ``total_products``,
    ``total_articles`` and ``total_published``, each as a plain integer.

    Returns:
        ``code: 200`` with the counters, or ``code: 500`` with HTTP 500 on
        any unexpected error.

    NOTE(review): the endpoint was described as platform-administrator only,
    but the role decorator also admits 'enterprise' -- confirm.
    """
    try:
        db_manager = get_db_manager()
        sql = """
            SELECT
            COUNT(*) as total_count,
            SUM(CASE WHEN status = 'active' THEN 1 ELSE 0 END) as active_count,
            SUM(CASE WHEN status = 'disabled' THEN 1 ELSE 0 END) as disabled_count,
            SUM(users_total) as total_users,
            SUM(products_total) as total_products,
            SUM(articles_total) as total_articles,
            SUM(published_total) as total_published
            FROM ai_enterprises
            WHERE status != 'deleted'
        """
        result = db_manager.execute_query(sql)
        row = result[0] if result else {}
        # SUM() is NULL when no row matches and may be returned as a Decimal
        # by the driver; normalise every counter to a plain int so the JSON
        # payload is always numeric. Assumes rows are dict-like, as in the
        # other queries of this module.
        stats = {key: int(value) if value is not None else 0 for key, value in row.items()}
        logger.info("获取企业统计成功")
        return jsonify({
            'code': 200,
            'message': 'success',
            'data': stats,
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[获取企业统计] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
# Enterprise administrator endpoints
@enterprise_bp.route('/info', methods=['GET'])
@require_auth
@require_role('enterprise')
def get_enterprise_info():
    """Return the profile of the enterprise the current user belongs to.

    The enterprise is resolved from the ``enterprise_id`` entry of
    ``AuthUtils.get_current_user()``.

    Returns:
        ``code: 200`` with the enterprise row (datetime fields formatted),
        400 when the current user carries no enterprise id, 404 when the
        enterprise row is missing, 500 on any unexpected error.
    """
    try:
        user = AuthUtils.get_current_user()
        enterprise_id = user.get('enterprise_id')
        if not enterprise_id:
            missing_id = {'code': 400, 'message': '无法获取企业ID', 'data': None}
            return jsonify(missing_id), 400

        manager = get_db_manager()
        query = """
            SELECT id, enterprise_ID, name, short_name, phone, email, status,
            users_total, products_total, articles_total, published_total,
            created_at, updated_at
            FROM ai_enterprises
            WHERE id = %s
        """
        rows = manager.execute_query(query, (enterprise_id,))
        if not rows:
            not_found = {'code': 404, 'message': '企业不存在', 'data': None}
            return jsonify(not_found), 404

        # Format the datetime fields of the single matching row.
        enterprise_info = format_datetime_fields(rows[0])
        logger.info(f"获取企业信息成功: ID {enterprise_id}")

        now_ms = int(datetime.now().timestamp() * 1000)
        payload = {
            'code': 200,
            'message': 'success',
            'data': enterprise_info,
            'timestamp': now_ms,
        }
        return jsonify(payload)
    except Exception as e:
        logger.error(f"[获取企业信息] 处理请求时发生错误: {str(e)}", exc_info=True)
        failure = {'code': 500, 'message': '服务器内部错误', 'data': None}
        return jsonify(failure), 500
@enterprise_bp.route('/info', methods=['PUT'])
@require_auth
@require_role('enterprise')
def update_enterprise_info():
    """Update the current user's own enterprise (restricted field set).

    Accepts a JSON object; only ``short_name`` and ``email`` are honoured.
    The enterprise is resolved from the ``enterprise_id`` entry of
    ``AuthUtils.get_current_user()``.

    Returns:
        ``code: 200`` on success; 400 when the enterprise id cannot be
        resolved, the body is missing/invalid, or no updatable key is
        present; 500 on any unexpected error.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({
                'code': 400,
                'message': '无法获取企业ID',
                'data': None
            }), 400

        # silent=True: malformed JSON becomes None (-> 400) rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data:
            return jsonify({
                'code': 400,
                'message': '请求参数错误',
                'data': None
            }), 400

        # Enterprise administrators may only change these columns. The names
        # come from this fixed tuple, never from the request, so the f-string
        # below stays injection-safe.
        update_fields = []
        params = []
        for column in ('short_name', 'email'):
            if column in data:
                update_fields.append(f"{column} = %s")
                params.append(data[column])
        if not update_fields:
            return jsonify({
                'code': 400,
                'message': '没有要更新的字段',
                'data': None
            }), 400

        db_manager = get_db_manager()
        params.append(enterprise_id)
        sql = f"UPDATE ai_enterprises SET {', '.join(update_fields)}, updated_at = NOW() WHERE id = %s"
        db_manager.execute_update(sql, params)
        logger.info(f"企业管理员更新企业信息成功: ID {enterprise_id}")

        # Business audit log.
        log_update(
            target_type='enterprise',
            target_id=enterprise_id,
            description="更新企业信息(企业管理员)",
            request_data=data
        )
        return jsonify({
            'code': 200,
            'message': '更新成功',
            'data': None,
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[更新企业信息] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@enterprise_bp.route('/change-password', methods=['PUT'])
@require_auth
@require_role('enterprise')
def change_password():
    """Change the password stored on the current user's enterprise row.

    Expects a JSON object with non-empty string values ``old_password`` and
    ``new_password``. The old password is verified against
    ``ai_enterprises.password`` before the new hash is written.

    Returns:
        ``code: 200`` on success; 400 when the enterprise id cannot be
        resolved or the body/fields are missing or invalid; 401 when the old
        password does not match (or no password is stored); 404 when the
        enterprise row is missing; 500 on any unexpected error.

    NOTE(review): create_enterprise writes the hashed password to ai_users
    and never populates ai_enterprises.password, so for enterprises created
    through that endpoint the stored value here is expected to be empty --
    confirm which table is authoritative for login.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({
                'code': 400,
                'message': '无法获取企业ID',
                'data': None
            }), 400

        # silent=True: malformed JSON becomes None (-> 400) rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data:
            return jsonify({
                'code': 400,
                'message': '请求参数错误',
                'data': None
            }), 400

        old_password = data.get('old_password')
        new_password = data.get('new_password')
        # Both values must be non-empty strings; anything else (numbers,
        # lists, ...) would otherwise blow up inside the hashing helpers.
        if not (isinstance(old_password, str) and old_password
                and isinstance(new_password, str) and new_password):
            return jsonify({
                'code': 400,
                'message': '旧密码和新密码不能为空',
                'data': None
            }), 400

        # Fetch the currently stored hash.
        db_manager = get_db_manager()
        sql = "SELECT password FROM ai_enterprises WHERE id = %s"
        result = db_manager.execute_query(sql, (enterprise_id,))
        if not result:
            return jsonify({
                'code': 404,
                'message': '企业不存在',
                'data': None
            }), 404
        current_password = result[0]['password']

        # Verify the old password. A missing stored hash can never match, so
        # treat it as a failed verification instead of handing None to the
        # verifier.
        if not current_password or not AuthUtils.verify_password(old_password, current_password):
            logger.warning(f"企业修改密码失败 - 旧密码错误: 企业ID {enterprise_id}")
            return jsonify({
                'code': 401,
                'message': '旧密码错误',
                'data': None
            }), 401

        # Store the new hash.
        hashed_new_password = AuthUtils.hash_password(new_password)
        update_sql = "UPDATE ai_enterprises SET password = %s, updated_at = NOW() WHERE id = %s"
        db_manager.execute_update(update_sql, (hashed_new_password, enterprise_id))
        logger.info(f"企业修改密码成功: ID {enterprise_id}")

        # Business audit log; both passwords are masked.
        log_update(
            target_type='enterprise',
            target_id=enterprise_id,
            description="修改企业密码",
            request_data={'old_password': '***', 'new_password': '***'}
        )
        return jsonify({
            'code': 200,
            'message': '密码修改成功',
            'data': None,
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[修改密码] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500