# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Writes Keysets to file."""

import abc
from typing import BinaryIO, TextIO

from google.protobuf import json_format
from tink.proto import tink_pb2
from tink import core


class KeysetWriter(metaclass=abc.ABCMeta):
  """Knows how to write keysets to some storage system.

  Abstract interface: concrete writers must implement both `write` and
  `write_encrypted`.
  """

  @abc.abstractmethod
  def write(self, keyset: tink_pb2.Keyset) -> None:
    """Tries to write a tink_pb2.Keyset to some storage system.

    Args:
      keyset: The keyset message to write.
    """
    raise NotImplementedError()

  @abc.abstractmethod
  def write_encrypted(self, encrypted_keyset: tink_pb2.EncryptedKeyset) -> None:
    """Tries to write an tink_pb2.EncryptedKeyset to some storage system.

    Args:
      encrypted_keyset: The encrypted keyset message to write.
    """
    raise NotImplementedError()


class JsonKeysetWriter(KeysetWriter):
  """Writes keysets in proto JSON wire format to some storage system.

  Messages are converted with `json_format.MessageToJson` and written to the
  text stream supplied at construction time. The stream is flushed after
  every write but is never closed by this class.

  cf. https://developers.google.com/protocol-buffers/docs/encoding
  """

  def __init__(self, text_io_stream: TextIO):
    """Stores the text stream that subsequent writes go to.

    Args:
      text_io_stream: A writable text stream.
    """
    self._io_stream = text_io_stream

  def write(self, keyset: tink_pb2.Keyset) -> None:
    """Serializes `keyset` to JSON, writes it to the stream and flushes.

    The keyset is written as passed in; this method performs no encryption.

    Args:
      keyset: The keyset message to write.

    Raises:
      core.TinkError: If `keyset` is not a `tink_pb2.Keyset` instance.
    """
    # Reject wrong message types up front so nothing is written for them.
    if not isinstance(keyset, tink_pb2.Keyset):
      raise core.TinkError('invalid keyset.')
    json_keyset = json_format.MessageToJson(keyset)
    self._io_stream.write(json_keyset)
    self._io_stream.flush()

  def write_encrypted(self, encrypted_keyset: tink_pb2.EncryptedKeyset) -> None:
    """Serializes `encrypted_keyset` to JSON, writes it and flushes.

    Args:
      encrypted_keyset: The encrypted keyset message to write.

    Raises:
      core.TinkError: If `encrypted_keyset` is not a
        `tink_pb2.EncryptedKeyset` instance.
    """
    # Reject wrong message types up front so nothing is written for them.
    if not isinstance(encrypted_keyset, tink_pb2.EncryptedKeyset):
      raise core.TinkError('invalid encrypted keyset.')
    json_keyset = json_format.MessageToJson(encrypted_keyset)
    self._io_stream.write(json_keyset)
    self._io_stream.flush()


class BinaryKeysetWriter(KeysetWriter):
  """Writes keysets in proto binary wire format to some storage system.

  Messages are serialized with `SerializeToString` and written to the binary
  stream supplied at construction time. The stream is flushed after every
  write but is never closed by this class.

  cf. https://developers.google.com/protocol-buffers/docs/encoding
  """

  def __init__(self, binary_io_stream: BinaryIO):
    """Stores the binary stream that subsequent writes go to.

    Args:
      binary_io_stream: A writable binary stream.
    """
    self._io_stream = binary_io_stream

  def write(self, keyset: tink_pb2.Keyset) -> None:
    """Serializes `keyset` to bytes, writes it to the stream and flushes.

    The keyset is written as passed in; this method performs no encryption.

    Args:
      keyset: The keyset message to write.

    Raises:
      core.TinkError: If `keyset` is not a `tink_pb2.Keyset` instance.
    """
    # Reject wrong message types up front so nothing is written for them.
    if not isinstance(keyset, tink_pb2.Keyset):
      raise core.TinkError('invalid keyset.')
    self._io_stream.write(keyset.SerializeToString())
    self._io_stream.flush()

  def write_encrypted(self, encrypted_keyset: tink_pb2.EncryptedKeyset) -> None:
    """Serializes `encrypted_keyset` to bytes, writes it and flushes.

    Args:
      encrypted_keyset: The encrypted keyset message to write.

    Raises:
      core.TinkError: If `encrypted_keyset` is not a
        `tink_pb2.EncryptedKeyset` instance.
    """
    # Reject wrong message types up front so nothing is written for them.
    if not isinstance(encrypted_keyset, tink_pb2.EncryptedKeyset):
      raise core.TinkError('invalid encrypted keyset.')
    self._io_stream.write(encrypted_keyset.SerializeToString())
    self._io_stream.flush()