52 lines
1.5 KiB
Python
52 lines
1.5 KiB
Python
|
|
"""
|
|||
|
|
测试OSS上传功能
|
|||
|
|
"""
|
|||
|
|
import sys
|
|||
|
|
from oss_utils import OSSUploader
|
|||
|
|
|
|||
|
|
def test_oss_connection():
|
|||
|
|
"""测试OSS连接"""
|
|||
|
|
print("=" * 60)
|
|||
|
|
print("测试阿里云OSS连接")
|
|||
|
|
print("=" * 60)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 创建OSS上传器
|
|||
|
|
uploader = OSSUploader()
|
|||
|
|
|
|||
|
|
print(f"\n✅ OSS配置:")
|
|||
|
|
print(f" Bucket: {uploader.bucket_name}")
|
|||
|
|
print(f" Endpoint: {uploader.endpoint}")
|
|||
|
|
print(f" Access Key ID: {uploader.access_key_id[:8]}...")
|
|||
|
|
|
|||
|
|
# 测试Bucket是否可访问
|
|||
|
|
try:
|
|||
|
|
# 列出bucket中的对象(最多1个)
|
|||
|
|
result = uploader.bucket.list_objects(prefix=uploader.base_path, max_keys=1)
|
|||
|
|
print(f"\n✅ Bucket访问成功!")
|
|||
|
|
print(f" 基础路径: {uploader.base_path}")
|
|||
|
|
|
|||
|
|
if result.object_list:
|
|||
|
|
print(f" 示例文件: {result.object_list[0].key}")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"\n❌ Bucket访问失败: {e}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
print("\n" + "=" * 60)
|
|||
|
|
print("✅ OSS配置测试通过!")
|
|||
|
|
print("=" * 60)
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"\n❌ OSS初始化失败: {e}")
|
|||
|
|
print("\n请检查配置:")
|
|||
|
|
print(" 1. Access Key ID和Secret是否正确")
|
|||
|
|
print(" 2. Bucket名称是否正确")
|
|||
|
|
print(" 3. Endpoint地区是否匹配")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
success = test_oss_connection()
|
|||
|
|
sys.exit(0 if success else 1)
|