xref: /aosp_15_r20/external/tink/testing/python/services.py (revision e7b1675dde1b92d52ec075b0a92829627f2c52a5)
1# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS-IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Testing service API implementations in Python."""
15
16import io
17
18import grpc
19import tink
20from tink import aead
21from tink import cleartext_keyset_handle
22from tink import daead
23from tink import hybrid
24from tink import jwt
25from tink import mac
26from tink import prf
27from tink import signature
28from tink import streaming_aead
29from tink.proto import tink_pb2
30from tink.testing import bytes_io
31from protos import testing_api_pb2
32from protos import testing_api_pb2_grpc
33
34
35# All KeyTemplate (as Protobuf) defined in the Python API.
36_KEY_TEMPLATE = {
37    'AES128_EAX':
38        aead.aead_key_templates.AES128_EAX,
39    'AES128_EAX_RAW':
40        aead.aead_key_templates.AES128_EAX_RAW,
41    'AES256_EAX':
42        aead.aead_key_templates.AES256_EAX,
43    'AES256_EAX_RAW':
44        aead.aead_key_templates.AES256_EAX_RAW,
45    'AES128_GCM':
46        aead.aead_key_templates.AES128_GCM,
47    'AES128_GCM_RAW':
48        aead.aead_key_templates.AES128_GCM_RAW,
49    'AES256_GCM':
50        aead.aead_key_templates.AES256_GCM,
51    'AES256_GCM_RAW':
52        aead.aead_key_templates.AES256_GCM_RAW,
53    'AES128_GCM_SIV':
54        aead.aead_key_templates.AES128_GCM_SIV,
55    'AES128_GCM_SIV_RAW':
56        aead.aead_key_templates.AES128_GCM_SIV_RAW,
57    'AES256_GCM_SIV':
58        aead.aead_key_templates.AES256_GCM_SIV,
59    'AES256_GCM_SIV_RAW':
60        aead.aead_key_templates.AES256_GCM_SIV_RAW,
61    'AES128_CTR_HMAC_SHA256':
62        aead.aead_key_templates.AES128_CTR_HMAC_SHA256,
63    'AES128_CTR_HMAC_SHA256_RAW':
64        aead.aead_key_templates.AES128_CTR_HMAC_SHA256_RAW,
65    'AES256_CTR_HMAC_SHA256':
66        aead.aead_key_templates.AES256_CTR_HMAC_SHA256,
67    'AES256_CTR_HMAC_SHA256_RAW':
68        aead.aead_key_templates.AES256_CTR_HMAC_SHA256_RAW,
69    'XCHACHA20_POLY1305':
70        aead.aead_key_templates.XCHACHA20_POLY1305,
71    'XCHACHA20_POLY1305_RAW':
72        aead.aead_key_templates.XCHACHA20_POLY1305_RAW,
73    'AES256_SIV':
74        daead.deterministic_aead_key_templates.AES256_SIV,
75    'AES128_CTR_HMAC_SHA256_4KB':
76        streaming_aead.streaming_aead_key_templates.AES128_CTR_HMAC_SHA256_4KB,
77    'AES128_CTR_HMAC_SHA256_1MB':
78        streaming_aead.streaming_aead_key_templates.AES128_CTR_HMAC_SHA256_1MB,
79    'AES256_CTR_HMAC_SHA256_4KB':
80        streaming_aead.streaming_aead_key_templates.AES256_CTR_HMAC_SHA256_4KB,
81    'AES256_CTR_HMAC_SHA256_1MB':
82        streaming_aead.streaming_aead_key_templates.AES256_CTR_HMAC_SHA256_1MB,
83    'AES128_GCM_HKDF_4KB':
84        streaming_aead.streaming_aead_key_templates.AES128_GCM_HKDF_4KB,
85    'AES128_GCM_HKDF_1MB':
86        streaming_aead.streaming_aead_key_templates.AES128_GCM_HKDF_1MB,
87    'AES256_GCM_HKDF_4KB':
88        streaming_aead.streaming_aead_key_templates.AES256_GCM_HKDF_4KB,
89    'AES256_GCM_HKDF_1MB':
90        streaming_aead.streaming_aead_key_templates.AES256_GCM_HKDF_1MB,
91    'ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM':
92        hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM,
93    'ECIES_P256_COMPRESSED_HKDF_HMAC_SHA256_AES128_GCM':
94        hybrid.hybrid_key_templates
95        .ECIES_P256_COMPRESSED_HKDF_HMAC_SHA256_AES128_GCM,
96    'ECIES_P256_HKDF_HMAC_SHA256_AES128_CTR_HMAC_SHA256':
97        hybrid.hybrid_key_templates
98        .ECIES_P256_HKDF_HMAC_SHA256_AES128_CTR_HMAC_SHA256,
99    'ECIES_P256_COMPRESSED_HKDF_HMAC_SHA256_AES128_CTR_HMAC_SHA256':
100        hybrid.hybrid_key_templates
101        .ECIES_P256_COMPRESSED_HKDF_HMAC_SHA256_AES128_CTR_HMAC_SHA256,
102    'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_128_GCM':
103        hybrid.hybrid_key_templates
104        .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_128_GCM,
105    'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_128_GCM_RAW':
106        hybrid.hybrid_key_templates
107        .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_128_GCM_RAW,
108    'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_256_GCM':
109        hybrid.hybrid_key_templates
110        .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_256_GCM,
111    'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_256_GCM_RAW':
112        hybrid.hybrid_key_templates
113        .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_256_GCM_RAW,
114    'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_CHACHA20_POLY1305':
115        hybrid.hybrid_key_templates
116        .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_CHACHA20_POLY1305,
117    'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_CHACHA20_POLY1305_RAW':
118        hybrid.hybrid_key_templates
119        .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_CHACHA20_POLY1305_RAW,
120    'AES_CMAC':
121        mac.mac_key_templates.AES_CMAC,
122    'HMAC_SHA256_128BITTAG':
123        mac.mac_key_templates.HMAC_SHA256_128BITTAG,
124    'HMAC_SHA256_256BITTAG':
125        mac.mac_key_templates.HMAC_SHA256_256BITTAG,
126    'HMAC_SHA512_256BITTAG':
127        mac.mac_key_templates.HMAC_SHA512_256BITTAG,
128    'HMAC_SHA512_512BITTAG':
129        mac.mac_key_templates.HMAC_SHA512_512BITTAG,
130    'ECDSA_P256':
131        signature.signature_key_templates.ECDSA_P256,
132    'ECDSA_P256_RAW':
133        signature.signature_key_templates.ECDSA_P256_RAW,
134    'ECDSA_P384':
135        signature.signature_key_templates.ECDSA_P384,
136    'ECDSA_P384_SHA384':
137        signature.signature_key_templates.ECDSA_P384_SHA384,
138    'ECDSA_P384_SHA512':
139        signature.signature_key_templates.ECDSA_P384_SHA512,
140    'ECDSA_P521':
141        signature.signature_key_templates.ECDSA_P521,
142    'ECDSA_P256_IEEE_P1363':
143        signature.signature_key_templates.ECDSA_P256_IEEE_P1363,
144    'ECDSA_P384_IEEE_P1363':
145        signature.signature_key_templates.ECDSA_P384_IEEE_P1363,
146    'ECDSA_P384_SHA384_IEEE_P1363':
147        signature.signature_key_templates.ECDSA_P384_SHA384_IEEE_P1363,
148    'ECDSA_P521_IEEE_P1363':
149        signature.signature_key_templates.ECDSA_P521_IEEE_P1363,
150    'ED25519':
151        signature.signature_key_templates.ED25519,
152    'RSA_SSA_PKCS1_3072_SHA256_F4':
153        signature.signature_key_templates.RSA_SSA_PKCS1_3072_SHA256_F4,
154    'RSA_SSA_PKCS1_4096_SHA512_F4':
155        signature.signature_key_templates.RSA_SSA_PKCS1_4096_SHA512_F4,
156    'RSA_SSA_PSS_3072_SHA256_SHA256_32_F4':
157        signature.signature_key_templates.RSA_SSA_PSS_3072_SHA256_SHA256_32_F4,
158    'RSA_SSA_PSS_4096_SHA512_SHA512_64_F4':
159        signature.signature_key_templates.RSA_SSA_PSS_4096_SHA512_SHA512_64_F4,
160    'AES_CMAC_PRF':
161        prf.prf_key_templates.AES_CMAC,
162    'HMAC_SHA256_PRF':
163        prf.prf_key_templates.HMAC_SHA256,
164    'HMAC_SHA512_PRF':
165        prf.prf_key_templates.HMAC_SHA512,
166    'HKDF_SHA256':
167        prf.prf_key_templates.HKDF_SHA256,
168    'JWT_HS256':
169        jwt.jwt_hs256_template(),
170    'JWT_HS256_RAW':
171        jwt.raw_jwt_hs256_template(),
172    'JWT_HS384':
173        jwt.jwt_hs384_template(),
174    'JWT_HS384_RAW':
175        jwt.raw_jwt_hs384_template(),
176    'JWT_HS512':
177        jwt.jwt_hs512_template(),
178    'JWT_HS512_RAW':
179        jwt.raw_jwt_hs512_template(),
180    'JWT_ES256':
181        jwt.jwt_es256_template(),
182    'JWT_ES256_RAW':
183        jwt.raw_jwt_es256_template(),
184    'JWT_ES384':
185        jwt.jwt_es384_template(),
186    'JWT_ES384_RAW':
187        jwt.raw_jwt_es384_template(),
188    'JWT_ES512':
189        jwt.jwt_es512_template(),
190    'JWT_ES512_RAW':
191        jwt.raw_jwt_es512_template(),
192    'JWT_RS256_2048_F4':
193        jwt.jwt_rs256_2048_f4_template(),
194    'JWT_RS256_2048_F4_RAW':
195        jwt.raw_jwt_rs256_2048_f4_template(),
196    'JWT_RS256_3072_F4':
197        jwt.jwt_rs256_3072_f4_template(),
198    'JWT_RS256_3072_F4_RAW':
199        jwt.raw_jwt_rs256_3072_f4_template(),
200    'JWT_RS384_3072_F4':
201        jwt.jwt_rs384_3072_f4_template(),
202    'JWT_RS384_3072_F4_RAW':
203        jwt.raw_jwt_rs384_3072_f4_template(),
204    'JWT_RS512_4096_F4':
205        jwt.jwt_rs512_4096_f4_template(),
206    'JWT_RS512_4096_F4_RAW':
207        jwt.raw_jwt_rs512_4096_f4_template(),
208    'JWT_PS256_2048_F4':
209        jwt.jwt_ps256_2048_f4_template(),
210    'JWT_PS256_2048_F4_RAW':
211        jwt.raw_jwt_ps256_2048_f4_template(),
212    'JWT_PS256_3072_F4':
213        jwt.jwt_ps256_3072_f4_template(),
214    'JWT_PS256_3072_F4_RAW':
215        jwt.raw_jwt_ps256_3072_f4_template(),
216    'JWT_PS384_3072_F4':
217        jwt.jwt_ps384_3072_f4_template(),
218    'JWT_PS384_3072_F4_RAW':
219        jwt.raw_jwt_ps384_3072_f4_template(),
220    'JWT_PS512_4096_F4':
221        jwt.jwt_ps512_4096_f4_template(),
222    'JWT_PS512_4096_F4_RAW':
223        jwt.raw_jwt_ps512_4096_f4_template(),
224}
225
226
227class MetadataServicer(testing_api_pb2_grpc.MetadataServicer):
228  """A service with metadata about the server."""
229
230  def GetServerInfo(
231      self, request: testing_api_pb2.ServerInfoRequest,
232      context: grpc.ServicerContext) -> testing_api_pb2.ServerInfoResponse:
233    """Returns information about the server."""
234    return testing_api_pb2.ServerInfoResponse(language='python')
235
236
237class KeysetServicer(testing_api_pb2_grpc.KeysetServicer):
238  """A service for testing Keyset operations."""
239
240  def GetTemplate(
241      self, request: testing_api_pb2.KeysetTemplateRequest,
242      context: grpc.ServicerContext) -> testing_api_pb2.KeysetTemplateResponse:
243    """Returns the key template for the given template name."""
244    if request.template_name not in _KEY_TEMPLATE:
245      return testing_api_pb2.KeysetTemplateResponse(
246          err='template %s not found' % request.template_name)
247    return  testing_api_pb2.KeysetTemplateResponse(
248        key_template=_KEY_TEMPLATE[request.template_name].SerializeToString())
249
250  def Generate(
251      self, request: testing_api_pb2.KeysetGenerateRequest,
252      context: grpc.ServicerContext) -> testing_api_pb2.KeysetGenerateResponse:
253    """Generates a keyset."""
254    try:
255      template = tink_pb2.KeyTemplate()
256      template.ParseFromString(request.template)
257      keyset_handle = tink.new_keyset_handle(template)
258      keyset = io.BytesIO()
259      cleartext_keyset_handle.write(
260          tink.BinaryKeysetWriter(keyset), keyset_handle)
261      return testing_api_pb2.KeysetGenerateResponse(keyset=keyset.getvalue())
262    except tink.TinkError as e:
263      return testing_api_pb2.KeysetGenerateResponse(err=str(e))
264
265  def Public(
266      self, request: testing_api_pb2.KeysetPublicRequest,
267      context: grpc.ServicerContext) -> testing_api_pb2.KeysetPublicResponse:
268    """Generates a public-key keyset from a private-key keyset."""
269    try:
270      private_keyset_handle = cleartext_keyset_handle.read(
271          tink.BinaryKeysetReader(request.private_keyset))
272      public_keyset_handle = private_keyset_handle.public_keyset_handle()
273      public_keyset = io.BytesIO()
274      cleartext_keyset_handle.write(
275          tink.BinaryKeysetWriter(public_keyset), public_keyset_handle)
276      return testing_api_pb2.KeysetPublicResponse(
277          public_keyset=public_keyset.getvalue())
278    except tink.TinkError as e:
279      return testing_api_pb2.KeysetPublicResponse(err=str(e))
280
281  def ToJson(
282      self, request: testing_api_pb2.KeysetToJsonRequest,
283      context: grpc.ServicerContext) -> testing_api_pb2.KeysetToJsonResponse:
284    """Converts a keyset from binary to JSON format."""
285    try:
286      keyset_handle = cleartext_keyset_handle.read(
287          tink.BinaryKeysetReader(request.keyset))
288      json_keyset = io.StringIO()
289      cleartext_keyset_handle.write(
290          tink.JsonKeysetWriter(json_keyset), keyset_handle)
291      return testing_api_pb2.KeysetToJsonResponse(
292          json_keyset=json_keyset.getvalue())
293    except tink.TinkError as e:
294      return testing_api_pb2.KeysetToJsonResponse(err=str(e))
295
296  def FromJson(
297      self, request: testing_api_pb2.KeysetFromJsonRequest,
298      context: grpc.ServicerContext) -> testing_api_pb2.KeysetFromJsonResponse:
299    """Converts a keyset from JSON to binary format."""
300    try:
301      keyset_handle = cleartext_keyset_handle.read(
302          tink.JsonKeysetReader(request.json_keyset))
303      keyset = io.BytesIO()
304      cleartext_keyset_handle.write(
305          tink.BinaryKeysetWriter(keyset), keyset_handle)
306      return testing_api_pb2.KeysetFromJsonResponse(keyset=keyset.getvalue())
307    except tink.TinkError as e:
308      return testing_api_pb2.KeysetFromJsonResponse(err=str(e))
309
310  def ReadEncrypted(
311      self, request: testing_api_pb2.KeysetReadEncryptedRequest,
312      context: grpc.ServicerContext
313  ) -> testing_api_pb2.KeysetReadEncryptedResponse:
314    """Reads an encrypted keyset."""
315    try:
316      master_keyset_handle = cleartext_keyset_handle.read(
317          tink.BinaryKeysetReader(request.master_keyset))
318      master_aead = master_keyset_handle.primitive(aead.Aead)
319
320      if request.keyset_reader_type == testing_api_pb2.KEYSET_READER_BINARY:
321        reader = tink.BinaryKeysetReader(request.encrypted_keyset)
322      elif request.keyset_reader_type == testing_api_pb2.KEYSET_READER_JSON:
323        reader = tink.JsonKeysetReader(request.encrypted_keyset.decode('utf8'))
324      else:
325        raise ValueError('unknown keyset reader type')
326      if request.HasField('associated_data'):
327        keyset_handle = tink.read_keyset_handle_with_associated_data(
328            reader, master_aead, request.associated_data.value)
329      else:
330        keyset_handle = tink.read_keyset_handle(reader, master_aead)
331
332      keyset = io.BytesIO()
333      cleartext_keyset_handle.write(
334          tink.BinaryKeysetWriter(keyset), keyset_handle)
335      return testing_api_pb2.KeysetReadEncryptedResponse(
336          keyset=keyset.getvalue())
337    except tink.TinkError as e:
338      return testing_api_pb2.KeysetReadEncryptedResponse(err=str(e))
339
340  def WriteEncrypted(
341      self, request: testing_api_pb2.KeysetWriteEncryptedRequest,
342      context: grpc.ServicerContext
343  ) -> testing_api_pb2.KeysetWriteEncryptedResponse:
344    """Writes an encrypted keyset."""
345    try:
346      master_keyset_handle = cleartext_keyset_handle.read(
347          tink.BinaryKeysetReader(request.master_keyset))
348      keyset_handle = cleartext_keyset_handle.read(
349          tink.BinaryKeysetReader(request.keyset))
350      master_aead = master_keyset_handle.primitive(aead.Aead)
351
352      if request.keyset_writer_type == testing_api_pb2.KEYSET_WRITER_BINARY:
353        encrypted_keyset = io.BytesIO()
354        writer = tink.BinaryKeysetWriter(encrypted_keyset)
355        if request.HasField('associated_data'):
356          keyset_handle.write_with_associated_data(
357              writer, master_aead, request.associated_data.value)
358        else:
359          keyset_handle.write(writer, master_aead)
360        return testing_api_pb2.KeysetWriteEncryptedResponse(
361            encrypted_keyset=encrypted_keyset.getvalue())
362      elif request.keyset_writer_type == testing_api_pb2.KEYSET_WRITER_JSON:
363        encrypted_keyset = io.StringIO()
364        writer = tink.JsonKeysetWriter(encrypted_keyset)
365        if request.HasField('associated_data'):
366          keyset_handle.write_with_associated_data(
367              writer, master_aead, request.associated_data.value)
368        else:
369          keyset_handle.write(writer, master_aead)
370        return testing_api_pb2.KeysetWriteEncryptedResponse(
371            encrypted_keyset=encrypted_keyset.getvalue().encode('utf8'))
372      else:
373        raise ValueError('unknown keyset writer type')
374    except tink.TinkError as e:
375      return testing_api_pb2.KeysetWriteEncryptedResponse(err=str(e))
376
377
378class AeadServicer(testing_api_pb2_grpc.AeadServicer):
379  """A service for testing AEAD encryption."""
380
381  def Create(self, request: testing_api_pb2.CreationRequest,
382             context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
383    """Creates an AEAD without using it."""
384    try:
385      keyset_handle = cleartext_keyset_handle.read(
386          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
387      keyset_handle.primitive(aead.Aead)
388      return testing_api_pb2.CreationResponse()
389    except tink.TinkError as e:
390      return testing_api_pb2.CreationResponse(err=str(e))
391
392  def Encrypt(
393      self, request: testing_api_pb2.AeadEncryptRequest,
394      context: grpc.ServicerContext) -> testing_api_pb2.AeadEncryptResponse:
395    """Encrypts a message."""
396    keyset_handle = cleartext_keyset_handle.read(
397        tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
398    p = keyset_handle.primitive(aead.Aead)
399    try:
400      ciphertext = p.encrypt(request.plaintext, request.associated_data)
401      return testing_api_pb2.AeadEncryptResponse(ciphertext=ciphertext)
402    except tink.TinkError as e:
403      return testing_api_pb2.AeadEncryptResponse(err=str(e))
404
405  def Decrypt(
406      self, request: testing_api_pb2.AeadDecryptRequest,
407      context: grpc.ServicerContext) -> testing_api_pb2.AeadDecryptResponse:
408    """Decrypts a message."""
409    keyset_handle = cleartext_keyset_handle.read(
410        tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
411    p = keyset_handle.primitive(aead.Aead)
412    try:
413      plaintext = p.decrypt(request.ciphertext, request.associated_data)
414      return testing_api_pb2.AeadDecryptResponse(plaintext=plaintext)
415    except tink.TinkError as e:
416      return testing_api_pb2.AeadDecryptResponse(err=str(e))
417
418
419class StreamingAeadServicer(testing_api_pb2_grpc.StreamingAeadServicer):
420  """A service for testing StreamingAEAD encryption."""
421
422  def Create(self, request: testing_api_pb2.CreationRequest,
423             context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
424    """Creates a Streaming Aead without using it."""
425    try:
426      keyset_handle = cleartext_keyset_handle.read(
427          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
428      keyset_handle.primitive(streaming_aead.StreamingAead)
429      return testing_api_pb2.CreationResponse()
430    except tink.TinkError as e:
431      return testing_api_pb2.CreationResponse(err=str(e))
432
433  def Encrypt(
434      self, request: testing_api_pb2.StreamingAeadEncryptRequest,
435      context: grpc.ServicerContext
436  ) -> testing_api_pb2.StreamingAeadEncryptResponse:
437    """Encrypts a message."""
438    try:
439      keyset_handle = cleartext_keyset_handle.read(
440          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
441      p = keyset_handle.primitive(streaming_aead.StreamingAead)
442      ciphertext_destination = bytes_io.BytesIOWithValueAfterClose()
443      with p.new_encrypting_stream(ciphertext_destination,
444                                   request.associated_data) as plaintext_stream:
445        plaintext_stream.write(request.plaintext)
446      return testing_api_pb2.StreamingAeadEncryptResponse(
447          ciphertext=ciphertext_destination.value_after_close())
448    except tink.TinkError as e:
449      return testing_api_pb2.StreamingAeadEncryptResponse(err=str(e))
450
451  def Decrypt(
452      self, request: testing_api_pb2.StreamingAeadDecryptRequest,
453      context: grpc.ServicerContext
454  ) -> testing_api_pb2.StreamingAeadDecryptResponse:
455    """Decrypts a message."""
456    try:
457      keyset_handle = cleartext_keyset_handle.read(
458          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
459      p = keyset_handle.primitive(streaming_aead.StreamingAead)
460      stream = io.BytesIO(request.ciphertext)
461      with p.new_decrypting_stream(stream, request.associated_data) as s:
462        plaintext = s.read()
463      return testing_api_pb2.StreamingAeadDecryptResponse(plaintext=plaintext)
464    except tink.TinkError as e:
465      return testing_api_pb2.StreamingAeadDecryptResponse(err=str(e))
466
467
468class DeterministicAeadServicer(testing_api_pb2_grpc.DeterministicAeadServicer):
469  """A service for testing Deterministic AEAD encryption."""
470
471  def Create(self, request: testing_api_pb2.CreationRequest,
472             context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
473    """Creates a Deterministic AEAD without using it."""
474    try:
475      keyset_handle = cleartext_keyset_handle.read(
476          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
477      keyset_handle.primitive(daead.DeterministicAead)
478      return testing_api_pb2.CreationResponse()
479    except tink.TinkError as e:
480      return testing_api_pb2.CreationResponse(err=str(e))
481
482  def EncryptDeterministically(
483      self, request: testing_api_pb2.DeterministicAeadEncryptRequest,
484      context: grpc.ServicerContext
485  ) -> testing_api_pb2.DeterministicAeadEncryptResponse:
486    """Encrypts a message."""
487    keyset_handle = cleartext_keyset_handle.read(
488        tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
489    p = keyset_handle.primitive(daead.DeterministicAead)
490    try:
491      ciphertext = p.encrypt_deterministically(request.plaintext,
492                                               request.associated_data)
493      return testing_api_pb2.DeterministicAeadEncryptResponse(
494          ciphertext=ciphertext)
495    except tink.TinkError as e:
496      return testing_api_pb2.DeterministicAeadEncryptResponse(err=str(e))
497
498  def DecryptDeterministically(
499      self, request: testing_api_pb2.DeterministicAeadDecryptRequest,
500      context: grpc.ServicerContext
501  ) -> testing_api_pb2.DeterministicAeadDecryptResponse:
502    """Decrypts a message."""
503    keyset_handle = cleartext_keyset_handle.read(
504        tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
505    p = keyset_handle.primitive(daead.DeterministicAead)
506    try:
507      plaintext = p.decrypt_deterministically(request.ciphertext,
508                                              request.associated_data)
509      return testing_api_pb2.DeterministicAeadDecryptResponse(
510          plaintext=plaintext)
511    except tink.TinkError as e:
512      return testing_api_pb2.DeterministicAeadDecryptResponse(err=str(e))
513
514
515class MacServicer(testing_api_pb2_grpc.MacServicer):
516  """A service for testing MACs."""
517
518  def Create(self, request: testing_api_pb2.CreationRequest,
519             context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
520    """Creates a MAC without using it."""
521    try:
522      keyset_handle = cleartext_keyset_handle.read(
523          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
524      keyset_handle.primitive(mac.Mac)
525      return testing_api_pb2.CreationResponse()
526    except tink.TinkError as e:
527      return testing_api_pb2.CreationResponse(err=str(e))
528
529  def ComputeMac(
530      self, request: testing_api_pb2.ComputeMacRequest,
531      context: grpc.ServicerContext) -> testing_api_pb2.ComputeMacResponse:
532    """Computes a MAC."""
533    try:
534      keyset_handle = cleartext_keyset_handle.read(
535          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
536      p = keyset_handle.primitive(mac.Mac)
537      mac_value = p.compute_mac(request.data)
538      return testing_api_pb2.ComputeMacResponse(mac_value=mac_value)
539    except tink.TinkError as e:
540      return testing_api_pb2.ComputeMacResponse(err=str(e))
541
542  def VerifyMac(
543      self, request: testing_api_pb2.VerifyMacRequest,
544      context: grpc.ServicerContext) -> testing_api_pb2.VerifyMacResponse:
545    """Verifies a MAC value."""
546    try:
547      keyset_handle = cleartext_keyset_handle.read(
548          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
549      p = keyset_handle.primitive(mac.Mac)
550      p.verify_mac(request.mac_value, request.data)
551      return testing_api_pb2.VerifyMacResponse()
552    except tink.TinkError as e:
553      return testing_api_pb2.VerifyMacResponse(err=str(e))
554
555
556class HybridServicer(testing_api_pb2_grpc.HybridServicer):
557  """A service for testing hybrid encryption and decryption."""
558
559  def CreateHybridEncrypt(
560      self, request: testing_api_pb2.CreationRequest,
561      context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
562    """Creates a HybridEncrypt without using it."""
563    try:
564      keyset_handle = cleartext_keyset_handle.read(
565          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
566      keyset_handle.primitive(hybrid.HybridEncrypt)
567      return testing_api_pb2.CreationResponse()
568    except tink.TinkError as e:
569      return testing_api_pb2.CreationResponse(err=str(e))
570
571  def CreateHybridDecrypt(
572      self, request: testing_api_pb2.CreationRequest,
573      context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
574    """Creates a HybridDecrypt without using it."""
575    try:
576      keyset_handle = cleartext_keyset_handle.read(
577          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
578      keyset_handle.primitive(hybrid.HybridDecrypt)
579      return testing_api_pb2.CreationResponse()
580    except tink.TinkError as e:
581      return testing_api_pb2.CreationResponse(err=str(e))
582
583  def Encrypt(
584      self, request: testing_api_pb2.HybridEncryptRequest,
585      context: grpc.ServicerContext) -> testing_api_pb2.HybridEncryptResponse:
586    """Encrypts a message."""
587    try:
588      public_keyset_handle = cleartext_keyset_handle.read(
589          tink.BinaryKeysetReader(
590              request.public_annotated_keyset.serialized_keyset))
591      p = public_keyset_handle.primitive(hybrid.HybridEncrypt)
592      ciphertext = p.encrypt(request.plaintext, request.context_info)
593      return testing_api_pb2.HybridEncryptResponse(ciphertext=ciphertext)
594    except tink.TinkError as e:
595      return testing_api_pb2.HybridEncryptResponse(err=str(e))
596
597  def Decrypt(
598      self, request: testing_api_pb2.HybridDecryptRequest,
599      context: grpc.ServicerContext) -> testing_api_pb2.HybridDecryptResponse:
600    """Decrypts a message."""
601    try:
602      private_keyset_handle = cleartext_keyset_handle.read(
603          tink.BinaryKeysetReader(
604              request.private_annotated_keyset.serialized_keyset))
605      p = private_keyset_handle.primitive(hybrid.HybridDecrypt)
606      plaintext = p.decrypt(request.ciphertext, request.context_info)
607      return testing_api_pb2.HybridDecryptResponse(plaintext=plaintext)
608    except tink.TinkError as e:
609      return testing_api_pb2.HybridDecryptResponse(err=str(e))
610
611
612class SignatureServicer(testing_api_pb2_grpc.SignatureServicer):
613  """A service for testing signatures."""
614
615  def CreatePublicKeySign(
616      self, request: testing_api_pb2.CreationRequest,
617      context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
618    """Creates a PublicKeySign without using it."""
619    try:
620      keyset_handle = cleartext_keyset_handle.read(
621          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
622      keyset_handle.primitive(signature.PublicKeySign)
623      return testing_api_pb2.CreationResponse()
624    except tink.TinkError as e:
625      return testing_api_pb2.CreationResponse(err=str(e))
626
627  def CreatePublicKeyVerify(
628      self, request: testing_api_pb2.CreationRequest,
629      context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
630    """Creates a PublicKeyVerify without using it."""
631    try:
632      keyset_handle = cleartext_keyset_handle.read(
633          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
634      keyset_handle.primitive(signature.PublicKeyVerify)
635      return testing_api_pb2.CreationResponse()
636    except tink.TinkError as e:
637      return testing_api_pb2.CreationResponse(err=str(e))
638
639  def Sign(
640      self, request: testing_api_pb2.SignatureSignRequest,
641      context: grpc.ServicerContext) -> testing_api_pb2.SignatureSignResponse:
642    """Signs a message."""
643    try:
644      private_keyset_handle = cleartext_keyset_handle.read(
645          tink.BinaryKeysetReader(
646              request.private_annotated_keyset.serialized_keyset))
647      p = private_keyset_handle.primitive(signature.PublicKeySign)
648      signature_value = p.sign(request.data)
649      return testing_api_pb2.SignatureSignResponse(signature=signature_value)
650    except tink.TinkError as e:
651      return testing_api_pb2.SignatureSignResponse(err=str(e))
652
653  def Verify(
654      self, request: testing_api_pb2.SignatureVerifyRequest,
655      context: grpc.ServicerContext) -> testing_api_pb2.SignatureVerifyResponse:
656    """Verifies a signature."""
657    try:
658      public_keyset_handle = cleartext_keyset_handle.read(
659          tink.BinaryKeysetReader(
660              request.public_annotated_keyset.serialized_keyset))
661      p = public_keyset_handle.primitive(signature.PublicKeyVerify)
662      p.verify(request.signature, request.data)
663      return testing_api_pb2.SignatureVerifyResponse()
664    except tink.TinkError as e:
665      return testing_api_pb2.SignatureVerifyResponse(err=str(e))
666
667
668class PrfSetServicer(testing_api_pb2_grpc.PrfSetServicer):
669  """A service for testing PrfSet."""
670
671  def Create(self, request: testing_api_pb2.CreationRequest,
672             context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse:
673    """Creates a PrfSet without using it."""
674    try:
675      keyset_handle = cleartext_keyset_handle.read(
676          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
677      keyset_handle.primitive(prf.PrfSet)
678      return testing_api_pb2.CreationResponse()
679    except tink.TinkError as e:
680      return testing_api_pb2.CreationResponse(err=str(e))
681
682  def KeyIds(
683      self, request: testing_api_pb2.PrfSetKeyIdsRequest,
684      context: grpc.ServicerContext) -> testing_api_pb2.PrfSetKeyIdsResponse:
685    """Returns all key IDs and the primary key ID."""
686    try:
687      keyset_handle = cleartext_keyset_handle.read(
688          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
689      p = keyset_handle.primitive(prf.PrfSet)
690      prfs = p.all()
691      response = testing_api_pb2.PrfSetKeyIdsResponse()
692      response.output.primary_key_id = p.primary_id()
693      response.output.key_id.extend(prfs.keys())
694      return response
695    except tink.TinkError as e:
696      return testing_api_pb2.PrfSetKeyIdsResponse(err=str(e))
697
698  def Compute(
699      self, request: testing_api_pb2.PrfSetComputeRequest,
700      context: grpc.ServicerContext) -> testing_api_pb2.PrfSetComputeResponse:
701    """Computes the output of one PRF."""
702    try:
703      keyset_handle = cleartext_keyset_handle.read(
704          tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset))
705      f = keyset_handle.primitive(prf.PrfSet).all()[request.key_id]
706      return testing_api_pb2.PrfSetComputeResponse(
707          output=f.compute(request.input_data, request.output_length))
708    except tink.TinkError as e:
709      return testing_api_pb2.PrfSetComputeResponse(err=str(e))
710