Files
ai_wht_B/ver_25121923/oss_image.py

178 lines
7.9 KiB
Python
Raw Permalink Normal View History

2026-01-06 14:18:39 +08:00
# -*- coding: utf-8 -*-
import json
import os
import logging
from datetime import datetime
from PIL import Image
import oss2
# 导入统一日志配置
from log_config import setup_article_server_logger
# 以下代码展示了图片服务的基本用法。更详细应用请参看官网文档 https://help.aliyun.com/document_detail/32206.html
# 首先初始化AccessKeyId、AccessKeySecret、Endpoint等信息。
# 通过环境变量获取,或者把诸如"<你的AccessKeyId>"替换成真实的AccessKeyId等。
#
# 以杭州区域为例Endpoint可以是
# http://oss-cn-hangzhou.aliyuncs.com
# https://oss-cn-hangzhou.aliyuncs.com
# 分别以HTTP、HTTPS协议访问。
#access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', 'LTAI5tNesdhDH4ErqEUZmEg2')
#access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', 'xZn7WUkTW76TqOLTh01zZATnU6p3Tf')
#bucket_name = os.getenv('OSS_TEST_BUCKET', 'oss-pai-apou8llecyqfa7c9du-cn-shanghai')
#endpoint = os.getenv('OSS_TEST_ENDPOINT', 'https://oss-cn-shanghai.aliyuncs.com/')
access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', 'LTAI5tNesdhDH4ErqEUZmEg2')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', 'xZn7WUkTW76TqOLTh01zZATnU6p3Tf')
bucket_name = os.getenv('OSS_TEST_BUCKET', 'bxmkb-beijing')
endpoint = os.getenv('OSS_TEST_ENDPOINT', 'https://oss-cn-beijing.aliyuncs.com/')
# 确认上面的参数都填写正确了
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
assert '<' not in param, '请设置参数:' + param
def get_image_info(image_file):
"""获取本地图片信息
:param str image_file: 本地图片
:return tuple: a 3-tuple(height, width, format).
"""
im = Image.open(image_file)
return im.height, im.width, im.format
class SyncImageToOSS:
"""同步图片到OSS的类"""
def __init__(self, access_key_id=None, access_key_secret=None, bucket_name=None, endpoint=None):
"""初始化OSS配置
Args:
access_key_id: OSS访问密钥ID
access_key_secret: OSS访问密钥Secret
bucket_name: OSS存储桶名称
endpoint: OSS端点地址
"""
self.access_key_id = access_key_id or os.getenv('OSS_TEST_ACCESS_KEY_ID', 'LTAI5tNesdhDH4ErqEUZmEg2')
self.access_key_secret = access_key_secret or os.getenv('OSS_TEST_ACCESS_KEY_SECRET', 'xZn7WUkTW76TqOLTh01zZATnU6p3Tf')
#self.bucket_name = bucket_name or os.getenv('OSS_TEST_BUCKET', 'oss-pai-apou8llecyqfa7c9du-cn-shanghai')
#self.endpoint = endpoint or os.getenv('OSS_TEST_ENDPOINT', 'https://oss-cn-shanghai.aliyuncs.com/')
self.bucket_name = bucket_name or os.getenv('OSS_TEST_BUCKET', 'bxmkb-beijing')
self.endpoint = endpoint or os.getenv('OSS_TEST_ENDPOINT', 'https://oss-cn-beijing.aliyuncs.com/')
# 创建Bucket对象
self.bucket = oss2.Bucket(oss2.Auth(self.access_key_id, self.access_key_secret), self.endpoint, self.bucket_name)
# 设置日志
self.logger = setup_article_server_logger()
def TransformerImage(self, image_file_path):
"""将图片上传到OSS
Args:
image_file_path (str): 图片文件完整路径例如D:\\baijiahao\\tags_images\\Images\\20250825\\1756129305749449.png
Returns:
dict: 上传结果包含成功状态和OSS URL
"""
try:
# 检查文件是否存在
if not os.path.exists(image_file_path):
self.logger.error(f"[OSS上传] 文件不存在: {image_file_path}")
return {
'success': False,
'message': f'文件不存在: {image_file_path}',
'oss_url': None
}
# 记录业务日志 - 开始处理
self.logger.info(f"[OSS业务] 开始处理图片上传请求 - 文件: {os.path.basename(image_file_path)}")
file_size = os.path.getsize(image_file_path)
self.logger.info(f"[OSS业务] 文件信息 - 大小: {file_size} bytes, 路径: {image_file_path}")
# 从完整路径中提取相对路径作为OSS key
# 例如D:\baijiahao\tags_images\Images\20250825\1756129305749449.png -> Images/20250825/1756129305749449.png
if 'Images' in image_file_path:
# 找到Images目录的位置
images_index = image_file_path.find('Images')
if images_index != -1:
# 提取从Images开始的相对路径
relative_path = image_file_path[images_index:]
# 将反斜杠替换为正斜杠OSS使用正斜杠
oss_key = relative_path.replace('\\', '/')
else:
# 如果找不到Images目录使用文件名
oss_key = os.path.basename(image_file_path)
else:
# 如果路径中没有Images使用文件名
oss_key = os.path.basename(image_file_path)
self.logger.info(f"[OSS上传] 开始上传文件: {image_file_path} -> {oss_key}")
self.logger.info(f"[OSS业务] OSS配置 - Bucket: {self.bucket_name}, Endpoint: {self.endpoint}")
# 上传文件到OSS
upload_start_time = datetime.now()
result = self.bucket.put_object_from_file(oss_key, image_file_path)
upload_end_time = datetime.now()
upload_duration = (upload_end_time - upload_start_time).total_seconds()
self.logger.info(f"[OSS业务] 上传耗时: {upload_duration:.2f}")
# 检查上传结果
if result.status == 200:
# 构建OSS访问URL
oss_url = f"https://{self.bucket_name}.{self.endpoint.replace('https://', '').replace('http://', '')}/{oss_key}"
self.logger.info(f"[OSS上传] 文件上传成功: {oss_key} -> {oss_url}")
self.logger.info(f"[OSS业务] 上传成功 - OSS Key: {oss_key}, 文件大小: {file_size} bytes, 耗时: {upload_duration:.2f}s")
return {
'success': True,
'message': '上传成功',
'oss_url': oss_url,
'oss_key': oss_key
}
else:
self.logger.error(f"[OSS上传] 上传失败,状态码: {result.status}")
self.logger.error(f"[OSS业务] 上传失败 - 文件: {os.path.basename(image_file_path)}, 状态码: {result.status}, OSS Key: {oss_key}")
return {
'success': False,
'message': f'上传失败,状态码: {result.status}',
'oss_url': None
}
except Exception as e:
self.logger.error(f"[OSS上传] 上传异常: {str(e)}")
self.logger.error(f"[OSS业务] 上传异常 - 文件: {os.path.basename(image_file_path) if 'image_file_path' in locals() else 'unknown'}, 异常信息: {str(e)}")
return {
'success': False,
'message': f'上传异常: {str(e)}',
'oss_url': None
}
# 创建Bucket对象所有Object相关的接口都可以通过Bucket对象来进行
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)
# 示例代码(可以注释掉)
# key = '1753426518174494.png'
#
# # 上传示例图片
# bucket.put_object_from_file(key, '1753426518174494.png')
#
# # 获取图片信息
# result = bucket.get_object(key, process='image/info')
#
# json_content = result.read()
# decoded_json = json.loads(oss2.to_unicode(json_content))
# assert int(decoded_json['ImageHeight']['value']) == 267
# assert int(decoded_json['ImageWidth']['value']) == 400
# assert int(decoded_json['FileSize']['value']) == 21839
# assert decoded_json['Format']['value'] == 'jpg'
# print(decoded_json)