""" 检查MinIO中是否有测试文件,并列出所有文件 """ from minio import Minio from datetime import datetime import os # MinIO连接配置 MINIO_CONFIG = { 'endpoint': 'minio.datacubeworld.com:9000', 'access_key': 'JOLXFXny3avFSzB0uRA5', 'secret_key': 'G1BR8jStNfovkfH5ou39EmPl34E4l7dGrnd3Cz0I', 'secure': True } BUCKET_NAME = 'finyx' TENANT_ID = '615873064429507639' def check_test_files(): """检查测试文件是否存在""" print("="*60) print("检查MinIO中的测试文件") print("="*60) try: # 创建MinIO客户端 client = Minio( MINIO_CONFIG['endpoint'], access_key=MINIO_CONFIG['access_key'], secret_key=MINIO_CONFIG['secret_key'], secure=MINIO_CONFIG['secure'] ) # 检查存储桶 if not client.bucket_exists(BUCKET_NAME): print(f"✗ 存储桶 '{BUCKET_NAME}' 不存在") return print(f"\n✓ 存储桶 '{BUCKET_NAME}' 存在") # 查找测试文件(TEST目录下的文件) print(f"\n查找测试文件(路径包含 TEST)...") test_files = [] objects = client.list_objects(BUCKET_NAME, prefix=f"{TENANT_ID}/TEST/", recursive=True) for obj in objects: test_files.append({ 'name': obj.object_name, 'size': obj.size, 'last_modified': obj.last_modified }) if test_files: print(f"\n找到 {len(test_files)} 个测试文件:") for i, file_info in enumerate(test_files, 1): print(f"\n{i}. 文件名称: {file_info['name']}") print(f" 大小: {file_info['size']} 字节") print(f" 修改时间: {file_info['last_modified']}") # 生成访问URL from datetime import timedelta url = client.presigned_get_object( BUCKET_NAME, file_info['name'], expires=timedelta(days=7) ) print(f" 访问URL: {url[:100]}...") else: print("\n⚠ 未找到测试文件") print(" 可能原因:") print(" 1. 文件已被删除") print(" 2. 文件路径不同") print(" 3. 文件在其他位置") # 列出所有与TENANT_ID相关的文件 print(f"\n" + "="*60) print(f"列出所有与租户 {TENANT_ID} 相关的文件") print("="*60) all_files = [] objects = client.list_objects(BUCKET_NAME, prefix=f"{TENANT_ID}/", recursive=True) for obj in objects: all_files.append({ 'name': obj.object_name, 'size': obj.size, 'last_modified': obj.last_modified }) if all_files: print(f"\n找到 {len(all_files)} 个文件:") for i, file_info in enumerate(all_files[:20], 1): # 只显示前20个 print(f"\n{i}. {file_info['name']}") print(f" 大小: {file_info['size']} 字节") print(f" 修改时间: {file_info['last_modified']}") if len(all_files) > 20: print(f"\n... 还有 {len(all_files) - 20} 个文件未显示") else: print(f"\n⚠ 未找到与租户 {TENANT_ID} 相关的文件") # 检查是否有TEMPLATE目录 print(f"\n" + "="*60) print("检查模板文件") print("="*60) template_files = [] objects = client.list_objects(BUCKET_NAME, prefix=f"{TENANT_ID}/TEMPLATE/", recursive=True) for obj in objects: template_files.append({ 'name': obj.object_name, 'size': obj.size, 'last_modified': obj.last_modified }) if template_files: print(f"\n找到 {len(template_files)} 个模板文件:") for i, file_info in enumerate(template_files, 1): print(f"\n{i}. {file_info['name']}") print(f" 大小: {file_info['size']} 字节") print(f" 修改时间: {file_info['last_modified']}") else: print(f"\n⚠ 未找到模板文件") print(f" 模板文件应该在: {TENANT_ID}/TEMPLATE/ 目录下") except Exception as e: print(f"\n✗ 检查失败: {e}") import traceback traceback.print_exc() if __name__ == '__main__': check_test_files()