# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module defines KeysetHandle."""

import random

from typing import Type, TypeVar

from google.protobuf import message
from tink.proto import tink_pb2
from tink import _keyset_reader
from tink import _keyset_writer
from tink import aead
from tink import core
from tink import secret_key_access

P = TypeVar('P')

MAX_INT32 = 2147483647  # = 2^31 - 1


class PublicKeyAccess(core.KeyAccess):
  """KeyAccess subclass used as the type of PUBLIC_KEY_ACCESS_TOKEN."""
  pass


# KeyAccess token that gives access to public keys.
PUBLIC_KEY_ACCESS_TOKEN = PublicKeyAccess()


def has_secret_key_access(token: core.KeyAccess) -> bool:
  """Returns True iff token is a secret_key_access.SecretKeyAccess instance."""
  return isinstance(token, secret_key_access.SecretKeyAccess)


class KeysetHandle:
  """A KeysetHandle provides abstracted access to Keyset.

  KeysetHandle limits the exposure of actual protocol buffers that hold
  sensitive key material. This class allows reading and writing encrypted
  keysets.

  Instances are obtained through the classmethods (generate_new, read,
  read_with_associated_data, read_no_secret); calling the class directly
  raises core.TinkError.
  """

  def __new__(cls, *args, **kwargs):
    # Arguments are accepted (and ignored) so that any attempt at direct
    # construction, e.g. KeysetHandle(keyset), fails with the intended
    # TinkError instead of a TypeError about the argument count.
    raise core.TinkError('KeysetHandle cannot be instantiated directly.')

  def __init__(self, keyset: tink_pb2.Keyset):
    """Validates keyset and stores it; only reached through _create."""
    _validate_keyset(keyset)
    self._keyset = keyset

  @classmethod
  def generate_new(cls, key_template: tink_pb2.KeyTemplate) -> 'KeysetHandle':
    """Return a new KeysetHandle.

    It contains a single fresh key generated according to key_template.

    Args:
      key_template: A tink_pb2.KeyTemplate object.

    Returns:
      A new KeysetHandle.
    """
    keyset = tink_pb2.Keyset()
    key_data = core.Registry.new_key_data(key_template)
    key_id = _generate_unused_key_id(keyset)
    key = keyset.key.add()
    key.key_data.CopyFrom(key_data)
    key.status = tink_pb2.ENABLED
    key.key_id = key_id
    key.output_prefix_type = key_template.output_prefix_type
    # The only key in the keyset is also its primary key.
    keyset.primary_key_id = key_id
    return cls._create(keyset)

  @classmethod
  def read(cls, keyset_reader: _keyset_reader.KeysetReader,
           master_key_aead: aead.Aead) -> 'KeysetHandle':
    """Tries to create a KeysetHandle from an encrypted keyset.

    Equivalent to read_with_associated_data with empty associated data.

    Args:
      keyset_reader: A _keyset_reader.KeysetReader object.
      master_key_aead: The aead.Aead used to decrypt the keyset.

    Returns:
      A new KeysetHandle.
    """
    return cls.read_with_associated_data(keyset_reader, master_key_aead, b'')

  @classmethod
  def read_with_associated_data(cls, keyset_reader: _keyset_reader.KeysetReader,
                                master_key_aead: aead.Aead,
                                associated_data: bytes) -> 'KeysetHandle':
    """Tries to create a KeysetHandle from an encrypted keyset.

    The keyset is decrypted using the provided associated data.

    Args:
      keyset_reader: A _keyset_reader.KeysetReader object.
      master_key_aead: The aead.Aead used to decrypt the keyset.
      associated_data: Associated data passed to master_key_aead.decrypt.

    Returns:
      A new KeysetHandle.

    Raises:
      core.TinkError: if the encrypted keyset is empty, the decrypted bytes
        cannot be parsed as a Keyset, or the resulting keyset is empty or
        invalid.
    """
    encrypted_keyset = keyset_reader.read_encrypted()
    _assert_enough_encrypted_key_material(encrypted_keyset)
    return cls._create(
        _decrypt(encrypted_keyset, master_key_aead, associated_data))

  @classmethod
  def read_no_secret(
      cls, keyset_reader: _keyset_reader.KeysetReader) -> 'KeysetHandle':
    """Creates a KeysetHandle from a keyset with no secret key material.

    This can be used to load public keysets or envelope encryption keysets.

    Args:
      keyset_reader: A _keyset_reader.KeysetReader object.

    Returns:
      A new KeysetHandle.

    Raises:
      core.TinkError: if the keyset contains secret key material or is
        invalid.
    """
    keyset = keyset_reader.read()
    _assert_no_secret_key_material(keyset)
    return cls._create(keyset)

  @classmethod
  def _create(cls, keyset: tink_pb2.Keyset) -> 'KeysetHandle':
    """Builds an instance from keyset, bypassing the raising __new__."""
    o = object.__new__(cls)
    o.__init__(keyset)
    return o

  def keyset_info(self) -> tink_pb2.KeysetInfo:
    """Returns the KeysetInfo that doesn't contain actual key material."""
    return _keyset_info(self._keyset)

  def write(self, keyset_writer: _keyset_writer.KeysetWriter,
            master_key_primitive: aead.Aead) -> None:
    """Serializes, encrypts with master_key_primitive and writes the keyset.

    Equivalent to write_with_associated_data with empty associated data.

    Args:
      keyset_writer: A _keyset_writer.KeysetWriter object.
      master_key_primitive: The aead.Aead used to encrypt the keyset.
    """
    self.write_with_associated_data(keyset_writer, master_key_primitive, b'')

  def write_with_associated_data(self,
                                 keyset_writer: _keyset_writer.KeysetWriter,
                                 master_key_primitive: aead.Aead,
                                 associated_data: bytes) -> None:
    """Serializes, encrypts with master_key_primitive and writes the keyset.

    Args:
      keyset_writer: A _keyset_writer.KeysetWriter object.
      master_key_primitive: The aead.Aead used to encrypt the keyset.
      associated_data: Associated data passed to master_key_primitive.
    """
    encrypted_keyset = _encrypt(self._keyset, master_key_primitive,
                                associated_data)
    keyset_writer.write_encrypted(encrypted_keyset)

  def write_no_secret(self, keyset_writer: _keyset_writer.KeysetWriter) -> None:
    """Writes the underlying keyset to keyset_writer.

    Writes the underlying keyset to keyset_writer only if the keyset does not
    contain any secret key material.
    This can be used to persist public keysets or envelope encryption keysets.
    Users that need to persist keysets with secret material can use
    cleartext_keyset_handle.

    Args:
      keyset_writer: A KeysetWriter object.

    Raises:
      core.TinkError: if the keyset contains secret key material.
    """
    _assert_no_secret_key_material(self._keyset)
    keyset_writer.write(self._keyset)

  def public_keyset_handle(self) -> 'KeysetHandle':
    """Returns a new KeysetHandle for the corresponding public keys."""
    public_keyset = tink_pb2.Keyset()
    for key in self._keyset.key:
      public_key = public_keyset.key.add()
      # Copy id, status and prefix type, then swap in the public key data.
      public_key.CopyFrom(key)
      public_key.key_data.CopyFrom(core.Registry.public_key_data(key.key_data))
      _validate_key(public_key)
    public_keyset.primary_key_id = self._keyset.primary_key_id
    return self._create(public_keyset)

  def primitive(self, primitive_class: Type[P]) -> P:
    """Returns a wrapped primitive from this KeysetHandle.

    Uses the KeyManager and the PrimitiveWrapper objects in the global
    core.Registry to create the primitive. This function is the most common
    way of creating a primitive.

    Args:
      primitive_class: The class of the primitive.

    Returns:
      The primitive.

    Raises:
      tink.TinkError if creation of the primitive fails, for example if
      primitive_class cannot be used with this KeysetHandle.
    """
    _validate_keyset(self._keyset)
    input_primitive_class = core.Registry.input_primitive_class(primitive_class)
    pset = core.PrimitiveSet(input_primitive_class)
    for key in self._keyset.key:
      # Only enabled keys contribute a primitive to the set.
      if key.status == tink_pb2.ENABLED:
        primitive = core.Registry.primitive(key.key_data, input_primitive_class)
        entry = pset.add_primitive(primitive, key)
        if key.key_id == self._keyset.primary_key_id:
          pset.set_primary(entry)
    return core.Registry.wrap(pset, primitive_class)


