""" 阿里云OSS工具类 用于Python脚本中上传/下载文件到OSS """ import os import oss2 from datetime import datetime from typing import Optional class OSSUploader: """OSS上传工具""" def __init__( self, access_key_id: Optional[str] = None, access_key_secret: Optional[str] = None, bucket_name: Optional[str] = None, endpoint: Optional[str] = None ): """ 初始化OSS客户端 Args: access_key_id: AccessKey ID(可选,默认从环境变量读取) access_key_secret: AccessKey Secret(可选,默认从环境变量读取) bucket_name: Bucket名称(可选,默认从环境变量读取) endpoint: OSS访问域名(可选,默认从环境变量读取) """ # 使用提供的值或从环境变量读取 self.access_key_id = access_key_id or os.getenv('OSS_TEST_ACCESS_KEY_ID', 'LTAI5tNesdhDH4ErqEUZmEg2') self.access_key_secret = access_key_secret or os.getenv('OSS_TEST_ACCESS_KEY_SECRET', 'xZn7WUkTW76TqOLTh01zZATnU6p3Tf') self.bucket_name = bucket_name or os.getenv('OSS_TEST_BUCKET', 'bxmkb-beijing') self.endpoint = endpoint or os.getenv('OSS_TEST_ENDPOINT', 'https://oss-cn-beijing.aliyuncs.com/') # 移除endpoint中的协议前缀(oss2库不需要https://) self.endpoint = self.endpoint.replace('https://', '').replace('http://', '') # 创建认证对象 self.auth = oss2.Auth(self.access_key_id, self.access_key_secret) # 创建Bucket对象 self.bucket = oss2.Bucket(self.auth, self.endpoint, self.bucket_name) # 基础路径 self.base_path = "wht/" def upload_file(self, local_file_path: str, object_name: Optional[str] = None) -> str: """ 上传文件到OSS Args: local_file_path: 本地文件路径 object_name: OSS对象名称(可选,默认自动生成) Returns: OSS文件的完整URL """ # 如果未指定对象名称,自动生成 if object_name is None: # 生成格式: wht/YYYYMMDD/timestamp_filename.ext now = datetime.now() date_dir = now.strftime("%Y%m%d") timestamp = int(now.timestamp()) filename = os.path.basename(local_file_path) object_name = f"{self.base_path}{date_dir}/{timestamp}_{filename}" # 上传文件 self.bucket.put_object_from_file(object_name, local_file_path) # 生成访问URL url = f"https://{self.bucket_name}.{self.endpoint}/{object_name}" return url def upload_bytes(self, data: bytes, filename: str) -> str: """ 上传字节数据到OSS Args: data: 文件字节数据 filename: 文件名(用于生成扩展名) Returns: OSS文件的完整URL """ # 生成对象名称 now = datetime.now() date_dir = now.strftime("%Y%m%d") timestamp = int(now.timestamp()) # 获取扩展名 ext = os.path.splitext(filename)[1] or '.jpg' object_name = f"{self.base_path}{date_dir}/{timestamp}_{filename}" # 上传数据 self.bucket.put_object(object_name, data) # 生成访问URL url = f"https://{self.bucket_name}.{self.endpoint}/{object_name}" return url def delete_file(self, file_url: str) -> bool: """ 从OSS删除文件 Args: file_url: OSS文件的完整URL Returns: 是否删除成功 """ try: # 从URL中提取对象名称 # 格式: https://bucket.endpoint/path/file.jpg prefix = f"https://{self.bucket_name}.{self.endpoint}/" if file_url.startswith(prefix): object_name = file_url[len(prefix):] self.bucket.delete_object(object_name) return True else: return False except Exception as e: print(f"删除OSS文件失败: {e}") return False def file_exists(self, file_url: str) -> bool: """ 检查OSS文件是否存在 Args: file_url: OSS文件的完整URL Returns: 文件是否存在 """ try: prefix = f"https://{self.bucket_name}.{self.endpoint}/" if file_url.startswith(prefix): object_name = file_url[len(prefix):] return self.bucket.object_exists(object_name) else: return False except Exception: return False # 创建默认实例(使用环境变量配置) default_uploader = None def get_oss_uploader() -> OSSUploader: """获取默认的OSS上传器实例""" global default_uploader if default_uploader is None: default_uploader = OSSUploader() return default_uploader