1# Copyright 2019 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Reads Keysets from file.""" 16 17import abc 18 19from google.protobuf import json_format 20from google.protobuf import message 21from tink.proto import tink_pb2 22from tink import core 23 24 25class KeysetReader(metaclass=abc.ABCMeta): 26 """Reads a Keyset.""" 27 28 @abc.abstractmethod 29 def read(self) -> tink_pb2.Keyset: 30 """Reads and returns a (cleartext) tink_pb2.Keyset from its source.""" 31 raise NotImplementedError() 32 33 @abc.abstractmethod 34 def read_encrypted(self) -> tink_pb2.EncryptedKeyset: 35 """Reads and returns an tink_pb2.EncryptedKeyset from its source.""" 36 raise NotImplementedError() 37 38 39class JsonKeysetReader(KeysetReader): 40 """Reads a JSON Keyset.""" 41 42 def __init__(self, serialized_keyset: str): 43 self._serialized_keyset = serialized_keyset 44 45 def read(self) -> tink_pb2.Keyset: 46 try: 47 return json_format.Parse(self._serialized_keyset, tink_pb2.Keyset()) 48 except json_format.ParseError as e: 49 raise core.TinkError(e) 50 51 def read_encrypted(self) -> tink_pb2.EncryptedKeyset: 52 try: 53 return json_format.Parse(self._serialized_keyset, 54 tink_pb2.EncryptedKeyset()) 55 except json_format.ParseError as e: 56 raise core.TinkError(e) 57 58 59class BinaryKeysetReader(KeysetReader): 60 """Reads a binary Keyset.""" 61 62 def __init__(self, serialized_keyset: bytes): 63 self._serialized_keyset = serialized_keyset 64 65 def read(self) -> tink_pb2.Keyset: 66 if not self._serialized_keyset: 67 raise core.TinkError('No keyset found') 68 try: 69 return tink_pb2.Keyset.FromString(self._serialized_keyset) 70 except message.DecodeError as e: 71 raise core.TinkError(e) 72 73 def read_encrypted(self) -> tink_pb2.EncryptedKeyset: 74 if not self._serialized_keyset: 75 raise core.TinkError('No keyset found') 76 try: 77 return tink_pb2.EncryptedKeyset.FromString(self._serialized_keyset) 78 except message.DecodeError as e: 79 raise core.TinkError(e) 80