def new_keyset_handle(key_template: tink_pb2.KeyTemplate) -> KeysetHandle:
  """Module-level wrapper around KeysetHandle.generate_new."""
  return KeysetHandle.generate_new(key_template)


def read_keyset_handle(keyset_reader: _keyset_reader.KeysetReader,
                       master_key_aead: aead.Aead) -> KeysetHandle:
  """Module-level wrapper around KeysetHandle.read."""
  return KeysetHandle.read(keyset_reader, master_key_aead)


def read_keyset_handle_with_associated_data(
    keyset_reader: _keyset_reader.KeysetReader, master_key_aead: aead.Aead,
    associated_data: bytes) -> KeysetHandle:
  """Module-level wrapper around KeysetHandle.read_with_associated_data."""
  return KeysetHandle.read_with_associated_data(keyset_reader, master_key_aead,
                                                associated_data)


def read_no_secret_keyset_handle(
    keyset_reader: _keyset_reader.KeysetReader) -> KeysetHandle:
  """Module-level wrapper around KeysetHandle.read_no_secret."""
  return KeysetHandle.read_no_secret(keyset_reader)


def _keyset_info(keyset: tink_pb2.Keyset) -> tink_pb2.KeysetInfo:
  """Builds a KeysetInfo from keyset, copying metadata but no key material."""
  keyset_info = tink_pb2.KeysetInfo(primary_key_id=keyset.primary_key_id)
  for key in keyset.key:
    key_info = keyset_info.key_info.add()
    key_info.type_url = key.key_data.type_url
    key_info.status = key.status
    key_info.output_prefix_type = key.output_prefix_type
    key_info.key_id = key.key_id
  return keyset_info


