112 lines
3.4 KiB
Python

# -*- coding: utf-8 -*-
# MinIO Python Library for Amazon S3 Compatible Cloud Storage,
# (C) 2018 MinIO, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
minio.sse
~~~~~~~~~~~~~~~~~~~
This module contains core API parsers.
:copyright: (c) 2018 by MinIO, Inc.
:license: Apache 2.0, see LICENSE for more details.
"""
from __future__ import absolute_import, annotations
import base64
import json
from abc import ABCMeta, abstractmethod
from typing import Any, cast
class Sse:
"""Server-side encryption base class."""
__metaclass__ = ABCMeta
@abstractmethod
def headers(self) -> dict[str, str]:
"""Return headers."""
def tls_required(self) -> bool: # pylint: disable=no-self-use
"""Return TLS required to use this server-side encryption."""
return True
def copy_headers(self) -> dict[str, str]: # pylint: disable=no-self-use
"""Return copy headers."""
return {}
class SseCustomerKey(Sse):
""" Server-side encryption - customer key type."""
def __init__(self, key: bytes):
if len(key) != 32:
raise ValueError(
"SSE-C keys need to be 256 bit base64 encoded",
)
b64key = base64.b64encode(key).decode()
from .helpers import \
md5sum_hash # pylint: disable=import-outside-toplevel
md5key = cast(str, md5sum_hash(key))
self._headers: dict[str, str] = {
"X-Amz-Server-Side-Encryption-Customer-Algorithm": "AES256",
"X-Amz-Server-Side-Encryption-Customer-Key": b64key,
"X-Amz-Server-Side-Encryption-Customer-Key-MD5": md5key,
}
self._copy_headers: dict[str, str] = {
"X-Amz-Copy-Source-Server-Side-Encryption-Customer-Algorithm":
"AES256",
"X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key": b64key,
"X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key-MD5":
md5key,
}
def headers(self) -> dict[str, str]:
return self._headers.copy()
def copy_headers(self) -> dict[str, str]:
return self._copy_headers.copy()
class SseKMS(Sse):
"""Server-side encryption - KMS type."""
def __init__(self, key: str, context: dict[str, Any]):
self._headers = {
"X-Amz-Server-Side-Encryption-Aws-Kms-Key-Id": key,
"X-Amz-Server-Side-Encryption": "aws:kms"
}
if context:
data = bytes(json.dumps(context), "utf-8")
self._headers["X-Amz-Server-Side-Encryption-Context"] = (
base64.b64encode(data).decode()
)
def headers(self) -> dict[str, str]:
return self._headers.copy()
class SseS3(Sse):
"""Server-side encryption - S3 type."""
def headers(self) -> dict[str, str]:
return {
"X-Amz-Server-Side-Encryption": "AES256"
}
def tls_required(self) -> bool:
return False