# -*- coding: utf-8 -*- import json import os import logging from datetime import datetime from PIL import Image import oss2 # 导入统一日志配置 from log_config import setup_article_server_logger # 以下代码展示了图片服务的基本用法。更详细应用请参看官网文档 https://help.aliyun.com/document_detail/32206.html # 首先初始化AccessKeyId、AccessKeySecret、Endpoint等信息。 # 通过环境变量获取,或者把诸如"<你的AccessKeyId>"替换成真实的AccessKeyId等。 # # 以杭州区域为例,Endpoint可以是: # http://oss-cn-hangzhou.aliyuncs.com # https://oss-cn-hangzhou.aliyuncs.com # 分别以HTTP、HTTPS协议访问。 access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', 'LTAI5tNesdhDH4ErqEUZmEg2') access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', 'xZn7WUkTW76TqOLTh01zZATnU6p3Tf') bucket_name = os.getenv('OSS_TEST_BUCKET', 'oss-pai-apou8llecyqfa7c9du-cn-shanghai') endpoint = os.getenv('OSS_TEST_ENDPOINT', 'https://oss-cn-shanghai.aliyuncs.com/') # 确认上面的参数都填写正确了 for param in (access_key_id, access_key_secret, bucket_name, endpoint): assert '<' not in param, '请设置参数:' + param def get_image_info(image_file): """获取本地图片信息 :param str image_file: 本地图片 :return tuple: a 3-tuple(height, width, format). """ im = Image.open(image_file) return im.height, im.width, im.format class SyncImageToOSS: """同步图片到OSS的类""" def __init__(self, access_key_id=None, access_key_secret=None, bucket_name=None, endpoint=None): """初始化OSS配置 Args: access_key_id: OSS访问密钥ID access_key_secret: OSS访问密钥Secret bucket_name: OSS存储桶名称 endpoint: OSS端点地址 """ self.access_key_id = access_key_id or os.getenv('OSS_TEST_ACCESS_KEY_ID', 'LTAI5tNesdhDH4ErqEUZmEg2') self.access_key_secret = access_key_secret or os.getenv('OSS_TEST_ACCESS_KEY_SECRET', 'xZn7WUkTW76TqOLTh01zZATnU6p3Tf') self.bucket_name = bucket_name or os.getenv('OSS_TEST_BUCKET', 'oss-pai-apou8llecyqfa7c9du-cn-shanghai') self.endpoint = endpoint or os.getenv('OSS_TEST_ENDPOINT', 'https://oss-cn-shanghai.aliyuncs.com/') # 创建Bucket对象 self.bucket = oss2.Bucket(oss2.Auth(self.access_key_id, self.access_key_secret), self.endpoint, self.bucket_name) # 设置日志 self.logger = setup_article_server_logger() def TransformerImage(self, image_file_path): """将图片上传到OSS Args: image_file_path (str): 图片文件完整路径,例如:D:\\baijiahao\\tags_images\\Images\\20250825\\1756129305749449.png Returns: dict: 上传结果,包含成功状态和OSS URL """ try: # 检查文件是否存在 if not os.path.exists(image_file_path): self.logger.error(f"[OSS上传] 文件不存在: {image_file_path}") return { 'success': False, 'message': f'文件不存在: {image_file_path}', 'oss_url': None } # 记录业务日志 - 开始处理 self.logger.info(f"[OSS业务] 开始处理图片上传请求 - 文件: {os.path.basename(image_file_path)}") file_size = os.path.getsize(image_file_path) self.logger.info(f"[OSS业务] 文件信息 - 大小: {file_size} bytes, 路径: {image_file_path}") # 从完整路径中提取相对路径作为OSS key # 例如:D:\baijiahao\tags_images\Images\20250825\1756129305749449.png -> Images/20250825/1756129305749449.png if 'Images' in image_file_path: # 找到Images目录的位置 images_index = image_file_path.find('Images') if images_index != -1: # 提取从Images开始的相对路径 relative_path = image_file_path[images_index:] # 将反斜杠替换为正斜杠(OSS使用正斜杠) oss_key = relative_path.replace('\\', '/') else: # 如果找不到Images目录,使用文件名 oss_key = os.path.basename(image_file_path) else: # 如果路径中没有Images,使用文件名 oss_key = os.path.basename(image_file_path) self.logger.info(f"[OSS上传] 开始上传文件: {image_file_path} -> {oss_key}") self.logger.info(f"[OSS业务] OSS配置 - Bucket: {self.bucket_name}, Endpoint: {self.endpoint}") # 上传文件到OSS upload_start_time = datetime.now() result = self.bucket.put_object_from_file(oss_key, image_file_path) upload_end_time = datetime.now() upload_duration = (upload_end_time - upload_start_time).total_seconds() self.logger.info(f"[OSS业务] 上传耗时: {upload_duration:.2f} 秒") # 检查上传结果 if result.status == 200: # 构建OSS访问URL oss_url = f"https://{self.bucket_name}.{self.endpoint.replace('https://', '').replace('http://', '')}/{oss_key}" self.logger.info(f"[OSS上传] 文件上传成功: {oss_key} -> {oss_url}") self.logger.info(f"[OSS业务] 上传成功 - OSS Key: {oss_key}, 文件大小: {file_size} bytes, 耗时: {upload_duration:.2f}s") return { 'success': True, 'message': '上传成功', 'oss_url': oss_url, 'oss_key': oss_key } else: self.logger.error(f"[OSS上传] 上传失败,状态码: {result.status}") self.logger.error(f"[OSS业务] 上传失败 - 文件: {os.path.basename(image_file_path)}, 状态码: {result.status}, OSS Key: {oss_key}") return { 'success': False, 'message': f'上传失败,状态码: {result.status}', 'oss_url': None } except Exception as e: self.logger.error(f"[OSS上传] 上传异常: {str(e)}") self.logger.error(f"[OSS业务] 上传异常 - 文件: {os.path.basename(image_file_path) if 'image_file_path' in locals() else 'unknown'}, 异常信息: {str(e)}") return { 'success': False, 'message': f'上传异常: {str(e)}', 'oss_url': None } # 创建Bucket对象,所有Object相关的接口都可以通过Bucket对象来进行 bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name) # 示例代码(可以注释掉) # key = '1753426518174494.png' # # # 上传示例图片 # bucket.put_object_from_file(key, '1753426518174494.png') # # # 获取图片信息 # result = bucket.get_object(key, process='image/info') # # json_content = result.read() # decoded_json = json.loads(oss2.to_unicode(json_content)) # assert int(decoded_json['ImageHeight']['value']) == 267 # assert int(decoded_json['ImageWidth']['value']) == 400 # assert int(decoded_json['FileSize']['value']) == 21839 # assert decoded_json['Format']['value'] == 'jpg' # print(decoded_json)