118 lines
3.8 KiB
Python
118 lines
3.8 KiB
Python
"""
|
||
获取模板文件的下载链接
|
||
"""
|
||
from minio import Minio
|
||
from datetime import timedelta
|
||
import requests
|
||
|
||
# MinIO连接配置
|
||
MINIO_CONFIG = {
|
||
'endpoint': 'minio.datacubeworld.com:9000',
|
||
'access_key': 'JOLXFXny3avFSzB0uRA5',
|
||
'secret_key': 'G1BR8jStNfovkfH5ou39EmPl34E4l7dGrnd3Cz0I',
|
||
'secure': True
|
||
}
|
||
|
||
BUCKET_NAME = 'finyx'
|
||
OBJECT_NAME = '615873064429507639/TEMPLATE/2025/12/初步核实审批表模板.docx'
|
||
|
||
def get_template_url():
|
||
"""生成模板文件的下载链接"""
|
||
print("="*60)
|
||
print("获取模板文件下载链接")
|
||
print("="*60)
|
||
|
||
try:
|
||
# 创建MinIO客户端
|
||
client = Minio(
|
||
MINIO_CONFIG['endpoint'],
|
||
access_key=MINIO_CONFIG['access_key'],
|
||
secret_key=MINIO_CONFIG['secret_key'],
|
||
secure=MINIO_CONFIG['secure']
|
||
)
|
||
|
||
# 检查文件是否存在
|
||
print(f"\n检查文件是否存在...")
|
||
try:
|
||
stat = client.stat_object(BUCKET_NAME, OBJECT_NAME)
|
||
print(f"✓ 文件存在")
|
||
print(f" 文件名: {OBJECT_NAME}")
|
||
print(f" 文件大小: {stat.size} 字节")
|
||
print(f" 最后修改: {stat.last_modified}")
|
||
print(f" 内容类型: {stat.content_type}")
|
||
except Exception as e:
|
||
print(f"✗ 文件不存在或无法访问: {e}")
|
||
return
|
||
|
||
# 生成预签名URL(7天有效期)
|
||
print(f"\n生成预签名URL(7天有效)...")
|
||
url = client.presigned_get_object(
|
||
BUCKET_NAME,
|
||
OBJECT_NAME,
|
||
expires=timedelta(days=7)
|
||
)
|
||
|
||
print(f"\n" + "="*60)
|
||
print("下载链接(7天有效)")
|
||
print("="*60)
|
||
print(f"\n{url}\n")
|
||
|
||
# 生成公共URL(如果存储桶是公共的)
|
||
protocol = "https" if MINIO_CONFIG['secure'] else "http"
|
||
public_url = f"{protocol}://{MINIO_CONFIG['endpoint']}/{BUCKET_NAME}/{OBJECT_NAME}"
|
||
|
||
print("="*60)
|
||
print("公共URL(如果存储桶是公共的)")
|
||
print("="*60)
|
||
print(f"\n{public_url}\n")
|
||
|
||
# 测试下载
|
||
print("="*60)
|
||
print("测试下载链接")
|
||
print("="*60)
|
||
|
||
print(f"\n正在测试预签名URL...")
|
||
try:
|
||
response = requests.get(url, timeout=10, stream=True)
|
||
if response.status_code == 200:
|
||
print(f"✓ 预签名URL可以正常下载")
|
||
print(f" 状态码: {response.status_code}")
|
||
print(f" 内容长度: {len(response.content)} 字节")
|
||
print(f" 内容类型: {response.headers.get('Content-Type', 'N/A')}")
|
||
else:
|
||
print(f"✗ 下载失败,状态码: {response.status_code}")
|
||
except Exception as e:
|
||
print(f"✗ 下载测试失败: {e}")
|
||
|
||
print("\n" + "="*60)
|
||
print("使用说明")
|
||
print("="*60)
|
||
print("\n1. 预签名URL(推荐):")
|
||
print(f" {url}")
|
||
print("\n 特点:")
|
||
print(" - 安全,带有签名验证")
|
||
print(" - 有效期:7天")
|
||
print(" - 可以直接在浏览器中打开下载")
|
||
|
||
print("\n2. 公共URL(如果存储桶是公共的):")
|
||
print(f" {public_url}")
|
||
print("\n 特点:")
|
||
print(" - 需要存储桶设置为公共访问")
|
||
print(" - 永久有效(如果存储桶保持公共)")
|
||
|
||
print("\n" + "="*60)
|
||
print("完成")
|
||
print("="*60)
|
||
|
||
return url, public_url
|
||
|
||
except Exception as e:
|
||
print(f"\n✗ 错误: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None, None
|
||
|
||
if __name__ == '__main__':
|
||
get_template_url()
|
||
|