204 lines
7.5 KiB
Python
204 lines
7.5 KiB
Python
|
|
"""
|
|||
|
|
阿里云短信服务模块
|
|||
|
|
用于发送手机验证码
|
|||
|
|
"""
|
|||
|
|
import json
|
|||
|
|
import random
|
|||
|
|
import sys
|
|||
|
|
from typing import Dict, Any, Optional
|
|||
|
|
from datetime import datetime, timedelta
|
|||
|
|
|
|||
|
|
from alibabacloud_dysmsapi20170525.client import Client as Dysmsapi20170525Client
|
|||
|
|
from alibabacloud_credentials.client import Client as CredentialClient
|
|||
|
|
from alibabacloud_credentials.models import Config as CredentialConfig
|
|||
|
|
from alibabacloud_tea_openapi import models as open_api_models
|
|||
|
|
from alibabacloud_dysmsapi20170525 import models as dysmsapi_20170525_models
|
|||
|
|
from alibabacloud_tea_util import models as util_models
|
|||
|
|
|
|||
|
|
|
|||
|
|
class AliSmsService:
|
|||
|
|
"""阿里云短信服务"""
|
|||
|
|
|
|||
|
|
def __init__(self, access_key_id: str, access_key_secret: str, sign_name: str, template_code: str):
|
|||
|
|
"""
|
|||
|
|
初始化阿里云短信服务
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
access_key_id: 阿里云AccessKey ID
|
|||
|
|
access_key_secret: 阿里云AccessKey Secret
|
|||
|
|
sign_name: 短信签名
|
|||
|
|
template_code: 短信模板CODE
|
|||
|
|
"""
|
|||
|
|
self.sign_name = sign_name
|
|||
|
|
self.template_code = template_code
|
|||
|
|
|
|||
|
|
# 创建阿里云短信客户端
|
|||
|
|
credential_config = CredentialConfig(
|
|||
|
|
type='access_key',
|
|||
|
|
access_key_id=access_key_id,
|
|||
|
|
access_key_secret=access_key_secret
|
|||
|
|
)
|
|||
|
|
credential = CredentialClient(credential_config)
|
|||
|
|
config = open_api_models.Config(credential=credential)
|
|||
|
|
config.endpoint = 'dysmsapi.aliyuncs.com'
|
|||
|
|
|
|||
|
|
self.client = Dysmsapi20170525Client(config)
|
|||
|
|
|
|||
|
|
# 验证码缓存(简单内存存储,生产环境应使用Redis)
|
|||
|
|
self._code_cache: Dict[str, Dict[str, Any]] = {}
|
|||
|
|
|
|||
|
|
# 验证码配置
|
|||
|
|
self.code_length = 6 # 验证码长度
|
|||
|
|
self.code_expire_minutes = 5 # 验证码过期时间(分钟)
|
|||
|
|
|
|||
|
|
def _generate_code(self) -> str:
|
|||
|
|
"""生成随机验证码"""
|
|||
|
|
return ''.join([str(random.randint(0, 9)) for _ in range(self.code_length)])
|
|||
|
|
|
|||
|
|
async def send_verification_code(self, phone: str) -> Dict[str, Any]:
|
|||
|
|
"""
|
|||
|
|
发送验证码到指定手机号
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
phone: 手机号
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
Dict containing success status and error message if any
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
# 生成验证码
|
|||
|
|
code = self._generate_code()
|
|||
|
|
|
|||
|
|
print(f"[短信服务] 正在发送验证码到 {phone},验证码: {code}", file=sys.stderr)
|
|||
|
|
|
|||
|
|
# 构建短信请求
|
|||
|
|
send_sms_request = dysmsapi_20170525_models.SendSmsRequest(
|
|||
|
|
phone_numbers=phone,
|
|||
|
|
sign_name=self.sign_name,
|
|||
|
|
template_code=self.template_code,
|
|||
|
|
template_param=json.dumps({"code": code})
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
runtime = util_models.RuntimeOptions()
|
|||
|
|
|
|||
|
|
# 发送短信
|
|||
|
|
try:
|
|||
|
|
resp = self.client.send_sms_with_options(send_sms_request, runtime)
|
|||
|
|
resp_dict = resp.to_map()
|
|||
|
|
|
|||
|
|
print(f"[短信服务] 阿里云响应: {json.dumps(resp_dict, default=str, indent=2, ensure_ascii=False)}", file=sys.stderr)
|
|||
|
|
|
|||
|
|
# 检查发送结果
|
|||
|
|
if resp_dict.get('body', {}).get('Code') == 'OK':
|
|||
|
|
# 缓存验证码
|
|||
|
|
self._code_cache[phone] = {
|
|||
|
|
'code': code,
|
|||
|
|
'expire_time': datetime.now() + timedelta(minutes=self.code_expire_minutes),
|
|||
|
|
'sent_at': datetime.now()
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
print(f"[短信服务] 验证码发送成功,手机号: {phone}", file=sys.stderr)
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
"success": True,
|
|||
|
|
"message": f"验证码已发送,{self.code_expire_minutes}分钟内有效",
|
|||
|
|
"code": code # 开发环境返回验证码,生产环境应移除
|
|||
|
|
}
|
|||
|
|
else:
|
|||
|
|
error_msg = resp_dict.get('body', {}).get('Message', '未知错误')
|
|||
|
|
print(f"[短信服务] 发送失败: {error_msg}", file=sys.stderr)
|
|||
|
|
return {
|
|||
|
|
"success": False,
|
|||
|
|
"error": f"短信发送失败: {error_msg}"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
error_msg = str(e)
|
|||
|
|
print(f"[短信服务] 发送异常: {error_msg}", file=sys.stderr)
|
|||
|
|
|
|||
|
|
# 如果有诊断地址,打印出来
|
|||
|
|
if hasattr(e, 'data') and e.data:
|
|||
|
|
recommend = e.data.get('Recommend')
|
|||
|
|
if recommend:
|
|||
|
|
print(f"[短信服务] 诊断地址: {recommend}", file=sys.stderr)
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
"success": False,
|
|||
|
|
"error": f"短信发送异常: {error_msg}"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"[短信服务] 发送验证码失败: {str(e)}", file=sys.stderr)
|
|||
|
|
return {
|
|||
|
|
"success": False,
|
|||
|
|
"error": str(e)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
def verify_code(self, phone: str, code: str) -> Dict[str, Any]:
|
|||
|
|
"""
|
|||
|
|
验证手机号和验证码
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
phone: 手机号
|
|||
|
|
code: 用户输入的验证码
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
Dict containing verification result
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
# 检查验证码是否存在
|
|||
|
|
if phone not in self._code_cache:
|
|||
|
|
return {
|
|||
|
|
"success": False,
|
|||
|
|
"error": "验证码未发送或已过期,请重新获取"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
cached_data = self._code_cache[phone]
|
|||
|
|
|
|||
|
|
# 检查是否过期
|
|||
|
|
if datetime.now() > cached_data['expire_time']:
|
|||
|
|
# 删除过期验证码
|
|||
|
|
del self._code_cache[phone]
|
|||
|
|
return {
|
|||
|
|
"success": False,
|
|||
|
|
"error": "验证码已过期,请重新获取"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 验证码匹配
|
|||
|
|
if code == cached_data['code']:
|
|||
|
|
# 验证成功后删除验证码(一次性使用)
|
|||
|
|
del self._code_cache[phone]
|
|||
|
|
|
|||
|
|
print(f"[短信服务] 验证码验证成功,手机号: {phone}", file=sys.stderr)
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
"success": True,
|
|||
|
|
"message": "验证码验证成功"
|
|||
|
|
}
|
|||
|
|
else:
|
|||
|
|
return {
|
|||
|
|
"success": False,
|
|||
|
|
"error": "验证码错误,请重新输入"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"[短信服务] 验证码验证失败: {str(e)}", file=sys.stderr)
|
|||
|
|
return {
|
|||
|
|
"success": False,
|
|||
|
|
"error": str(e)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
def cleanup_expired_codes(self):
|
|||
|
|
"""清理过期的验证码"""
|
|||
|
|
current_time = datetime.now()
|
|||
|
|
expired_phones = [
|
|||
|
|
phone for phone, data in self._code_cache.items()
|
|||
|
|
if current_time > data['expire_time']
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
for phone in expired_phones:
|
|||
|
|
del self._code_cache[phone]
|
|||
|
|
|
|||
|
|
if expired_phones:
|
|||
|
|
print(f"[短信服务] 已清理 {len(expired_phones)} 个过期验证码", file=sys.stderr)
|