xref: /aosp_15_r20/external/tink/python/tink/_keyset_writer.py (revision e7b1675dde1b92d52ec075b0a92829627f2c52a5)
1# Copyright 2019 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Writes Keysets to file."""
16
17import abc
18from typing import BinaryIO, TextIO
19
20from google.protobuf import json_format
21from tink.proto import tink_pb2
22from tink import core
23
24
class KeysetWriter(metaclass=abc.ABCMeta):
  """Abstract interface for objects that store keysets somewhere.

  Both write() and write_encrypted() are abstract, so a concrete subclass has
  to override each of them before it can be instantiated.
  """

  @abc.abstractmethod
  def write(self, keyset: tink_pb2.Keyset) -> None:
    """Attempts to store a tink_pb2.Keyset in the backing storage system.

    Args:
      keyset: The tink_pb2.Keyset message to store.

    Raises:
      NotImplementedError: Whenever this base implementation itself is called.
    """
    raise NotImplementedError

  @abc.abstractmethod
  def write_encrypted(self, encrypted_keyset: tink_pb2.EncryptedKeyset) -> None:
    """Attempts to store a tink_pb2.EncryptedKeyset in the backing storage.

    Args:
      encrypted_keyset: The tink_pb2.EncryptedKeyset message to store.

    Raises:
      NotImplementedError: Whenever this base implementation itself is called.
    """
    raise NotImplementedError
37
38
class JsonKeysetWriter(KeysetWriter):
  """Writes keysets to a text stream using the protobuf JSON representation.

  cf. https://developers.google.com/protocol-buffers/docs/proto3#json
  """

  def __init__(self, text_io_stream: TextIO):
    """Remembers the stream that will receive the JSON text.

    Args:
      text_io_stream: Text-mode stream; only its write() and flush() methods
        are used by this class.
    """
    self._io_stream = text_io_stream

  def _write_json(self, message) -> None:
    """Converts an already validated proto message to JSON and writes it out."""
    # Serialize before touching the stream so that a conversion failure
    # leaves the stream untouched.
    as_json = json_format.MessageToJson(message)
    self._io_stream.write(as_json)
    # Flush right away so the data leaves the stream's buffer immediately.
    self._io_stream.flush()

  def write(self, keyset: tink_pb2.Keyset) -> None:
    """Writes the keyset to the stream as JSON and flushes the stream.

    Args:
      keyset: The tink_pb2.Keyset message to serialize.

    Raises:
      core.TinkError: If keyset is not a tink_pb2.Keyset instance.
    """
    if isinstance(keyset, tink_pb2.Keyset):
      self._write_json(keyset)
      return
    raise core.TinkError('invalid keyset.')

  def write_encrypted(self, encrypted_keyset: tink_pb2.EncryptedKeyset) -> None:
    """Writes the encrypted keyset to the stream as JSON and flushes it.

    Args:
      encrypted_keyset: The tink_pb2.EncryptedKeyset message to serialize.

    Raises:
      core.TinkError: If encrypted_keyset is not a tink_pb2.EncryptedKeyset
        instance.
    """
    if isinstance(encrypted_keyset, tink_pb2.EncryptedKeyset):
      self._write_json(encrypted_keyset)
      return
    raise core.TinkError('invalid encrypted keyset.')
61
62
class BinaryKeysetWriter(KeysetWriter):
  """Writes keysets to a binary stream in the protobuf binary wire format.

  cf. https://developers.google.com/protocol-buffers/docs/encoding
  """

  def __init__(self, binary_io_stream: BinaryIO):
    """Remembers the stream that will receive the serialized bytes.

    Args:
      binary_io_stream: Binary-mode stream; only its write() and flush()
        methods are used by this class.
    """
    self._io_stream = binary_io_stream

  def _write_bytes(self, message) -> None:
    """Serializes an already validated proto message and writes the bytes."""
    # Fetch the stream first, then serialize, matching the evaluation order of
    # a direct `self._io_stream.write(message.SerializeToString())` call.
    sink = self._io_stream
    sink.write(message.SerializeToString())
    # Flush right away so the data leaves the stream's buffer immediately.
    self._io_stream.flush()

  def write(self, keyset: tink_pb2.Keyset) -> None:
    """Writes the serialized keyset to the stream and flushes the stream.

    Args:
      keyset: The tink_pb2.Keyset message to serialize.

    Raises:
      core.TinkError: If keyset is not a tink_pb2.Keyset instance.
    """
    if isinstance(keyset, tink_pb2.Keyset):
      self._write_bytes(keyset)
      return
    raise core.TinkError('invalid keyset.')

  def write_encrypted(self, encrypted_keyset: tink_pb2.EncryptedKeyset) -> None:
    """Writes the serialized encrypted keyset to the stream and flushes it.

    Args:
      encrypted_keyset: The tink_pb2.EncryptedKeyset message to serialize.

    Raises:
      core.TinkError: If encrypted_keyset is not a tink_pb2.EncryptedKeyset
        instance.
    """
    if isinstance(encrypted_keyset, tink_pb2.EncryptedKeyset):
      self._write_bytes(encrypted_keyset)
      return
    raise core.TinkError('invalid encrypted keyset.')
83