Files
ai_wht_B/ver_2512212119/user_routes.py
“shengyudong” 5a384b694e 2026-1-6
2026-01-06 14:18:39 +08:00

609 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
用户管理接口
"""
from flask import Blueprint, request, jsonify
import logging
from datetime import datetime
from auth_utils import require_auth, require_role, AuthUtils
from database_config import get_db_manager, format_datetime_fields
from log_utils import log_create, log_update, log_delete, log_error, log_operation
logger = logging.getLogger('article_server')
# Create the blueprint; every route registered on it is served under /api/users.
user_bp = Blueprint('user', __name__, url_prefix='/api/users')
@user_bp.route('', methods=['GET'])
@require_auth
def get_users():
    """Return a paginated list of admin users.

    Query parameters:
        page: 1-based page number (default 1).
        size: rows per page (default 10, must not be negative).
        role: optional exact-match filter on the ``role`` column.
        status: optional exact-match filter on the ``status`` column.

    Returns:
        A JSON envelope with ``code``, ``message``, ``data`` and ``timestamp``
        (milliseconds), where ``data`` holds ``list``, ``total``, ``page`` and
        ``size``. Responds with HTTP 400 when ``page`` or ``size`` is not an
        integer or is out of range, and HTTP 500 on any unexpected error.
    """
    try:
        # Parse pagination up front: a non-numeric value, page < 1 or a
        # negative size would otherwise surface as a 500 from int() or from
        # an invalid LIMIT/OFFSET in the database.
        try:
            page = int(request.args.get('page', 1))
            size = int(request.args.get('size', 10))
        except (TypeError, ValueError):
            return jsonify({
                'code': 400,
                'message': '请求参数错误',
                'data': None
            }), 400
        if page < 1 or size < 0:
            return jsonify({
                'code': 400,
                'message': '请求参数错误',
                'data': None
            }), 400

        role = request.args.get('role')
        status = request.args.get('status')

        # Build the WHERE clause from fixed fragments only; user-supplied
        # values always travel as bound parameters.
        # NOTE(review): unlike get_user, this listing does not exclude rows
        # with status = 'deleted' unless the caller filters by status --
        # confirm that is intended.
        where_conditions = ["1=1"]
        params = []
        if role:
            where_conditions.append("role = %s")
            params.append(role)
        if status:
            where_conditions.append("status = %s")
            params.append(status)
        where_clause = " AND ".join(where_conditions)

        offset = (page - 1) * size
        db_manager = get_db_manager()

        # Total number of rows matching the filters (ignores pagination).
        count_sql = f"SELECT COUNT(*) as total FROM wht_admin_users WHERE {where_clause}"
        count_result = db_manager.execute_query(count_sql, params)
        total = count_result[0]['total']

        # Fetch the requested page, newest first.
        sql = (
            "\n"
            "SELECT id, username, real_name, email, phone, role, status, created_at, updated_at\n"
            "FROM wht_admin_users\n"
            f"WHERE {where_clause}\n"
            "ORDER BY created_at DESC\n"
            "LIMIT %s OFFSET %s\n"
        )
        params.extend([size, offset])
        users = db_manager.execute_query(sql, params)
        users = format_datetime_fields(users)

        # Write an audit-log row for this read.
        current_user = AuthUtils.get_current_user()
        log_sql = (
            "\n"
            "INSERT INTO wht_logs (user_id, action, description, ip_address, user_agent, status)\n"
            "VALUES (%s, %s, %s, %s, %s, %s)\n"
        )
        client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
        user_agent = request.headers.get('User-Agent', '未知')
        db_manager.execute_insert(log_sql, (
            current_user['user_id'],
            'get_users',
            f'获取用户列表,总数: {total}',
            client_ip,
            user_agent,
            'success'
        ))

        logger.info(f"获取用户列表成功,总数: {total}")
        return jsonify({
            'code': 200,
            'message': '获取成功',
            'data': {
                'list': users,
                'total': total,
                'page': page,
                'size': size
            },
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[获取用户列表] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@user_bp.route('/<int:user_id>', methods=['GET'])
@require_auth
def get_user(user_id):
    """Return a single admin user by primary key.

    Args:
        user_id: ``id`` of the row in ``wht_admin_users`` (from the URL).

    Returns:
        A JSON envelope whose ``data`` is the user record. Responds with
        HTTP 404 when no row with that id exists or the row has status
        ``deleted``, and HTTP 500 on any unexpected error.
    """
    try:
        db_manager = get_db_manager()
        sql = (
            "\n"
            "SELECT id, username, real_name, email, phone, role, status, created_at, updated_at\n"
            "FROM wht_admin_users\n"
            "WHERE id = %s AND status != 'deleted'\n"
        )
        result = db_manager.execute_query(sql, (user_id,))
        # The raw row contains personal data (email, phone); keep this dump
        # at DEBUG so it does not land in INFO-level logs on every request.
        logger.debug(f"SQL查询结果: {result}")
        if not result:
            return jsonify({
                'code': 404,
                'message': '用户不存在',
                'data': None
            }), 404

        user_data = format_datetime_fields(result[0])

        # Write an audit-log row for this read.
        current_user = AuthUtils.get_current_user()
        log_sql = (
            "\n"
            "INSERT INTO wht_logs (user_id, action, target_type, target_id, description, ip_address, user_agent, status)\n"
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)\n"
        )
        client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
        user_agent = request.headers.get('User-Agent', '未知')
        db_manager.execute_insert(log_sql, (
            current_user['user_id'],
            'get_user',
            'user',
            user_id,
            f'获取用户信息: {user_data["username"]}',
            client_ip,
            user_agent,
            'success'
        ))

        logger.info(f"获取用户信息成功: {user_data['username']}")
        return jsonify({
            'code': 200,
            'message': '获取成功',
            'data': user_data,
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[获取用户信息] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@user_bp.route('', methods=['POST'])
@require_auth
@require_role('admin')
def create_user():
    """Create an admin user from a JSON request body.

    Expected JSON fields:
        username, password, real_name, role: required and non-empty.
        email, phone: optional.

    Validation performed here: username at most 50 characters, password at
    least 6 characters, email (when given) matching a basic address pattern,
    role one of ``admin``/``editor``/``reviewer``/``publisher``, and the
    username not already present in ``wht_admin_users``.

    Returns:
        A JSON envelope with ``data = {'id': <new id>}`` on success.
        Responds with HTTP 400 for a missing/invalid body or field, HTTP 409
        when the username already exists, and HTTP 500 on unexpected errors.
    """
    import re

    try:
        # silent=True: a malformed body or wrong content type yields None
        # instead of raising, so the client gets a 400 rather than the 500
        # produced by the broad handler below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data:
            return jsonify({
                'code': 400,
                'message': '请求参数错误',
                'data': None
            }), 400

        # Required fields must be present and truthy.
        required_fields = ['username', 'password', 'real_name', 'role']
        for field in required_fields:
            if not data.get(field):
                return jsonify({
                    'code': 400,
                    'message': f'缺少必需字段: {field}',
                    'data': None
                }), 400

        # The length checks below call len() on these values, so a JSON
        # number here would otherwise raise TypeError and become a 500.
        if not isinstance(data['username'], str) or not isinstance(data['password'], str):
            return jsonify({
                'code': 400,
                'message': '请求参数错误',
                'data': None
            }), 400

        if len(data['username']) > 50:
            return jsonify({
                'code': 400,
                'message': '用户名长度不能超过50个字符',
                'data': None
            }), 400

        if len(data['password']) < 6:
            return jsonify({
                'code': 400,
                'message': '密码长度不能少于6个字符',
                'data': None
            }), 400

        # Email is optional; validate only when a value was supplied.
        email = data.get('email')
        if email:
            email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
            # fullmatch: with re.match, "$" also matches just before a
            # trailing newline, letting "a@b.com\n" through.
            if not isinstance(email, str) or not re.fullmatch(email_pattern, email):
                return jsonify({
                    'code': 400,
                    'message': '邮箱格式不正确',
                    'data': None
                }), 400

        valid_roles = ['admin', 'editor', 'reviewer', 'publisher']
        if data['role'] not in valid_roles:
            return jsonify({
                'code': 400,
                'message': '无效的用户角色',
                'data': None
            }), 400

        # Reject duplicate usernames.
        # NOTE(review): check-then-insert is not atomic; a unique index on
        # username (if one exists -- not visible here) is the real guard.
        db_manager = get_db_manager()
        check_sql = "SELECT id FROM wht_admin_users WHERE username = %s"
        existing_user = db_manager.execute_query(check_sql, (data['username'],))
        if existing_user:
            return jsonify({
                'code': 409,
                'message': '用户名已存在',
                'data': None
            }), 409

        # Insert the user; only the hashed password is stored.
        sql = (
            "\n"
            "INSERT INTO wht_admin_users (username, password, real_name, email, phone, role, status)\n"
            "VALUES (%s, %s, %s, %s, %s, %s, %s)\n"
        )
        hashed_password = AuthUtils.hash_password(data['password'])
        user_id = db_manager.execute_insert(sql, (
            data['username'],
            hashed_password,
            data['real_name'],
            data.get('email'),
            data.get('phone'),
            data['role'],
            'active'
        ))

        # Write an audit-log row for the creation.
        current_user = AuthUtils.get_current_user()
        log_sql = (
            "\n"
            "INSERT INTO wht_logs (user_id, action, target_type, target_id, description, ip_address, user_agent, status)\n"
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)\n"
        )
        client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
        user_agent = request.headers.get('User-Agent', '未知')
        db_manager.execute_insert(log_sql, (
            current_user['user_id'],
            'create_user',
            'user',
            user_id,
            f'创建用户: {data["username"]}',
            client_ip,
            user_agent,
            'success'
        ))

        logger.info(f"创建用户成功: {data['username']}, ID: {user_id}")
        return jsonify({
            'code': 200,
            'message': '创建成功',
            'data': {'id': user_id},
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[创建用户] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@user_bp.route('/<int:user_id>', methods=['PUT'])
@require_auth
@require_role('admin')
def update_user(user_id):
    """Update selected columns of an admin user.

    Only the keys ``real_name``, ``email``, ``phone``, ``role`` and ``status``
    from the JSON body are applied; any other key is ignored.

    Args:
        user_id: ``id`` of the row in ``wht_admin_users`` (from the URL).

    Returns:
        A JSON envelope with ``data = None`` on success. Responds with
        HTTP 400 for a missing/invalid body, an invalid role or email, or
        when no updatable field was supplied; HTTP 404 when the user does
        not exist; HTTP 500 on unexpected errors.
    """
    import re

    try:
        # silent=True: a malformed body or wrong content type yields None
        # instead of raising into the broad handler below (which would
        # answer 500).
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data:
            return jsonify({
                'code': 400,
                'message': '请求参数错误',
                'data': None
            }), 400

        # Apply the same role/email rules that create_user enforces, so an
        # update cannot store values that creation would have rejected.
        if 'role' in data:
            valid_roles = ['admin', 'editor', 'reviewer', 'publisher']
            if data['role'] not in valid_roles:
                return jsonify({
                    'code': 400,
                    'message': '无效的用户角色',
                    'data': None
                }), 400
        # A falsy email (None / "") is still allowed so the column can be cleared.
        email = data.get('email')
        if email:
            email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
            if not isinstance(email, str) or not re.fullmatch(email_pattern, email):
                return jsonify({
                    'code': 400,
                    'message': '邮箱格式不正确',
                    'data': None
                }), 400

        # Make sure the target row exists before updating.
        db_manager = get_db_manager()
        check_sql = "SELECT id, username FROM wht_admin_users WHERE id = %s"
        existing_user = db_manager.execute_query(check_sql, (user_id,))
        if not existing_user:
            return jsonify({
                'code': 404,
                'message': '用户不存在',
                'data': None
            }), 404

        # Build the SET clause from a fixed whitelist of columns; values are
        # always bound as parameters.
        # NOTE(review): 'status' is written as given -- the set of allowed
        # status values is not visible in this file, so it is not validated.
        update_fields = []
        params = []
        for column in ('real_name', 'email', 'phone', 'role', 'status'):
            if column in data:
                update_fields.append(f"{column} = %s")
                params.append(data[column])

        if not update_fields:
            return jsonify({
                'code': 400,
                'message': '没有要更新的字段',
                'data': None
            }), 400

        params.append(user_id)
        sql = f"UPDATE wht_admin_users SET {', '.join(update_fields)} WHERE id = %s"
        affected_rows = db_manager.execute_update(sql, params)

        # Write an audit-log row for the update.
        current_user = AuthUtils.get_current_user()
        log_sql = (
            "\n"
            "INSERT INTO wht_logs (user_id, action, target_type, target_id, description, ip_address, user_agent, status)\n"
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)\n"
        )
        client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
        user_agent = request.headers.get('User-Agent', '未知')
        db_manager.execute_insert(log_sql, (
            current_user['user_id'],
            'update_user',
            'user',
            user_id,
            f'更新用户: {existing_user[0]["username"]}',
            client_ip,
            user_agent,
            'success'
        ))

        logger.info(f"更新用户成功: {existing_user[0]['username']}, 影响行数: {affected_rows}")
        return jsonify({
            'code': 200,
            'message': '更新成功',
            'data': None,
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[更新用户] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@user_bp.route('/<int:user_id>', methods=['DELETE'])
@require_auth
@require_role('admin')
def delete_user(user_id):
    """Soft-delete an admin user by setting its status to ``deleted``.

    Args:
        user_id: ``id`` of the row in ``wht_admin_users`` (from the URL).

    Returns:
        A JSON envelope with ``data = None`` on success. Responds with
        HTTP 404 when the user does not exist or is already soft-deleted,
        and HTTP 500 on unexpected errors.
    """
    try:
        db_manager = get_db_manager()
        check_sql = "SELECT id, username, status FROM wht_admin_users WHERE id = %s"
        existing_user = db_manager.execute_query(check_sql, (user_id,))
        # Treat an already soft-deleted row as missing, matching get_user.
        # Otherwise a repeated DELETE reports success and writes another
        # "delete_user / success" audit row for a user that is already gone.
        # The status is compared in Python (not SQL) so that a NULL status
        # still counts as an existing, deletable user.
        if not existing_user or existing_user[0].get('status') == 'deleted':
            return jsonify({
                'code': 404,
                'message': '用户不存在',
                'data': None
            }), 404

        # Soft delete: the row is kept, only its status changes.
        sql = "UPDATE wht_admin_users SET status = 'deleted' WHERE id = %s"
        affected_rows = db_manager.execute_update(sql, (user_id,))

        # Write an audit-log row for the deletion.
        current_user = AuthUtils.get_current_user()
        log_sql = (
            "\n"
            "INSERT INTO wht_logs (user_id, action, target_type, target_id, description, ip_address, user_agent, status)\n"
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)\n"
        )
        client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知'))
        user_agent = request.headers.get('User-Agent', '未知')
        db_manager.execute_insert(log_sql, (
            current_user['user_id'],
            'delete_user',
            'user',
            user_id,
            f'删除用户: {existing_user[0]["username"]}',
            client_ip,
            user_agent,
            'success'
        ))

        logger.info(f"删除用户成功: {existing_user[0]['username']}, 影响行数: {affected_rows}")
        return jsonify({
            'code': 200,
            'message': '删除成功',
            'data': None,
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[删除用户] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500
@user_bp.route('/info', methods=['GET'])
@require_auth
def get_user_info():
    """Return the profile row of the currently authenticated user.

    The user id comes from ``AuthUtils.get_current_user()`` and the row is
    read from ``ai_users`` (rows with status ``deleted`` are treated as
    missing). The selected columns do not include the password.

    The optional ``action`` query parameter is read, logged and recorded in
    the operation log; in this function it does not change which fields are
    returned.
    NOTE(review): earlier comments described fetching the password via
    ``action=need_password`` and reading from ``ai_authors``; neither is
    implemented by the query here -- confirm whether that is intended.

    Returns:
        A JSON envelope whose ``data`` is the user row. Responds with
        HTTP 400 when no user id can be determined, HTTP 404 when the user
        is not found, and HTTP 500 on unexpected errors.
    """
    env = request.environ
    client_ip = env.get('HTTP_X_FORWARDED_FOR', env.get('REMOTE_ADDR', '未知'))
    logger.info(f"[获取用户信息] 开始处理请求, IP: {client_ip}")

    try:
        uid = AuthUtils.get_current_user().get('user_id')
        if not uid:
            logger.warning(f"[获取用户信息] 无法获取用户ID, IP: {client_ip}")
            missing_id = {'code': 400, 'message': '无法获取用户ID', 'data': None}
            return jsonify(missing_id), 400

        # Optional caller-supplied action tag; only used for logging below.
        action = request.args.get('action', '').strip()
        logger.info(f"[获取用户信息] 用户ID: {uid}, action: {action}, IP: {client_ip}")

        # Look up the profile row for the current user.
        query = (
            "\n"
            "SELECT id, enterprise_id, enterprise_name, username, real_name, email, phone,\n"
            "wechat_openid, wechat_unionid,\n"
            "is_bound_xhs, department, role, status, created_at, updated_at\n"
            "FROM ai_users u\n"
            "WHERE id = %s AND status != 'deleted'\n"
        )
        rows = get_db_manager().execute_query(query, (uid,))
        if not rows:
            logger.warning(f"[获取用户信息] 用户不存在, ID: {uid}, IP: {client_ip}")
            not_found = {'code': 404, 'message': '用户不存在', 'data': None}
            return jsonify(not_found), 404

        profile = rows[0]

        # Record the lookup in the operation log.
        log_operation(
            action='get_user_info',
            target_type='user',
            target_id=uid,
            description=f"查询用户信息: {profile.get('username')}, action={action}"
        )

        payload = {
            'code': 200,
            'message': 'success',
            'data': profile,
            'timestamp': int(datetime.now().timestamp() * 1000)
        }
        return jsonify(payload)
    except Exception as e:
        logger.error(f"[获取用户信息] 错误: {str(e)}", exc_info=True)
        failure = {'code': 500, 'message': '服务器内部错误', 'data': None}
        return jsonify(failure), 500
@user_bp.route('/add_employees', methods=['POST'])
@require_auth
@require_role('enterprise')
def add_employees():
    """Add an employee account to the current user's enterprise.

    The enterprise id is taken from ``AuthUtils.get_current_user()``.

    Expected JSON fields:
        name, phone, password, role: required and non-empty.
        department: optional, defaults to an empty string.

    The phone number is stored both as ``phone`` and as ``username``. After
    the insert, ``users_total`` on the enterprise row is incremented.

    Returns:
        A JSON envelope with ``data = {'id': <new id>}`` on success.
        Responds with HTTP 400 for a missing enterprise id, a missing/invalid
        body or field; HTTP 409 when the phone is already used within the
        enterprise; HTTP 500 on unexpected errors.
    """
    try:
        current_user = AuthUtils.get_current_user()
        enterprise_id = current_user.get('enterprise_id')
        if not enterprise_id:
            return jsonify({
                'code': 400,
                'message': '无法获取企业ID',
                'data': None
            }), 400

        # silent=True: a malformed body or wrong content type yields None
        # instead of raising into the broad handler below (which would
        # answer 500).
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or not data:
            return jsonify({
                'code': 400,
                'message': '请求参数错误',
                'data': None
            }), 400

        # Required fields must be present and truthy.
        required_fields = ['name', 'phone', 'password', 'role']
        for field in required_fields:
            if not data.get(field):
                return jsonify({
                    'code': 400,
                    'message': f'缺少必需字段: {field}',
                    'data': None
                }), 400

        # Same minimum password length that create_user enforces; the value
        # must be a string for the length check to be meaningful.
        if not isinstance(data['password'], str):
            return jsonify({
                'code': 400,
                'message': '请求参数错误',
                'data': None
            }), 400
        if len(data['password']) < 6:
            return jsonify({
                'code': 400,
                'message': '密码长度不能少于6个字符',
                'data': None
            }), 400

        valid_roles = ['editor', 'reviewer', 'publisher', 'each_title_reviewer','employee','teamleader','supervisor']
        if data['role'] not in valid_roles:
            return jsonify({
                'code': 400,
                'message': '无效的角色',
                'data': None
            }), 400

        # The phone number must be unique within the enterprise.
        db_manager = get_db_manager()
        check_sql = "SELECT id FROM ai_users WHERE phone = %s AND enterprise_id = %s"
        existing = db_manager.execute_query(check_sql, (data['phone'], enterprise_id))
        if existing:
            return jsonify({
                'code': 409,
                'message': '手机号已被使用',
                'data': None
            }), 409

        # Insert the employee row; only the hashed password is stored.
        sql = (
            "\n"
            "INSERT INTO ai_users\n"
            "(enterprise_id, enterprise_name, username, real_name, phone, password, role, department, status, is_bound_xhs)\n"
            "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)\n"
        )
        hashed_password = AuthUtils.hash_password(data['password'])

        # Denormalised enterprise name; falls back to '' when the enterprise
        # row is not found.
        ent_sql = "SELECT name FROM ai_enterprises WHERE id = %s"
        ent_result = db_manager.execute_query(ent_sql, (enterprise_id,))
        enterprise_name = ent_result[0]['name'] if ent_result else ''

        user_id = db_manager.execute_insert(sql, (
            enterprise_id,
            enterprise_name,
            data['phone'],  # phone doubles as the username
            data['name'],  # real_name
            data['phone'],
            hashed_password,
            data['role'],
            data.get('department', ''),
            'active',
            0  # is_bound_xhs
        ))

        # Keep the enterprise's employee counter in step with the insert.
        # NOTE(review): insert and counter update are separate statements;
        # whether they share a transaction depends on db_manager -- confirm.
        update_sql = "UPDATE ai_enterprises SET users_total = users_total + 1 WHERE id = %s"
        db_manager.execute_update(update_sql, (enterprise_id,))

        logger.info(f"添加员工成功: {data['name']}, ID: {user_id}")
        return jsonify({
            'code': 200,
            'message': '添加成功',
            'data': {'id': user_id},
            'timestamp': int(datetime.now().timestamp() * 1000)
        })
    except Exception as e:
        logger.error(f"[添加员工] 处理请求时发生错误: {str(e)}", exc_info=True)
        return jsonify({
            'code': 500,
            'message': '服务器内部错误',
            'data': None
        }), 500