""" JWT 工具函数 """ import jwt from datetime import datetime, timedelta from typing import Optional from fastapi import HTTPException, Depends, Header from app.config import get_settings def create_token(user_id: int, openid: str) -> str: """ 创建 JWT Token """ settings = get_settings() expire = datetime.utcnow() + timedelta(hours=settings.jwt_expire_hours) payload = { "user_id": user_id, "openid": openid, "exp": expire, "iat": datetime.utcnow() } token = jwt.encode(payload, settings.jwt_secret_key, algorithm="HS256") return token def verify_token(token: str) -> dict: """ 验证 JWT Token 返回 payload 或抛出异常 """ settings = get_settings() try: payload = jwt.decode(token, settings.jwt_secret_key, algorithms=["HS256"]) return payload except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Token已过期,请重新登录") except jwt.InvalidTokenError: raise HTTPException(status_code=401, detail="无效的Token") def get_current_user_id(authorization: Optional[str] = Header(None, alias="Authorization")) -> int: """ 从 Header 中获取并验证 Token,返回 user_id 用作 FastAPI 依赖注入 """ if not authorization: raise HTTPException(status_code=401, detail="未提供身份令牌") # 支持 "Bearer xxx" 格式 token = authorization if authorization.startswith("Bearer "): token = authorization[7:] payload = verify_token(token) return payload.get("user_id") def get_optional_user_id(authorization: Optional[str] = Header(None, alias="Authorization")) -> Optional[int]: """ 可选的用户验证,未提供 Token 时返回 None 用于不强制要求登录的接口 """ if not authorization: return None try: token = authorization if authorization.startswith("Bearer "): token = authorization[7:] payload = verify_token(token) return payload.get("user_id") except HTTPException: return None