362 lines
11 KiB
Python
362 lines
11 KiB
Python
"""
|
||
测试MinIO连接、上传和下载功能
|
||
"""
|
||
import os
|
||
import tempfile
|
||
from minio import Minio
|
||
from minio.error import S3Error
|
||
from datetime import datetime, timedelta
|
||
import requests
|
||
from urllib.parse import urlparse
|
||
|
||
# MinIO连接配置(用户提供的)
|
||
MINIO_CONFIG = {
|
||
'endpoint': 'minio.datacubeworld.com:9000', # 注意:去掉协议前缀和末尾斜杠
|
||
'access_key': 'JOLXFXny3avFSzB0uRA5',
|
||
'secret_key': 'G1BR8jStNfovkfH5ou39EmPl34E4l7dGrnd3Cz0I',
|
||
'secure': True # 使用HTTPS
|
||
}
|
||
|
||
BUCKET_NAME = 'finyx'
|
||
TENANT_ID = '615873064429507639'
|
||
|
||
def print_section(title):
|
||
"""打印章节标题"""
|
||
print("\n" + "="*60)
|
||
print(f" {title}")
|
||
print("="*60)
|
||
|
||
def print_result(success, message):
|
||
"""打印测试结果"""
|
||
status = "✓" if success else "✗"
|
||
print(f"{status} {message}")
|
||
|
||
def test_connection():
|
||
"""测试MinIO连接"""
|
||
print_section("1. 测试MinIO连接")
|
||
|
||
try:
|
||
# 创建MinIO客户端
|
||
client = Minio(
|
||
MINIO_CONFIG['endpoint'],
|
||
access_key=MINIO_CONFIG['access_key'],
|
||
secret_key=MINIO_CONFIG['secret_key'],
|
||
secure=MINIO_CONFIG['secure']
|
||
)
|
||
|
||
# 列出所有存储桶(测试连接)
|
||
buckets = client.list_buckets()
|
||
print_result(True, f"MinIO连接成功!")
|
||
print(f"\n 连接信息:")
|
||
print(f" 端点: {MINIO_CONFIG['endpoint']}")
|
||
print(f" 使用HTTPS: {MINIO_CONFIG['secure']}")
|
||
print(f" 访问密钥: {MINIO_CONFIG['access_key'][:10]}...")
|
||
|
||
print(f"\n 可用存储桶:")
|
||
for bucket in buckets:
|
||
print(f" - {bucket.name} (创建时间: {bucket.creation_date})")
|
||
|
||
# 检查目标存储桶是否存在
|
||
bucket_exists = client.bucket_exists(BUCKET_NAME)
|
||
if bucket_exists:
|
||
print_result(True, f"目标存储桶 '{BUCKET_NAME}' 存在")
|
||
else:
|
||
print_result(False, f"目标存储桶 '{BUCKET_NAME}' 不存在")
|
||
print(f" 建议:需要创建存储桶 '{BUCKET_NAME}'")
|
||
|
||
return client, bucket_exists
|
||
|
||
except Exception as e:
|
||
print_result(False, f"MinIO连接失败: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None, False
|
||
|
||
def create_test_file():
|
||
"""创建测试文件"""
|
||
print_section("2. 创建测试文件")
|
||
|
||
# 创建临时测试文件
|
||
test_content = f"""
|
||
这是一个MinIO连接测试文件
|
||
创建时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
|
||
测试内容: 测试MinIO上传和下载功能
|
||
"""
|
||
|
||
# 创建临时文件
|
||
temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False, encoding='utf-8')
|
||
temp_file.write(test_content)
|
||
temp_file.close()
|
||
|
||
file_size = os.path.getsize(temp_file.name)
|
||
print_result(True, f"测试文件创建成功")
|
||
print(f"\n 文件路径: {temp_file.name}")
|
||
print(f" 文件大小: {file_size} 字节")
|
||
|
||
return temp_file.name
|
||
|
||
def upload_file(client, file_path, bucket_exists):
|
||
"""上传文件到MinIO"""
|
||
print_section("3. 上传文件到MinIO")
|
||
|
||
if not bucket_exists:
|
||
print("⚠ 存储桶不存在,跳过上传测试")
|
||
return None, None
|
||
|
||
try:
|
||
# 生成对象名称(相对路径)
|
||
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
|
||
object_name = f"{TENANT_ID}/TEST/{datetime.now().year}/{datetime.now().month:02d}/test_file_{timestamp}.txt"
|
||
|
||
print(f"\n 上传信息:")
|
||
print(f" 存储桶: {BUCKET_NAME}")
|
||
print(f" 对象名称: {object_name}")
|
||
print(f" 源文件: {file_path}")
|
||
|
||
# 上传文件
|
||
print(f"\n 正在上传...")
|
||
client.fput_object(
|
||
BUCKET_NAME,
|
||
object_name,
|
||
file_path,
|
||
content_type='text/plain; charset=utf-8'
|
||
)
|
||
|
||
print_result(True, "文件上传成功!")
|
||
|
||
# 生成相对路径(用于数据库存储)
|
||
relative_path = f"/{object_name}"
|
||
print(f"\n 相对路径(数据库存储): {relative_path}")
|
||
|
||
return object_name, relative_path
|
||
|
||
except S3Error as e:
|
||
print_result(False, f"MinIO上传错误: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None, None
|
||
except Exception as e:
|
||
print_result(False, f"上传失败: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None, None
|
||
|
||
def generate_access_url(client, object_name):
|
||
"""生成可访问的URL"""
|
||
print_section("4. 生成访问URL")
|
||
|
||
if not object_name:
|
||
print("⚠ 没有上传文件,跳过URL生成")
|
||
return None
|
||
|
||
try:
|
||
# 生成预签名URL(7天有效期)
|
||
# MinIO库需要使用timedelta对象
|
||
expires = timedelta(days=7)
|
||
url = client.presigned_get_object(
|
||
BUCKET_NAME,
|
||
object_name,
|
||
expires=expires
|
||
)
|
||
|
||
print_result(True, "URL生成成功")
|
||
print(f"\n 预签名URL(7天有效):")
|
||
print(f" {url}")
|
||
|
||
# 生成公共URL(如果存储桶是公共的)
|
||
protocol = "https" if MINIO_CONFIG['secure'] else "http"
|
||
public_url = f"{protocol}://{MINIO_CONFIG['endpoint']}/{BUCKET_NAME}/{object_name}"
|
||
|
||
print(f"\n 公共URL(如果存储桶是公共的):")
|
||
print(f" {public_url}")
|
||
|
||
return url
|
||
|
||
except Exception as e:
|
||
print_result(False, f"URL生成失败: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None
|
||
|
||
def test_download(url):
|
||
"""测试下载URL"""
|
||
print_section("5. 测试URL下载")
|
||
|
||
if not url:
|
||
print("⚠ 没有URL,跳过下载测试")
|
||
return False
|
||
|
||
try:
|
||
print(f" 正在测试下载...")
|
||
print(f" URL: {url[:80]}...")
|
||
|
||
response = requests.get(url, timeout=10)
|
||
|
||
if response.status_code == 200:
|
||
print_result(True, f"下载成功!状态码: {response.status_code}")
|
||
print(f"\n 响应信息:")
|
||
print(f" 内容长度: {len(response.content)} 字节")
|
||
print(f" 内容类型: {response.headers.get('Content-Type', 'N/A')}")
|
||
|
||
# 显示内容预览
|
||
try:
|
||
content_preview = response.text[:200]
|
||
print(f"\n 内容预览:")
|
||
print(f" {content_preview}...")
|
||
except:
|
||
print(f" (无法显示文本预览)")
|
||
|
||
return True
|
||
else:
|
||
print_result(False, f"下载失败!状态码: {response.status_code}")
|
||
print(f" 响应内容: {response.text[:200]}")
|
||
return False
|
||
|
||
except requests.exceptions.Timeout:
|
||
print_result(False, "下载超时")
|
||
return False
|
||
except requests.exceptions.SSLError as e:
|
||
print_result(False, f"SSL错误: {str(e)}")
|
||
print(" 提示:可能需要验证SSL证书")
|
||
return False
|
||
except Exception as e:
|
||
print_result(False, f"下载失败: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
def test_list_objects(client):
|
||
"""测试列出对象"""
|
||
print_section("6. 测试列出存储桶中的对象")
|
||
|
||
if not client:
|
||
print("⚠ 没有客户端连接,跳过列表测试")
|
||
return
|
||
|
||
try:
|
||
# 检查存储桶是否存在
|
||
if not client.bucket_exists(BUCKET_NAME):
|
||
print(f"⚠ 存储桶 '{BUCKET_NAME}' 不存在")
|
||
return
|
||
|
||
print(f"\n 列出存储桶 '{BUCKET_NAME}' 中的对象(最多10个):")
|
||
|
||
objects = client.list_objects(BUCKET_NAME, recursive=True)
|
||
count = 0
|
||
|
||
for obj in objects:
|
||
count += 1
|
||
if count <= 10:
|
||
print(f" {count}. {obj.object_name} ({obj.size} 字节, 修改时间: {obj.last_modified})")
|
||
else:
|
||
break
|
||
|
||
if count == 0:
|
||
print(" (存储桶为空)")
|
||
elif count > 10:
|
||
print(f" ... 还有更多对象(总共可能超过10个)")
|
||
|
||
print_result(True, f"成功列出对象(显示前10个)")
|
||
|
||
except Exception as e:
|
||
print_result(False, f"列出对象失败: {str(e)}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
def cleanup_test_file(local_file_path):
|
||
"""清理测试文件"""
|
||
try:
|
||
if local_file_path and os.path.exists(local_file_path):
|
||
os.unlink(local_file_path)
|
||
print(f"\n✓ 已清理测试文件: {local_file_path}")
|
||
except Exception as e:
|
||
print(f"\n⚠ 清理测试文件失败: {e}")
|
||
|
||
def main():
|
||
"""主函数"""
|
||
print("\n" + "="*60)
|
||
print(" MinIO连接测试")
|
||
print("="*60)
|
||
print(f"\n配置信息:")
|
||
print(f" 端点: {MINIO_CONFIG['endpoint']}")
|
||
print(f" 使用HTTPS: {MINIO_CONFIG['secure']}")
|
||
print(f" 存储桶: {BUCKET_NAME}")
|
||
print(f" 访问密钥: {MINIO_CONFIG['access_key']}")
|
||
|
||
test_file_path = None
|
||
object_name = None
|
||
|
||
try:
|
||
# 1. 测试连接
|
||
client, bucket_exists = test_connection()
|
||
|
||
if not client:
|
||
print("\n✗ 连接失败,无法继续测试")
|
||
return
|
||
|
||
# 2. 列出对象
|
||
test_list_objects(client)
|
||
|
||
# 3. 创建测试文件
|
||
test_file_path = create_test_file()
|
||
|
||
# 4. 上传文件
|
||
object_name, relative_path = upload_file(client, test_file_path, bucket_exists)
|
||
|
||
# 5. 生成访问URL
|
||
access_url = generate_access_url(client, object_name)
|
||
|
||
# 6. 测试下载
|
||
download_success = test_download(access_url)
|
||
|
||
# 总结
|
||
print_section("测试总结")
|
||
|
||
results = {
|
||
'连接': client is not None,
|
||
'存储桶存在': bucket_exists,
|
||
'文件上传': object_name is not None,
|
||
'URL生成': access_url is not None,
|
||
'URL下载': download_success
|
||
}
|
||
|
||
print("\n测试结果:")
|
||
for test_name, success in results.items():
|
||
status = "✓ 通过" if success else "✗ 失败"
|
||
print(f" {test_name}: {status}")
|
||
|
||
passed = sum(1 for v in results.values() if v)
|
||
total = len(results)
|
||
|
||
print(f"\n通过率: {passed}/{total} ({passed*100//total if total > 0 else 0}%)")
|
||
|
||
if passed == total:
|
||
print("\n✓ 所有测试通过!MinIO配置正确,可以正常使用。")
|
||
else:
|
||
print("\n⚠ 部分测试失败,请检查配置和网络连接。")
|
||
|
||
# 显示使用建议
|
||
if object_name:
|
||
print("\n" + "="*60)
|
||
print(" 使用建议")
|
||
print("="*60)
|
||
print(f"\n上传的文件路径(用于数据库存储):")
|
||
print(f" {relative_path}")
|
||
print(f"\n访问URL(预签名,7天有效):")
|
||
print(f" {access_url}")
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n\n测试已中断")
|
||
except Exception as e:
|
||
print(f"\n✗ 测试过程中发生错误: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
finally:
|
||
# 清理测试文件
|
||
if test_file_path:
|
||
cleanup_test_file(test_file_path)
|
||
|
||
if __name__ == '__main__':
|
||
main()
|
||
|