1# Copyright 2021 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""A client for Fake KMS.""" 15 16import base64 17 18from typing import Optional 19 20import tink 21from tink import aead 22from tink import cleartext_keyset_handle 23from tink.aead import _kms_aead_key_manager 24 25 26FAKE_KMS_PREFIX = 'fake-kms://' 27 28 29class FakeKmsClient(_kms_aead_key_manager.KmsClient): 30 """A fake KMS client.""" 31 32 def __init__(self, key_uri: Optional[str] = None): 33 if not key_uri: 34 self._key_uri = None 35 elif key_uri.startswith(FAKE_KMS_PREFIX): 36 self._key_uri = key_uri 37 else: 38 raise tink.TinkError('invalid key URI') 39 40 def does_support(self, key_uri: str) -> bool: 41 if not key_uri.startswith(FAKE_KMS_PREFIX): 42 return False 43 if not self._key_uri: 44 return True 45 return key_uri == self._key_uri 46 47 def get_aead(self, key_uri: str) -> aead.Aead: 48 if not key_uri.startswith(FAKE_KMS_PREFIX): 49 raise tink.TinkError('invalid key URI') 50 key_id = key_uri[len(FAKE_KMS_PREFIX) :] 51 serialized_key = base64.urlsafe_b64decode(key_id.encode('utf-8') + b'===') 52 handle = cleartext_keyset_handle.read( 53 tink.BinaryKeysetReader(serialized_key) 54 ) 55 return handle.primitive(aead.Aead) 56 57 58def register_client(key_uri=None, credentials_path=None) -> None: 59 """Registers a fake KMS client.""" 60 _ = credentials_path 61 _kms_aead_key_manager.register_kms_client(FakeKmsClient(key_uri)) 62