def _encrypt(keyset: tink_pb2.Keyset, master_key_primitive: aead.Aead,
             associated_data: bytes) -> tink_pb2.EncryptedKeyset:
  """Encrypts a Keyset and returns an EncryptedKeyset.

  Args:
    keyset: The keyset to encrypt.
    master_key_primitive: The aead.Aead used for encryption.
    associated_data: Associated data passed to the Aead.

  Returns:
    An EncryptedKeyset holding the ciphertext and the keyset's KeysetInfo.

  Raises:
    core.TinkError: if the ciphertext does not decrypt back to the same
      keyset.
  """
  encrypted_keyset = master_key_primitive.encrypt(keyset.SerializeToString(),
                                                  associated_data)
  # Check if we can decrypt, to detect errors.
  try:
    keyset2 = tink_pb2.Keyset.FromString(
        master_key_primitive.decrypt(encrypted_keyset, associated_data))
  except message.DecodeError as e:
    raise core.TinkError('invalid keyset, corrupted key material') from e
  if keyset != keyset2:
    # Deliberately do not format the keysets into the message: they hold
    # key material, and exception text routinely ends up in logs.
    raise core.TinkError(
        'cannot encrypt keyset: decrypted keyset does not match the original')
  return tink_pb2.EncryptedKeyset(
      encrypted_keyset=encrypted_keyset, keyset_info=_keyset_info(keyset))


def _decrypt(encrypted_keyset: tink_pb2.EncryptedKeyset,
             master_key_aead: aead.Aead,
             associated_data: bytes) -> tink_pb2.Keyset:
  """Decrypts an EncryptedKeyset and returns a Keyset.

  Args:
    encrypted_keyset: The EncryptedKeyset whose ciphertext is decrypted.
    master_key_aead: The aead.Aead used for decryption.
    associated_data: Associated data passed to the Aead.

  Returns:
    The decrypted Keyset.

  Raises:
    core.TinkError: if the plaintext cannot be parsed as a Keyset, or the
      parsed keyset contains no keys.
  """
  try:
    keyset = tink_pb2.Keyset.FromString(
        master_key_aead.decrypt(encrypted_keyset.encrypted_keyset,
                                associated_data))
  except message.DecodeError as e:
    raise core.TinkError('invalid keyset, corrupted key material') from e
  # The ciphertext was non-empty, but the plaintext may still parse to a
  # keyset without any keys; reject that as well.
  _assert_enough_key_material(keyset)
  return keyset


