xref: /aosp_15_r20/external/tink/python/tink/aead/_kms_envelope_aead.py (revision e7b1675dde1b92d52ec075b0a92829627f2c52a5)
1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Module for envelope encryption with KMS."""
15
16import struct
17
18from tink.proto import tink_pb2
19from tink import core
20from tink.aead import _aead
21
# Type URLs of the Tink AEAD key types that are accepted as data encryption
# keys (DEKs) for envelope encryption.
_SUPPORTED_DEK_KEY_TYPES = frozenset((
    'type.googleapis.com/google.crypto.tink.AesGcmKey',
    'type.googleapis.com/google.crypto.tink.XChaCha20Poly1305Key',
    'type.googleapis.com/google.crypto.tink.AesCtrHmacAeadKey',
    'type.googleapis.com/google.crypto.tink.AesEaxKey',
    'type.googleapis.com/google.crypto.tink.AesGcmSivKey',
))


def is_supported_dek_key_type(type_url: str) -> bool:
  """Returns True iff `type_url` is one of the supported DEK key types.

  Args:
    type_url: Type URL of a key type, e.g. the `type_url` of a KeyTemplate.

  Returns:
    True if the type URL is contained in _SUPPORTED_DEK_KEY_TYPES, False
    otherwise.
  """
  is_supported = type_url in _SUPPORTED_DEK_KEY_TYPES
  return is_supported
33
34
class KmsEnvelopeAead(_aead.Aead):
  """Implements envelope encryption.

  Envelope encryption generates a data encryption key (DEK) which is used
  to encrypt the payload. The DEK is then sent to a KMS to be encrypted and
  the encrypted DEK is attached to the ciphertext. In order to decrypt the
  ciphertext, the DEK first has to be decrypted by the KMS, and then the DEK
  can be used to decrypt the ciphertext. For further information see
  https://cloud.google.com/kms/docs/envelope-encryption.

  DEK key template must be a KeyTemplate for any of these Tink AEAD key types
  (any other key template will be rejected):
  * AesGcmKey
  * XChaCha20Poly1305Key
  * AesCtrHmacAeadKey
  * AesEaxKey
  * AesGcmSivKey

  The ciphertext structure is as follows:
  * Length of the encrypted DEK: 4 bytes (big endian)
  * Encrypted DEK: variable length, specified by the previous 4 bytes
  * AEAD payload: variable length
  """

  # Defines in how many bytes the DEK length will be encoded. This must match
  # the size of the '>I' struct format used in encrypt() and decrypt().
  DEK_LEN_BYTES = 4

  def __init__(self, key_template: tink_pb2.KeyTemplate, remote: _aead.Aead):
    """Creates an envelope AEAD.

    Args:
      key_template: Template from which a fresh DEK is generated on every
        call to encrypt(). Its type URL must be a supported DEK key type.
      remote: AEAD used to encrypt and decrypt the DEK.

    Raises:
      core.TinkError: If the type URL of `key_template` is not supported.
        Errors raised by core.Registry.new_key_data for the template are
        propagated as well.
    """
    if not is_supported_dek_key_type(key_template.type_url):
      raise core.TinkError(
          'Unsupported DEK key type: %s' % key_template.type_url
      )
    # Create a dek to make sure that it works, so that KmsEnvelopeAead already
    # fails when it is created, and not just when it is used.
    # The C++ implementation does the same check, and we want this
    # implementation to be consistent with the C++ implementation.
    _ = core.Registry.new_key_data(key_template)

    self.key_template = key_template
    self.remote_aead = remote

  def encrypt(self, plaintext: bytes, associated_data: bytes) -> bytes:
    """Encrypts `plaintext` with a fresh DEK and attaches the wrapped DEK.

    Args:
      plaintext: Data to encrypt.
      associated_data: Associated data passed to the DEK AEAD.

    Returns:
      The 4-byte big-endian length of the encrypted DEK, followed by the
      encrypted DEK, followed by the ciphertext produced by the DEK AEAD.
    """
    # Get new key from template
    dek = core.Registry.new_key_data(self.key_template)
    dek_aead = core.Registry.primitive(dek, _aead.Aead)

    # Encrypt plaintext
    ciphertext = dek_aead.encrypt(plaintext, associated_data)

    # Wrap DEK key values with remote. The DEK is wrapped with empty
    # associated data; decrypt() has to use the same value.
    encrypted_dek = self.remote_aead.encrypt(dek.value, b'')

    # Construct ciphertext, DEK length encoded as big endian
    enc_dek_len = struct.pack('>I', len(encrypted_dek))
    return enc_dek_len + encrypted_dek + ciphertext

  def decrypt(self, ciphertext: bytes, associated_data: bytes) -> bytes:
    """Decrypts a ciphertext produced by encrypt().

    Args:
      ciphertext: Length prefix, encrypted DEK and AEAD payload, in the
        layout described in the class docstring.
      associated_data: Associated data passed to the DEK AEAD.

    Returns:
      The plaintext returned by the DEK AEAD for the payload.

    Raises:
      core.TinkError: If the ciphertext is shorter than the length prefix or
        the encoded DEK length exceeds the remaining ciphertext. Errors
        raised by the remote AEAD, the registry or the DEK AEAD are
        propagated.
    """
    ct_len = len(ciphertext)

    # Recover DEK length
    if ct_len < self.DEK_LEN_BYTES:
      raise core.TinkError(
          'Invalid ciphertext: too short to contain the encrypted DEK length'
      )

    # '>I' decodes an unsigned integer, so dek_len can never be negative and
    # only the upper bound has to be checked.
    dek_len = struct.unpack('>I', ciphertext[0:self.DEK_LEN_BYTES])[0]
    dek_end = self.DEK_LEN_BYTES + dek_len

    # Basic check if DEK length can be valid.
    if dek_end > ct_len:
      raise core.TinkError(
          'Invalid ciphertext: encrypted DEK length exceeds ciphertext size'
      )

    # Decrypt DEK with remote AEAD
    encrypted_dek_bytes = ciphertext[self.DEK_LEN_BYTES:dek_end]
    dek_bytes = self.remote_aead.decrypt(encrypted_dek_bytes, b'')

    # Get AEAD primitive based on DEK. The key type is taken from the
    # configured template; it is not stored in the ciphertext.
    dek = tink_pb2.KeyData(
        type_url=self.key_template.type_url,
        value=dek_bytes,
        key_material_type=tink_pb2.KeyData.SYMMETRIC,
    )
    dek_aead = core.Registry.primitive(dek, _aead.Aead)

    # Extract ciphertext payload and decrypt
    ct_bytes = ciphertext[dek_end:]

    return dek_aead.decrypt(ct_bytes, associated_data)
121