ai-business-write/test_scripts/test_minio_connection.py

362 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
测试MinIO连接、上传和下载功能
"""
import os
import tempfile
from minio import Minio
from minio.error import S3Error
from datetime import datetime, timedelta
import requests
from urllib.parse import urlparse
# MinIO连接配置用户提供的
MINIO_CONFIG = {
'endpoint': 'minio.datacubeworld.com:9000', # 注意:去掉协议前缀和末尾斜杠
'access_key': 'JOLXFXny3avFSzB0uRA5',
'secret_key': 'G1BR8jStNfovkfH5ou39EmPl34E4l7dGrnd3Cz0I',
'secure': True # 使用HTTPS
}
BUCKET_NAME = 'finyx'
TENANT_ID = '615873064429507639'
def print_section(title):
"""打印章节标题"""
print("\n" + "="*60)
print(f" {title}")
print("="*60)
def print_result(success, message):
"""打印测试结果"""
status = "" if success else ""
print(f"{status} {message}")
def test_connection():
"""测试MinIO连接"""
print_section("1. 测试MinIO连接")
try:
# 创建MinIO客户端
client = Minio(
MINIO_CONFIG['endpoint'],
access_key=MINIO_CONFIG['access_key'],
secret_key=MINIO_CONFIG['secret_key'],
secure=MINIO_CONFIG['secure']
)
# 列出所有存储桶(测试连接)
buckets = client.list_buckets()
print_result(True, f"MinIO连接成功")
print(f"\n 连接信息:")
print(f" 端点: {MINIO_CONFIG['endpoint']}")
print(f" 使用HTTPS: {MINIO_CONFIG['secure']}")
print(f" 访问密钥: {MINIO_CONFIG['access_key'][:10]}...")
print(f"\n 可用存储桶:")
for bucket in buckets:
print(f" - {bucket.name} (创建时间: {bucket.creation_date})")
# 检查目标存储桶是否存在
bucket_exists = client.bucket_exists(BUCKET_NAME)
if bucket_exists:
print_result(True, f"目标存储桶 '{BUCKET_NAME}' 存在")
else:
print_result(False, f"目标存储桶 '{BUCKET_NAME}' 不存在")
print(f" 建议:需要创建存储桶 '{BUCKET_NAME}'")
return client, bucket_exists
except Exception as e:
print_result(False, f"MinIO连接失败: {str(e)}")
import traceback
traceback.print_exc()
return None, False
def create_test_file():
"""创建测试文件"""
print_section("2. 创建测试文件")
# 创建临时测试文件
test_content = f"""
这是一个MinIO连接测试文件
创建时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
测试内容: 测试MinIO上传和下载功能
"""
# 创建临时文件
temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False, encoding='utf-8')
temp_file.write(test_content)
temp_file.close()
file_size = os.path.getsize(temp_file.name)
print_result(True, f"测试文件创建成功")
print(f"\n 文件路径: {temp_file.name}")
print(f" 文件大小: {file_size} 字节")
return temp_file.name
def upload_file(client, file_path, bucket_exists):
"""上传文件到MinIO"""
print_section("3. 上传文件到MinIO")
if not bucket_exists:
print("⚠ 存储桶不存在,跳过上传测试")
return None, None
try:
# 生成对象名称(相对路径)
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
object_name = f"{TENANT_ID}/TEST/{datetime.now().year}/{datetime.now().month:02d}/test_file_{timestamp}.txt"
print(f"\n 上传信息:")
print(f" 存储桶: {BUCKET_NAME}")
print(f" 对象名称: {object_name}")
print(f" 源文件: {file_path}")
# 上传文件
print(f"\n 正在上传...")
client.fput_object(
BUCKET_NAME,
object_name,
file_path,
content_type='text/plain; charset=utf-8'
)
print_result(True, "文件上传成功!")
# 生成相对路径(用于数据库存储)
relative_path = f"/{object_name}"
print(f"\n 相对路径(数据库存储): {relative_path}")
return object_name, relative_path
except S3Error as e:
print_result(False, f"MinIO上传错误: {str(e)}")
import traceback
traceback.print_exc()
return None, None
except Exception as e:
print_result(False, f"上传失败: {str(e)}")
import traceback
traceback.print_exc()
return None, None
def generate_access_url(client, object_name):
"""生成可访问的URL"""
print_section("4. 生成访问URL")
if not object_name:
print("⚠ 没有上传文件跳过URL生成")
return None
try:
# 生成预签名URL7天有效期
# MinIO库需要使用timedelta对象
expires = timedelta(days=7)
url = client.presigned_get_object(
BUCKET_NAME,
object_name,
expires=expires
)
print_result(True, "URL生成成功")
print(f"\n 预签名URL7天有效:")
print(f" {url}")
# 生成公共URL如果存储桶是公共的
protocol = "https" if MINIO_CONFIG['secure'] else "http"
public_url = f"{protocol}://{MINIO_CONFIG['endpoint']}/{BUCKET_NAME}/{object_name}"
print(f"\n 公共URL如果存储桶是公共的:")
print(f" {public_url}")
return url
except Exception as e:
print_result(False, f"URL生成失败: {str(e)}")
import traceback
traceback.print_exc()
return None
def test_download(url):
"""测试下载URL"""
print_section("5. 测试URL下载")
if not url:
print("⚠ 没有URL跳过下载测试")
return False
try:
print(f" 正在测试下载...")
print(f" URL: {url[:80]}...")
response = requests.get(url, timeout=10)
if response.status_code == 200:
print_result(True, f"下载成功!状态码: {response.status_code}")
print(f"\n 响应信息:")
print(f" 内容长度: {len(response.content)} 字节")
print(f" 内容类型: {response.headers.get('Content-Type', 'N/A')}")
# 显示内容预览
try:
content_preview = response.text[:200]
print(f"\n 内容预览:")
print(f" {content_preview}...")
except:
print(f" (无法显示文本预览)")
return True
else:
print_result(False, f"下载失败!状态码: {response.status_code}")
print(f" 响应内容: {response.text[:200]}")
return False
except requests.exceptions.Timeout:
print_result(False, "下载超时")
return False
except requests.exceptions.SSLError as e:
print_result(False, f"SSL错误: {str(e)}")
print(" 提示可能需要验证SSL证书")
return False
except Exception as e:
print_result(False, f"下载失败: {str(e)}")
import traceback
traceback.print_exc()
return False
def test_list_objects(client):
"""测试列出对象"""
print_section("6. 测试列出存储桶中的对象")
if not client:
print("⚠ 没有客户端连接,跳过列表测试")
return
try:
# 检查存储桶是否存在
if not client.bucket_exists(BUCKET_NAME):
print(f"⚠ 存储桶 '{BUCKET_NAME}' 不存在")
return
print(f"\n 列出存储桶 '{BUCKET_NAME}' 中的对象最多10个:")
objects = client.list_objects(BUCKET_NAME, recursive=True)
count = 0
for obj in objects:
count += 1
if count <= 10:
print(f" {count}. {obj.object_name} ({obj.size} 字节, 修改时间: {obj.last_modified})")
else:
break
if count == 0:
print(" (存储桶为空)")
elif count > 10:
print(f" ... 还有更多对象总共可能超过10个")
print_result(True, f"成功列出对象显示前10个")
except Exception as e:
print_result(False, f"列出对象失败: {str(e)}")
import traceback
traceback.print_exc()
def cleanup_test_file(local_file_path):
"""清理测试文件"""
try:
if local_file_path and os.path.exists(local_file_path):
os.unlink(local_file_path)
print(f"\n✓ 已清理测试文件: {local_file_path}")
except Exception as e:
print(f"\n⚠ 清理测试文件失败: {e}")
def main():
"""主函数"""
print("\n" + "="*60)
print(" MinIO连接测试")
print("="*60)
print(f"\n配置信息:")
print(f" 端点: {MINIO_CONFIG['endpoint']}")
print(f" 使用HTTPS: {MINIO_CONFIG['secure']}")
print(f" 存储桶: {BUCKET_NAME}")
print(f" 访问密钥: {MINIO_CONFIG['access_key']}")
test_file_path = None
object_name = None
try:
# 1. 测试连接
client, bucket_exists = test_connection()
if not client:
print("\n✗ 连接失败,无法继续测试")
return
# 2. 列出对象
test_list_objects(client)
# 3. 创建测试文件
test_file_path = create_test_file()
# 4. 上传文件
object_name, relative_path = upload_file(client, test_file_path, bucket_exists)
# 5. 生成访问URL
access_url = generate_access_url(client, object_name)
# 6. 测试下载
download_success = test_download(access_url)
# 总结
print_section("测试总结")
results = {
'连接': client is not None,
'存储桶存在': bucket_exists,
'文件上传': object_name is not None,
'URL生成': access_url is not None,
'URL下载': download_success
}
print("\n测试结果:")
for test_name, success in results.items():
status = "✓ 通过" if success else "✗ 失败"
print(f" {test_name}: {status}")
passed = sum(1 for v in results.values() if v)
total = len(results)
print(f"\n通过率: {passed}/{total} ({passed*100//total if total > 0 else 0}%)")
if passed == total:
print("\n✓ 所有测试通过MinIO配置正确可以正常使用。")
else:
print("\n⚠ 部分测试失败,请检查配置和网络连接。")
# 显示使用建议
if object_name:
print("\n" + "="*60)
print(" 使用建议")
print("="*60)
print(f"\n上传的文件路径(用于数据库存储):")
print(f" {relative_path}")
print(f"\n访问URL预签名7天有效:")
print(f" {access_url}")
except KeyboardInterrupt:
print("\n\n测试已中断")
except Exception as e:
print(f"\n✗ 测试过程中发生错误: {e}")
import traceback
traceback.print_exc()
finally:
# 清理测试文件
if test_file_path:
cleanup_test_file(test_file_path)
if __name__ == '__main__':
main()