1# Copyright 2020 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Python wrapper of the wrapped C++ Streaming AEAD key manager.""" 15 16import io 17from typing import BinaryIO 18 19from tink import core 20from tink.cc.pybind import tink_bindings 21from tink.streaming_aead import _decrypting_stream 22from tink.streaming_aead import _encrypting_stream 23from tink.streaming_aead import _raw_streaming_aead 24from tink.streaming_aead import _streaming_aead_wrapper 25 26 27class _StreamingAeadCcToPyWrapper(_raw_streaming_aead.RawStreamingAead): 28 """Transforms C++ StreamingAead into a RawStreamingAead Python primitive.""" 29 30 def __init__(self, cc_streaming_aead: tink_bindings.StreamingAead): 31 self._cc_streaming_aead = cc_streaming_aead 32 33 def new_raw_encrypting_stream(self, ciphertext_destination: BinaryIO, 34 associated_data: bytes) -> io.RawIOBase: 35 return _encrypting_stream.RawEncryptingStream(self._cc_streaming_aead, 36 ciphertext_destination, 37 associated_data) 38 39 def new_raw_decrypting_stream( 40 self, 41 ciphertext_source: BinaryIO, 42 associated_data: bytes, 43 close_ciphertext_source: bool) -> io.RawIOBase: 44 return _decrypting_stream.RawDecryptingStream( 45 self._cc_streaming_aead, 46 ciphertext_source, 47 associated_data, 48 close_ciphertext_source=close_ciphertext_source) 49 50 51def from_cc_registry( 52 type_url: str) -> core.KeyManager[_raw_streaming_aead.RawStreamingAead]: 53 return core.KeyManagerCcToPyWrapper( 54 tink_bindings.StreamingAeadKeyManager.from_cc_registry(type_url), 55 _raw_streaming_aead.RawStreamingAead, _StreamingAeadCcToPyWrapper) 56 57 58def register() -> None: 59 """Registers Streaming AEAD key managers and the wrapper in the Registry.""" 60 tink_bindings.register() 61 for ident in ( 62 'AesCtrHmacStreamingKey', 63 'AesGcmHkdfStreamingKey', 64 ): 65 type_url = 'type.googleapis.com/google.crypto.tink.{}'.format(ident) 66 key_manager = core.KeyManagerCcToPyWrapper( 67 tink_bindings.StreamingAeadKeyManager.from_cc_registry(type_url), 68 _raw_streaming_aead.RawStreamingAead, _StreamingAeadCcToPyWrapper) 69 core.Registry.register_key_manager(key_manager, new_key_allowed=True) 70 core.Registry.register_primitive_wrapper( 71 _streaming_aead_wrapper.StreamingAeadWrapper()) 72