133 lines
4.6 KiB
Python
133 lines
4.6 KiB
Python
"""
|
||
检查MinIO中是否有测试文件,并列出所有文件
|
||
"""
|
||
from minio import Minio
|
||
from datetime import datetime
|
||
import os
|
||
|
||
# MinIO连接配置
|
||
MINIO_CONFIG = {
|
||
'endpoint': 'minio.datacubeworld.com:9000',
|
||
'access_key': 'JOLXFXny3avFSzB0uRA5',
|
||
'secret_key': 'G1BR8jStNfovkfH5ou39EmPl34E4l7dGrnd3Cz0I',
|
||
'secure': True
|
||
}
|
||
|
||
BUCKET_NAME = 'finyx'
|
||
TENANT_ID = '615873064429507639'
|
||
|
||
def check_test_files():
|
||
"""检查测试文件是否存在"""
|
||
print("="*60)
|
||
print("检查MinIO中的测试文件")
|
||
print("="*60)
|
||
|
||
try:
|
||
# 创建MinIO客户端
|
||
client = Minio(
|
||
MINIO_CONFIG['endpoint'],
|
||
access_key=MINIO_CONFIG['access_key'],
|
||
secret_key=MINIO_CONFIG['secret_key'],
|
||
secure=MINIO_CONFIG['secure']
|
||
)
|
||
|
||
# 检查存储桶
|
||
if not client.bucket_exists(BUCKET_NAME):
|
||
print(f"✗ 存储桶 '{BUCKET_NAME}' 不存在")
|
||
return
|
||
|
||
print(f"\n✓ 存储桶 '{BUCKET_NAME}' 存在")
|
||
|
||
# 查找测试文件(TEST目录下的文件)
|
||
print(f"\n查找测试文件(路径包含 TEST)...")
|
||
test_files = []
|
||
|
||
objects = client.list_objects(BUCKET_NAME, prefix=f"{TENANT_ID}/TEST/", recursive=True)
|
||
for obj in objects:
|
||
test_files.append({
|
||
'name': obj.object_name,
|
||
'size': obj.size,
|
||
'last_modified': obj.last_modified
|
||
})
|
||
|
||
if test_files:
|
||
print(f"\n找到 {len(test_files)} 个测试文件:")
|
||
for i, file_info in enumerate(test_files, 1):
|
||
print(f"\n{i}. 文件名称: {file_info['name']}")
|
||
print(f" 大小: {file_info['size']} 字节")
|
||
print(f" 修改时间: {file_info['last_modified']}")
|
||
|
||
# 生成访问URL
|
||
from datetime import timedelta
|
||
url = client.presigned_get_object(
|
||
BUCKET_NAME,
|
||
file_info['name'],
|
||
expires=timedelta(days=7)
|
||
)
|
||
print(f" 访问URL: {url[:100]}...")
|
||
else:
|
||
print("\n⚠ 未找到测试文件")
|
||
print(" 可能原因:")
|
||
print(" 1. 文件已被删除")
|
||
print(" 2. 文件路径不同")
|
||
print(" 3. 文件在其他位置")
|
||
|
||
# 列出所有与TENANT_ID相关的文件
|
||
print(f"\n" + "="*60)
|
||
print(f"列出所有与租户 {TENANT_ID} 相关的文件")
|
||
print("="*60)
|
||
|
||
all_files = []
|
||
objects = client.list_objects(BUCKET_NAME, prefix=f"{TENANT_ID}/", recursive=True)
|
||
for obj in objects:
|
||
all_files.append({
|
||
'name': obj.object_name,
|
||
'size': obj.size,
|
||
'last_modified': obj.last_modified
|
||
})
|
||
|
||
if all_files:
|
||
print(f"\n找到 {len(all_files)} 个文件:")
|
||
for i, file_info in enumerate(all_files[:20], 1): # 只显示前20个
|
||
print(f"\n{i}. {file_info['name']}")
|
||
print(f" 大小: {file_info['size']} 字节")
|
||
print(f" 修改时间: {file_info['last_modified']}")
|
||
|
||
if len(all_files) > 20:
|
||
print(f"\n... 还有 {len(all_files) - 20} 个文件未显示")
|
||
else:
|
||
print(f"\n⚠ 未找到与租户 {TENANT_ID} 相关的文件")
|
||
|
||
# 检查是否有TEMPLATE目录
|
||
print(f"\n" + "="*60)
|
||
print("检查模板文件")
|
||
print("="*60)
|
||
|
||
template_files = []
|
||
objects = client.list_objects(BUCKET_NAME, prefix=f"{TENANT_ID}/TEMPLATE/", recursive=True)
|
||
for obj in objects:
|
||
template_files.append({
|
||
'name': obj.object_name,
|
||
'size': obj.size,
|
||
'last_modified': obj.last_modified
|
||
})
|
||
|
||
if template_files:
|
||
print(f"\n找到 {len(template_files)} 个模板文件:")
|
||
for i, file_info in enumerate(template_files, 1):
|
||
print(f"\n{i}. {file_info['name']}")
|
||
print(f" 大小: {file_info['size']} 字节")
|
||
print(f" 修改时间: {file_info['last_modified']}")
|
||
else:
|
||
print(f"\n⚠ 未找到模板文件")
|
||
print(f" 模板文件应该在: {TENANT_ID}/TEMPLATE/ 目录下")
|
||
|
||
except Exception as e:
|
||
print(f"\n✗ 检查失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
if __name__ == '__main__':
|
||
check_test_files()
|
||
|