# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for envelope encryption with KMS."""

import struct

from tink.proto import tink_pb2
from tink import core
from tink.aead import _aead

# Type URLs of the AEAD key types that are accepted as DEK key templates.
_SUPPORTED_DEK_KEY_TYPES = frozenset({
    'type.googleapis.com/google.crypto.tink.AesGcmKey',
    'type.googleapis.com/google.crypto.tink.XChaCha20Poly1305Key',
    'type.googleapis.com/google.crypto.tink.AesCtrHmacAeadKey',
    'type.googleapis.com/google.crypto.tink.AesEaxKey',
    'type.googleapis.com/google.crypto.tink.AesGcmSivKey',
})


def is_supported_dek_key_type(type_url: str) -> bool:
  """Returns True if `type_url` is one of the supported DEK key types."""
  return type_url in _SUPPORTED_DEK_KEY_TYPES


class KmsEnvelopeAead(_aead.Aead):
  """Implements envelope encryption.

  Envelope encryption generates a data encryption key (DEK) which is used
  to encrypt the payload. The DEK is then sent to a KMS to be encrypted and
  the encrypted DEK is attached to the ciphertext. In order to decrypt the
  ciphertext, the DEK first has to be decrypted by the KMS, and then the DEK
  can be used to decrypt the ciphertext. For further information see
  https://cloud.google.com/kms/docs/envelope-encryption.

  DEK key template must be a KeyTemplate for any of these Tink AEAD key types
  (any other key template will be rejected):
    * AesGcmKey
    * XChaCha20Poly1305
    * AesCtrHmacAeadKey
    * AesEaxKey
    * AesGcmSivKey

  The ciphertext structure is as follows:
    * Length of the encrypted DEK: 4 bytes (big endian)
    * Encrypted DEK: variable length, specified by the previous 4 bytes
    * AEAD payload: variable length
  """

  # Defines in how many bytes the DEK length will be encoded.
  DEK_LEN_BYTES = 4

  # struct format of the DEK length prefix: big-endian unsigned 32-bit integer.
  # Its packed size equals DEK_LEN_BYTES.
  _DEK_LEN_FORMAT = '>I'

  def __init__(
      self, key_template: tink_pb2.KeyTemplate, remote: _aead.Aead
  ) -> None:
    """Creates an envelope AEAD.

    Args:
      key_template: Template used to generate a fresh DEK on every encrypt
        call. Its type URL must be one of the supported DEK key types.
      remote: AEAD used to encrypt and decrypt the DEK.

    Raises:
      core.TinkError: If the type URL of `key_template` is not supported.
    """
    if not is_supported_dek_key_type(key_template.type_url):
      raise core.TinkError(
          'Unsupported DEK key type: %s' % key_template.type_url
      )
    # Create a dek to make sure that it works, so that KmsEnvelopeAead already
    # fails when it is created, and not just when it is used.
    # The C++ implementation does the same check, and we want this
    # implementation to be consistent with the C++ implementation.
    _ = core.Registry.new_key_data(key_template)

    self.key_template = key_template
    self.remote_aead = remote

  def encrypt(self, plaintext: bytes, associated_data: bytes) -> bytes:
    """Encrypts `plaintext` with a newly generated DEK.

    Args:
      plaintext: Data to encrypt.
      associated_data: Associated data passed to the DEK AEAD.

    Returns:
      The 4-byte big-endian length of the encrypted DEK, followed by the
      encrypted DEK, followed by the payload ciphertext.
    """
    # Get new key from template
    dek = core.Registry.new_key_data(self.key_template)
    dek_aead = core.Registry.primitive(dek, _aead.Aead)

    # Encrypt plaintext
    ciphertext = dek_aead.encrypt(plaintext, associated_data)

    # Wrap DEK key values with remote. The DEK itself is encrypted with empty
    # associated data.
    encrypted_dek = self.remote_aead.encrypt(dek.value, b'')

    # Construct ciphertext, DEK length encoded as big endian
    enc_dek_len = struct.pack(self._DEK_LEN_FORMAT, len(encrypted_dek))
    return enc_dek_len + encrypted_dek + ciphertext

  def decrypt(self, ciphertext: bytes, associated_data: bytes) -> bytes:
    """Decrypts a ciphertext produced by `encrypt`.

    Args:
      ciphertext: Length-prefixed encrypted DEK followed by the payload
        ciphertext.
      associated_data: Associated data passed to the DEK AEAD.

    Returns:
      The result of decrypting the payload with the recovered DEK.

    Raises:
      core.TinkError: If `ciphertext` is shorter than the length prefix, or if
        the encoded DEK length exceeds the bytes that follow the prefix.
    """
    ct_len = len(ciphertext)

    # Recover DEK length
    if ct_len < self.DEK_LEN_BYTES:
      raise core.TinkError('Ciphertext is too short to contain a DEK length')

    (dek_len,) = struct.unpack(
        self._DEK_LEN_FORMAT, ciphertext[:self.DEK_LEN_BYTES]
    )

    # Basic check if DEK length can be valid. The length is unpacked as an
    # unsigned integer, so it can never be negative; only the upper bound
    # needs to be checked.
    if dek_len > ct_len - self.DEK_LEN_BYTES:
      raise core.TinkError(
          'Encrypted DEK length exceeds the size of the ciphertext'
      )

    # Decrypt DEK with remote AEAD
    payload_start = self.DEK_LEN_BYTES + dek_len
    encrypted_dek_bytes = ciphertext[self.DEK_LEN_BYTES:payload_start]
    dek_bytes = self.remote_aead.decrypt(encrypted_dek_bytes, b'')

    # Get AEAD primitive based on DEK
    dek = tink_pb2.KeyData(
        type_url=self.key_template.type_url,
        value=dek_bytes,
        key_material_type=tink_pb2.KeyData.SYMMETRIC,
    )
    dek_aead = core.Registry.primitive(dek, _aead.Aead)

    # Extract ciphertext payload and decrypt
    ct_bytes = ciphertext[payload_start:]

    return dek_aead.decrypt(ct_bytes, associated_data)