def _validate_keyset(keyset: tink_pb2.Keyset):
  """Raises tink_error.TinkError if keyset is not valid.

  A keyset is rejected when any non-destroyed key is invalid, when it has no
  non-destroyed keys, when more than one enabled key carries the primary key
  id, or when it has no enabled primary key while containing key material
  that is not ASYMMETRIC_PUBLIC.

  Args:
    keyset: The keyset to validate.
  """
  for key in keyset.key:
    if key.status != tink_pb2.DESTROYED:
      _validate_key(key)
  num_non_destroyed_keys = sum(
      1 for key in keyset.key if key.status != tink_pb2.DESTROYED)
  num_non_public_key_material = sum(
      1 for key in keyset.key
      if key.key_data.key_material_type != tink_pb2.KeyData.ASYMMETRIC_PUBLIC)
  num_primary_keys = sum(
      1 for key in keyset.key
      if key.status == tink_pb2.ENABLED and key.key_id == keyset.primary_key_id)
  if num_non_destroyed_keys == 0:
    raise core.TinkError('empty keyset')
  if num_primary_keys > 1:
    raise core.TinkError('keyset contains multiple primary keys')
  # A keyset made only of public key material may lack a primary key.
  if num_primary_keys == 0 and num_non_public_key_material > 0:
    raise core.TinkError('keyset does not contain a valid primary key')


def _validate_key(key: tink_pb2.Keyset.Key):
  """Raises tink_error.TinkError if key is not valid.

  A key is rejected when it has no key_data, an UNKNOWN_PREFIX output prefix
  type, or an UNKNOWN_STATUS status.

  Args:
    key: The key to validate.
  """
  if not key.HasField('key_data'):
    raise core.TinkError('key {} has no key data'.format(key.key_id))
  if key.output_prefix_type == tink_pb2.UNKNOWN_PREFIX:
    raise core.TinkError('key {} has unknown prefix'.format(key.key_id))
  if key.status == tink_pb2.UNKNOWN_STATUS:
    raise core.TinkError('key {} has unknown status'.format(key.key_id))


def _assert_no_secret_key_material(keyset: tink_pb2.Keyset):
  """Raises core.TinkError if any key may hold secret key material.

  Keys of type UNKNOWN_KEYMATERIAL, SYMMETRIC and ASYMMETRIC_PRIVATE are
  treated as secret.

  Args:
    keyset: The keyset to check.
  """
  for key in keyset.key:
    if key.key_data.key_material_type in (tink_pb2.KeyData.UNKNOWN_KEYMATERIAL,
                                          tink_pb2.KeyData.SYMMETRIC,
                                          tink_pb2.KeyData.ASYMMETRIC_PRIVATE):
      raise core.TinkError('keyset contains secret key material')


def _assert_enough_key_material(keyset: tink_pb2.Keyset):
  """Raises core.TinkError if keyset is falsy or contains no keys."""
  if not keyset or not keyset.key:
    raise core.TinkError('empty keyset')


def _assert_enough_encrypted_key_material(
    encrypted_keyset: tink_pb2.EncryptedKeyset):
  """Raises core.TinkError if encrypted_keyset is falsy or has no ciphertext."""
  if not encrypted_keyset or not encrypted_keyset.encrypted_keyset:
    raise core.TinkError('empty keyset')


def _generate_unused_key_id(keyset: tink_pb2.Keyset) -> int:
  """Returns a random id in [1, MAX_INT32] not used by any key in keyset."""
  # The keyset is not modified while searching, so collect the ids once
  # instead of rebuilding the set on every attempt.
  used_key_ids = {key.key_id for key in keyset.key}
  while True:
    key_id = random.randint(1, MAX_INT32)
    if key_id not in used_key_ids:
      return key_id