""" 阿里云短信服务模块 用于发送手机验证码 """ import json import random import sys from typing import Dict, Any, Optional from datetime import datetime, timedelta from alibabacloud_dysmsapi20170525.client import Client as Dysmsapi20170525Client from alibabacloud_credentials.client import Client as CredentialClient from alibabacloud_credentials.models import Config as CredentialConfig from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_dysmsapi20170525 import models as dysmsapi_20170525_models from alibabacloud_tea_util import models as util_models class AliSmsService: """阿里云短信服务""" def __init__(self, access_key_id: str, access_key_secret: str, sign_name: str, template_code: str): """ 初始化阿里云短信服务 Args: access_key_id: 阿里云AccessKey ID access_key_secret: 阿里云AccessKey Secret sign_name: 短信签名 template_code: 短信模板CODE """ self.sign_name = sign_name self.template_code = template_code # 创建阿里云短信客户端 credential_config = CredentialConfig( type='access_key', access_key_id=access_key_id, access_key_secret=access_key_secret ) credential = CredentialClient(credential_config) config = open_api_models.Config(credential=credential) config.endpoint = 'dysmsapi.aliyuncs.com' self.client = Dysmsapi20170525Client(config) # 验证码缓存(简单内存存储,生产环境应使用Redis) self._code_cache: Dict[str, Dict[str, Any]] = {} # 验证码配置 self.code_length = 6 # 验证码长度 self.code_expire_minutes = 5 # 验证码过期时间(分钟) def _generate_code(self) -> str: """生成随机验证码""" return ''.join([str(random.randint(0, 9)) for _ in range(self.code_length)]) async def send_verification_code(self, phone: str) -> Dict[str, Any]: """ 发送验证码到指定手机号 Args: phone: 手机号 Returns: Dict containing success status and error message if any """ try: # 生成验证码 code = self._generate_code() print(f"[短信服务] 正在发送验证码到 {phone},验证码: {code}", file=sys.stderr) # 构建短信请求 send_sms_request = dysmsapi_20170525_models.SendSmsRequest( phone_numbers=phone, sign_name=self.sign_name, template_code=self.template_code, template_param=json.dumps({"code": code}) ) runtime = util_models.RuntimeOptions() # 发送短信 try: resp = self.client.send_sms_with_options(send_sms_request, runtime) resp_dict = resp.to_map() print(f"[短信服务] 阿里云响应: {json.dumps(resp_dict, default=str, indent=2, ensure_ascii=False)}", file=sys.stderr) # 检查发送结果 if resp_dict.get('body', {}).get('Code') == 'OK': # 缓存验证码 self._code_cache[phone] = { 'code': code, 'expire_time': datetime.now() + timedelta(minutes=self.code_expire_minutes), 'sent_at': datetime.now() } print(f"[短信服务] 验证码发送成功,手机号: {phone}", file=sys.stderr) return { "success": True, "message": f"验证码已发送,{self.code_expire_minutes}分钟内有效", "code": code # 开发环境返回验证码,生产环境应移除 } else: error_msg = resp_dict.get('body', {}).get('Message', '未知错误') print(f"[短信服务] 发送失败: {error_msg}", file=sys.stderr) return { "success": False, "error": f"短信发送失败: {error_msg}" } except Exception as e: error_msg = str(e) print(f"[短信服务] 发送异常: {error_msg}", file=sys.stderr) # 如果有诊断地址,打印出来 if hasattr(e, 'data') and e.data: recommend = e.data.get('Recommend') if recommend: print(f"[短信服务] 诊断地址: {recommend}", file=sys.stderr) return { "success": False, "error": f"短信发送异常: {error_msg}" } except Exception as e: print(f"[短信服务] 发送验证码失败: {str(e)}", file=sys.stderr) return { "success": False, "error": str(e) } def verify_code(self, phone: str, code: str) -> Dict[str, Any]: """ 验证手机号和验证码 Args: phone: 手机号 code: 用户输入的验证码 Returns: Dict containing verification result """ try: # 检查验证码是否存在 if phone not in self._code_cache: return { "success": False, "error": "验证码未发送或已过期,请重新获取" } cached_data = self._code_cache[phone] # 检查是否过期 if datetime.now() > cached_data['expire_time']: # 删除过期验证码 del self._code_cache[phone] return { "success": False, "error": "验证码已过期,请重新获取" } # 验证码匹配 if code == cached_data['code']: # 验证成功后删除验证码(一次性使用) del self._code_cache[phone] print(f"[短信服务] 验证码验证成功,手机号: {phone}", file=sys.stderr) return { "success": True, "message": "验证码验证成功" } else: return { "success": False, "error": "验证码错误,请重新输入" } except Exception as e: print(f"[短信服务] 验证码验证失败: {str(e)}", file=sys.stderr) return { "success": False, "error": str(e) } def cleanup_expired_codes(self): """清理过期的验证码""" current_time = datetime.now() expired_phones = [ phone for phone, data in self._code_cache.items() if current_time > data['expire_time'] ] for phone in expired_phones: del self._code_cache[phone] if expired_phones: print(f"[短信服务] 已清理 {len(expired_phones)} 个过期验证码", file=sys.stderr)