Files
ai_wht_wechat/backend/oss_utils.py

158 lines
5.1 KiB
Python
Raw Normal View History

2026-01-06 19:36:42 +08:00
"""
阿里云OSS工具类
用于Python脚本中上传/下载文件到OSS
"""
import os
import oss2
from datetime import datetime
from typing import Optional
class OSSUploader:
"""OSS上传工具"""
def __init__(
self,
access_key_id: Optional[str] = None,
access_key_secret: Optional[str] = None,
bucket_name: Optional[str] = None,
endpoint: Optional[str] = None
):
"""
初始化OSS客户端
Args:
access_key_id: AccessKey ID可选默认从环境变量读取
access_key_secret: AccessKey Secret可选默认从环境变量读取
bucket_name: Bucket名称可选默认从环境变量读取
endpoint: OSS访问域名可选默认从环境变量读取
"""
# 使用提供的值或从环境变量读取
self.access_key_id = access_key_id or os.getenv('OSS_TEST_ACCESS_KEY_ID', 'LTAI5tNesdhDH4ErqEUZmEg2')
self.access_key_secret = access_key_secret or os.getenv('OSS_TEST_ACCESS_KEY_SECRET', 'xZn7WUkTW76TqOLTh01zZATnU6p3Tf')
self.bucket_name = bucket_name or os.getenv('OSS_TEST_BUCKET', 'bxmkb-beijing')
self.endpoint = endpoint or os.getenv('OSS_TEST_ENDPOINT', 'https://oss-cn-beijing.aliyuncs.com/')
# 移除endpoint中的协议前缀oss2库不需要https://
self.endpoint = self.endpoint.replace('https://', '').replace('http://', '')
# 创建认证对象
self.auth = oss2.Auth(self.access_key_id, self.access_key_secret)
# 创建Bucket对象
self.bucket = oss2.Bucket(self.auth, self.endpoint, self.bucket_name)
# 基础路径
self.base_path = "wht/"
def upload_file(self, local_file_path: str, object_name: Optional[str] = None) -> str:
"""
上传文件到OSS
Args:
local_file_path: 本地文件路径
object_name: OSS对象名称可选默认自动生成
Returns:
OSS文件的完整URL
"""
# 如果未指定对象名称,自动生成
if object_name is None:
# 生成格式: wht/YYYYMMDD/timestamp_filename.ext
now = datetime.now()
date_dir = now.strftime("%Y%m%d")
timestamp = int(now.timestamp())
filename = os.path.basename(local_file_path)
object_name = f"{self.base_path}{date_dir}/{timestamp}_{filename}"
# 上传文件
self.bucket.put_object_from_file(object_name, local_file_path)
# 生成访问URL
url = f"https://{self.bucket_name}.{self.endpoint}/{object_name}"
return url
def upload_bytes(self, data: bytes, filename: str) -> str:
"""
上传字节数据到OSS
Args:
data: 文件字节数据
filename: 文件名用于生成扩展名
Returns:
OSS文件的完整URL
"""
# 生成对象名称
now = datetime.now()
date_dir = now.strftime("%Y%m%d")
timestamp = int(now.timestamp())
# 获取扩展名
ext = os.path.splitext(filename)[1] or '.jpg'
object_name = f"{self.base_path}{date_dir}/{timestamp}_{filename}"
# 上传数据
self.bucket.put_object(object_name, data)
# 生成访问URL
url = f"https://{self.bucket_name}.{self.endpoint}/{object_name}"
return url
def delete_file(self, file_url: str) -> bool:
"""
从OSS删除文件
Args:
file_url: OSS文件的完整URL
Returns:
是否删除成功
"""
try:
# 从URL中提取对象名称
# 格式: https://bucket.endpoint/path/file.jpg
prefix = f"https://{self.bucket_name}.{self.endpoint}/"
if file_url.startswith(prefix):
object_name = file_url[len(prefix):]
self.bucket.delete_object(object_name)
return True
else:
return False
except Exception as e:
print(f"删除OSS文件失败: {e}")
return False
def file_exists(self, file_url: str) -> bool:
"""
检查OSS文件是否存在
Args:
file_url: OSS文件的完整URL
Returns:
文件是否存在
"""
try:
prefix = f"https://{self.bucket_name}.{self.endpoint}/"
if file_url.startswith(prefix):
object_name = file_url[len(prefix):]
return self.bucket.object_exists(object_name)
else:
return False
except Exception:
return False
# 创建默认实例(使用环境变量配置)
default_uploader = None
def get_oss_uploader() -> OSSUploader:
"""获取默认的OSS上传器实例"""
global default_uploader
if default_uploader is None:
default_uploader = OSSUploader()
return default_uploader