147 lines
4.6 KiB
Python
147 lines
4.6 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
认证工具模块
|
|
提供JWT token生成、验证和权限装饰器
|
|
"""
|
|
|
|
import jwt
|
|
import hashlib
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from functools import wraps
|
|
from flask import request, jsonify, g
|
|
from database_config import get_db_manager
|
|
|
|
logger = logging.getLogger('article_server')
|
|
|
|
# JWT配置
|
|
JWT_SECRET_KEY = 'wht_secret_key_2025' # 生产环境应使用环境变量
|
|
JWT_ALGORITHM = 'HS256'
|
|
JWT_EXPIRATION_HOURS = 24 # token有效期24小时
|
|
|
|
class AuthUtils:
|
|
"""认证工具类"""
|
|
|
|
@staticmethod
|
|
def hash_password(password):
|
|
"""密码加密"""
|
|
return hashlib.sha256(password.encode('utf-8')).hexdigest()
|
|
|
|
@staticmethod
|
|
def verify_password(password, hashed_password):
|
|
"""验证密码"""
|
|
return AuthUtils.hash_password(password) == hashed_password
|
|
|
|
@staticmethod
|
|
def generate_token(user_id, username, role, enterprise_id=None):
|
|
"""生成JWT token"""
|
|
try:
|
|
payload = {
|
|
'user_id': user_id,
|
|
'username': username,
|
|
'role': role,
|
|
'enterprise_id': enterprise_id,
|
|
'exp': datetime.utcnow() + timedelta(hours=JWT_EXPIRATION_HOURS),
|
|
'iat': datetime.utcnow()
|
|
}
|
|
token = jwt.encode(payload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
|
|
logger.info(f"[Token生成] 用户: {username}, ID: {user_id}, 角色: {role}")
|
|
return token
|
|
except Exception as e:
|
|
logger.error(f"[Token生成失败] {e}")
|
|
raise
|
|
|
|
@staticmethod
|
|
def verify_token(token):
|
|
"""验证JWT token"""
|
|
try:
|
|
payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])
|
|
logger.info(f"[Token验证成功] 用户ID: {payload.get('user_id')}")
|
|
return payload
|
|
except jwt.ExpiredSignatureError:
|
|
logger.warning("[Token验证失败] Token已过期")
|
|
return None
|
|
except jwt.InvalidTokenError as e:
|
|
logger.warning(f"[Token验证失败] 无效的Token: {e}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"[Token验证异常] {e}")
|
|
return None
|
|
|
|
@staticmethod
|
|
def get_current_user():
|
|
"""获取当前登录用户信息"""
|
|
return getattr(g, 'current_user', None)
|
|
|
|
def require_auth(f):
|
|
"""需要认证的装饰器"""
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
# 获取token
|
|
auth_header = request.headers.get('Authorization')
|
|
if not auth_header:
|
|
logger.warning("[认证失败] 缺少Authorization header")
|
|
return jsonify({
|
|
'code': 401,
|
|
'message': '未提供认证token',
|
|
'data': None
|
|
}), 401
|
|
|
|
# 验证Bearer格式
|
|
parts = auth_header.split()
|
|
if len(parts) != 2 or parts[0].lower() != 'bearer':
|
|
logger.warning("[认证失败] Authorization header格式错误")
|
|
return jsonify({
|
|
'code': 401,
|
|
'message': 'Token格式错误',
|
|
'data': None
|
|
}), 401
|
|
|
|
token = parts[1]
|
|
|
|
# 验证token
|
|
payload = AuthUtils.verify_token(token)
|
|
if not payload:
|
|
return jsonify({
|
|
'code': 401,
|
|
'message': 'Token无效或已过期',
|
|
'data': None
|
|
}), 401
|
|
|
|
# 将用户信息存储到g对象
|
|
g.current_user = payload
|
|
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
def require_role(*allowed_roles):
|
|
"""需要特定角色的装饰器"""
|
|
def decorator(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
current_user = AuthUtils.get_current_user()
|
|
if not current_user:
|
|
logger.warning("[权限检查失败] 未找到当前用户")
|
|
return jsonify({
|
|
'code': 401,
|
|
'message': '未登录',
|
|
'data': None
|
|
}), 401
|
|
|
|
user_role = current_user.get('role')
|
|
if user_role not in allowed_roles:
|
|
logger.warning(f"[权限检查失败] 用户角色 {user_role} 不在允许的角色 {allowed_roles} 中")
|
|
return jsonify({
|
|
'code': 403,
|
|
'message': '权限不足',
|
|
'data': None
|
|
}), 403
|
|
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
return decorator
|