# Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS-IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Testing service API implementations in Python.""" import io import grpc import tink from tink import aead from tink import cleartext_keyset_handle from tink import daead from tink import hybrid from tink import jwt from tink import mac from tink import prf from tink import signature from tink import streaming_aead from tink.proto import tink_pb2 from tink.testing import bytes_io from protos import testing_api_pb2 from protos import testing_api_pb2_grpc # All KeyTemplate (as Protobuf) defined in the Python API. _KEY_TEMPLATE = { 'AES128_EAX': aead.aead_key_templates.AES128_EAX, 'AES128_EAX_RAW': aead.aead_key_templates.AES128_EAX_RAW, 'AES256_EAX': aead.aead_key_templates.AES256_EAX, 'AES256_EAX_RAW': aead.aead_key_templates.AES256_EAX_RAW, 'AES128_GCM': aead.aead_key_templates.AES128_GCM, 'AES128_GCM_RAW': aead.aead_key_templates.AES128_GCM_RAW, 'AES256_GCM': aead.aead_key_templates.AES256_GCM, 'AES256_GCM_RAW': aead.aead_key_templates.AES256_GCM_RAW, 'AES128_GCM_SIV': aead.aead_key_templates.AES128_GCM_SIV, 'AES128_GCM_SIV_RAW': aead.aead_key_templates.AES128_GCM_SIV_RAW, 'AES256_GCM_SIV': aead.aead_key_templates.AES256_GCM_SIV, 'AES256_GCM_SIV_RAW': aead.aead_key_templates.AES256_GCM_SIV_RAW, 'AES128_CTR_HMAC_SHA256': aead.aead_key_templates.AES128_CTR_HMAC_SHA256, 'AES128_CTR_HMAC_SHA256_RAW': aead.aead_key_templates.AES128_CTR_HMAC_SHA256_RAW, 'AES256_CTR_HMAC_SHA256': aead.aead_key_templates.AES256_CTR_HMAC_SHA256, 'AES256_CTR_HMAC_SHA256_RAW': aead.aead_key_templates.AES256_CTR_HMAC_SHA256_RAW, 'XCHACHA20_POLY1305': aead.aead_key_templates.XCHACHA20_POLY1305, 'XCHACHA20_POLY1305_RAW': aead.aead_key_templates.XCHACHA20_POLY1305_RAW, 'AES256_SIV': daead.deterministic_aead_key_templates.AES256_SIV, 'AES128_CTR_HMAC_SHA256_4KB': streaming_aead.streaming_aead_key_templates.AES128_CTR_HMAC_SHA256_4KB, 'AES128_CTR_HMAC_SHA256_1MB': streaming_aead.streaming_aead_key_templates.AES128_CTR_HMAC_SHA256_1MB, 'AES256_CTR_HMAC_SHA256_4KB': streaming_aead.streaming_aead_key_templates.AES256_CTR_HMAC_SHA256_4KB, 'AES256_CTR_HMAC_SHA256_1MB': streaming_aead.streaming_aead_key_templates.AES256_CTR_HMAC_SHA256_1MB, 'AES128_GCM_HKDF_4KB': streaming_aead.streaming_aead_key_templates.AES128_GCM_HKDF_4KB, 'AES128_GCM_HKDF_1MB': streaming_aead.streaming_aead_key_templates.AES128_GCM_HKDF_1MB, 'AES256_GCM_HKDF_4KB': streaming_aead.streaming_aead_key_templates.AES256_GCM_HKDF_4KB, 'AES256_GCM_HKDF_1MB': streaming_aead.streaming_aead_key_templates.AES256_GCM_HKDF_1MB, 'ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM': hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM, 'ECIES_P256_COMPRESSED_HKDF_HMAC_SHA256_AES128_GCM': hybrid.hybrid_key_templates .ECIES_P256_COMPRESSED_HKDF_HMAC_SHA256_AES128_GCM, 'ECIES_P256_HKDF_HMAC_SHA256_AES128_CTR_HMAC_SHA256': hybrid.hybrid_key_templates .ECIES_P256_HKDF_HMAC_SHA256_AES128_CTR_HMAC_SHA256, 'ECIES_P256_COMPRESSED_HKDF_HMAC_SHA256_AES128_CTR_HMAC_SHA256': hybrid.hybrid_key_templates .ECIES_P256_COMPRESSED_HKDF_HMAC_SHA256_AES128_CTR_HMAC_SHA256, 'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_128_GCM': hybrid.hybrid_key_templates .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_128_GCM, 'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_128_GCM_RAW': hybrid.hybrid_key_templates .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_128_GCM_RAW, 'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_256_GCM': hybrid.hybrid_key_templates .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_256_GCM, 'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_256_GCM_RAW': hybrid.hybrid_key_templates .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_256_GCM_RAW, 'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_CHACHA20_POLY1305': hybrid.hybrid_key_templates .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_CHACHA20_POLY1305, 'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_CHACHA20_POLY1305_RAW': hybrid.hybrid_key_templates .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_CHACHA20_POLY1305_RAW, 'AES_CMAC': mac.mac_key_templates.AES_CMAC, 'HMAC_SHA256_128BITTAG': mac.mac_key_templates.HMAC_SHA256_128BITTAG, 'HMAC_SHA256_256BITTAG': mac.mac_key_templates.HMAC_SHA256_256BITTAG, 'HMAC_SHA512_256BITTAG': mac.mac_key_templates.HMAC_SHA512_256BITTAG, 'HMAC_SHA512_512BITTAG': mac.mac_key_templates.HMAC_SHA512_512BITTAG, 'ECDSA_P256': signature.signature_key_templates.ECDSA_P256, 'ECDSA_P256_RAW': signature.signature_key_templates.ECDSA_P256_RAW, 'ECDSA_P384': signature.signature_key_templates.ECDSA_P384, 'ECDSA_P384_SHA384': signature.signature_key_templates.ECDSA_P384_SHA384, 'ECDSA_P384_SHA512': signature.signature_key_templates.ECDSA_P384_SHA512, 'ECDSA_P521': signature.signature_key_templates.ECDSA_P521, 'ECDSA_P256_IEEE_P1363': signature.signature_key_templates.ECDSA_P256_IEEE_P1363, 'ECDSA_P384_IEEE_P1363': signature.signature_key_templates.ECDSA_P384_IEEE_P1363, 'ECDSA_P384_SHA384_IEEE_P1363': signature.signature_key_templates.ECDSA_P384_SHA384_IEEE_P1363, 'ECDSA_P521_IEEE_P1363': signature.signature_key_templates.ECDSA_P521_IEEE_P1363, 'ED25519': signature.signature_key_templates.ED25519, 'RSA_SSA_PKCS1_3072_SHA256_F4': signature.signature_key_templates.RSA_SSA_PKCS1_3072_SHA256_F4, 'RSA_SSA_PKCS1_4096_SHA512_F4': signature.signature_key_templates.RSA_SSA_PKCS1_4096_SHA512_F4, 'RSA_SSA_PSS_3072_SHA256_SHA256_32_F4': signature.signature_key_templates.RSA_SSA_PSS_3072_SHA256_SHA256_32_F4, 'RSA_SSA_PSS_4096_SHA512_SHA512_64_F4': signature.signature_key_templates.RSA_SSA_PSS_4096_SHA512_SHA512_64_F4, 'AES_CMAC_PRF': prf.prf_key_templates.AES_CMAC, 'HMAC_SHA256_PRF': prf.prf_key_templates.HMAC_SHA256, 'HMAC_SHA512_PRF': prf.prf_key_templates.HMAC_SHA512, 'HKDF_SHA256': prf.prf_key_templates.HKDF_SHA256, 'JWT_HS256': jwt.jwt_hs256_template(), 'JWT_HS256_RAW': jwt.raw_jwt_hs256_template(), 'JWT_HS384': jwt.jwt_hs384_template(), 'JWT_HS384_RAW': jwt.raw_jwt_hs384_template(), 'JWT_HS512': jwt.jwt_hs512_template(), 'JWT_HS512_RAW': jwt.raw_jwt_hs512_template(), 'JWT_ES256': jwt.jwt_es256_template(), 'JWT_ES256_RAW': jwt.raw_jwt_es256_template(), 'JWT_ES384': jwt.jwt_es384_template(), 'JWT_ES384_RAW': jwt.raw_jwt_es384_template(), 'JWT_ES512': jwt.jwt_es512_template(), 'JWT_ES512_RAW': jwt.raw_jwt_es512_template(), 'JWT_RS256_2048_F4': jwt.jwt_rs256_2048_f4_template(), 'JWT_RS256_2048_F4_RAW': jwt.raw_jwt_rs256_2048_f4_template(), 'JWT_RS256_3072_F4': jwt.jwt_rs256_3072_f4_template(), 'JWT_RS256_3072_F4_RAW': jwt.raw_jwt_rs256_3072_f4_template(), 'JWT_RS384_3072_F4': jwt.jwt_rs384_3072_f4_template(), 'JWT_RS384_3072_F4_RAW': jwt.raw_jwt_rs384_3072_f4_template(), 'JWT_RS512_4096_F4': jwt.jwt_rs512_4096_f4_template(), 'JWT_RS512_4096_F4_RAW': jwt.raw_jwt_rs512_4096_f4_template(), 'JWT_PS256_2048_F4': jwt.jwt_ps256_2048_f4_template(), 'JWT_PS256_2048_F4_RAW': jwt.raw_jwt_ps256_2048_f4_template(), 'JWT_PS256_3072_F4': jwt.jwt_ps256_3072_f4_template(), 'JWT_PS256_3072_F4_RAW': jwt.raw_jwt_ps256_3072_f4_template(), 'JWT_PS384_3072_F4': jwt.jwt_ps384_3072_f4_template(), 'JWT_PS384_3072_F4_RAW': jwt.raw_jwt_ps384_3072_f4_template(), 'JWT_PS512_4096_F4': jwt.jwt_ps512_4096_f4_template(), 'JWT_PS512_4096_F4_RAW': jwt.raw_jwt_ps512_4096_f4_template(), } class MetadataServicer(testing_api_pb2_grpc.MetadataServicer): """A service with metadata about the server.""" def GetServerInfo( self, request: testing_api_pb2.ServerInfoRequest, context: grpc.ServicerContext) -> testing_api_pb2.ServerInfoResponse: """Returns information about the server.""" return testing_api_pb2.ServerInfoResponse(language='python') class KeysetServicer(testing_api_pb2_grpc.KeysetServicer): """A service for testing Keyset operations.""" def GetTemplate( self, request: testing_api_pb2.KeysetTemplateRequest, context: grpc.ServicerContext) -> testing_api_pb2.KeysetTemplateResponse: """Returns the key template for the given template name.""" if request.template_name not in _KEY_TEMPLATE: return testing_api_pb2.KeysetTemplateResponse( err='template %s not found' % request.template_name) return testing_api_pb2.KeysetTemplateResponse( key_template=_KEY_TEMPLATE[request.template_name].SerializeToString()) def Generate( self, request: testing_api_pb2.KeysetGenerateRequest, context: grpc.ServicerContext) -> testing_api_pb2.KeysetGenerateResponse: """Generates a keyset.""" try: template = tink_pb2.KeyTemplate() template.ParseFromString(request.template) keyset_handle = tink.new_keyset_handle(template) keyset = io.BytesIO() cleartext_keyset_handle.write( tink.BinaryKeysetWriter(keyset), keyset_handle) return testing_api_pb2.KeysetGenerateResponse(keyset=keyset.getvalue()) except tink.TinkError as e: return testing_api_pb2.KeysetGenerateResponse(err=str(e)) def Public( self, request: testing_api_pb2.KeysetPublicRequest, context: grpc.ServicerContext) -> testing_api_pb2.KeysetPublicResponse: """Generates a public-key keyset from a private-key keyset.""" try: private_keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.private_keyset)) public_keyset_handle = private_keyset_handle.public_keyset_handle() public_keyset = io.BytesIO() cleartext_keyset_handle.write( tink.BinaryKeysetWriter(public_keyset), public_keyset_handle) return testing_api_pb2.KeysetPublicResponse( public_keyset=public_keyset.getvalue()) except tink.TinkError as e: return testing_api_pb2.KeysetPublicResponse(err=str(e)) def ToJson( self, request: testing_api_pb2.KeysetToJsonRequest, context: grpc.ServicerContext) -> testing_api_pb2.KeysetToJsonResponse: """Converts a keyset from binary to JSON format.""" try: keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.keyset)) json_keyset = io.StringIO() cleartext_keyset_handle.write( tink.JsonKeysetWriter(json_keyset), keyset_handle) return testing_api_pb2.KeysetToJsonResponse( json_keyset=json_keyset.getvalue()) except tink.TinkError as e: return testing_api_pb2.KeysetToJsonResponse(err=str(e)) def FromJson( self, request: testing_api_pb2.KeysetFromJsonRequest, context: grpc.ServicerContext) -> testing_api_pb2.KeysetFromJsonResponse: """Converts a keyset from JSON to binary format.""" try: keyset_handle = cleartext_keyset_handle.read( tink.JsonKeysetReader(request.json_keyset)) keyset = io.BytesIO() cleartext_keyset_handle.write( tink.BinaryKeysetWriter(keyset), keyset_handle) return testing_api_pb2.KeysetFromJsonResponse(keyset=keyset.getvalue()) except tink.TinkError as e: return testing_api_pb2.KeysetFromJsonResponse(err=str(e)) def ReadEncrypted( self, request: testing_api_pb2.KeysetReadEncryptedRequest, context: grpc.ServicerContext ) -> testing_api_pb2.KeysetReadEncryptedResponse: """Reads an encrypted keyset.""" try: master_keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.master_keyset)) master_aead = master_keyset_handle.primitive(aead.Aead) if request.keyset_reader_type == testing_api_pb2.KEYSET_READER_BINARY: reader = tink.BinaryKeysetReader(request.encrypted_keyset) elif request.keyset_reader_type == testing_api_pb2.KEYSET_READER_JSON: reader = tink.JsonKeysetReader(request.encrypted_keyset.decode('utf8')) else: raise ValueError('unknown keyset reader type') if request.HasField('associated_data'): keyset_handle = tink.read_keyset_handle_with_associated_data( reader, master_aead, request.associated_data.value) else: keyset_handle = tink.read_keyset_handle(reader, master_aead) keyset = io.BytesIO() cleartext_keyset_handle.write( tink.BinaryKeysetWriter(keyset), keyset_handle) return testing_api_pb2.KeysetReadEncryptedResponse( keyset=keyset.getvalue()) except tink.TinkError as e: return testing_api_pb2.KeysetReadEncryptedResponse(err=str(e)) def WriteEncrypted( self, request: testing_api_pb2.KeysetWriteEncryptedRequest, context: grpc.ServicerContext ) -> testing_api_pb2.KeysetWriteEncryptedResponse: """Writes an encrypted keyset.""" try: master_keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.master_keyset)) keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.keyset)) master_aead = master_keyset_handle.primitive(aead.Aead) if request.keyset_writer_type == testing_api_pb2.KEYSET_WRITER_BINARY: encrypted_keyset = io.BytesIO() writer = tink.BinaryKeysetWriter(encrypted_keyset) if request.HasField('associated_data'): keyset_handle.write_with_associated_data( writer, master_aead, request.associated_data.value) else: keyset_handle.write(writer, master_aead) return testing_api_pb2.KeysetWriteEncryptedResponse( encrypted_keyset=encrypted_keyset.getvalue()) elif request.keyset_writer_type == testing_api_pb2.KEYSET_WRITER_JSON: encrypted_keyset = io.StringIO() writer = tink.JsonKeysetWriter(encrypted_keyset) if request.HasField('associated_data'): keyset_handle.write_with_associated_data( writer, master_aead, request.associated_data.value) else: keyset_handle.write(writer, master_aead) return testing_api_pb2.KeysetWriteEncryptedResponse( encrypted_keyset=encrypted_keyset.getvalue().encode('utf8')) else: raise ValueError('unknown keyset writer type') except tink.TinkError as e: return testing_api_pb2.KeysetWriteEncryptedResponse(err=str(e)) class AeadServicer(testing_api_pb2_grpc.AeadServicer): """A service for testing AEAD encryption.""" def Create(self, request: testing_api_pb2.CreationRequest, context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: """Creates an AEAD without using it.""" try: keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) keyset_handle.primitive(aead.Aead) return testing_api_pb2.CreationResponse() except tink.TinkError as e: return testing_api_pb2.CreationResponse(err=str(e)) def Encrypt( self, request: testing_api_pb2.AeadEncryptRequest, context: grpc.ServicerContext) -> testing_api_pb2.AeadEncryptResponse: """Encrypts a message.""" keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) p = keyset_handle.primitive(aead.Aead) try: ciphertext = p.encrypt(request.plaintext, request.associated_data) return testing_api_pb2.AeadEncryptResponse(ciphertext=ciphertext) except tink.TinkError as e: return testing_api_pb2.AeadEncryptResponse(err=str(e)) def Decrypt( self, request: testing_api_pb2.AeadDecryptRequest, context: grpc.ServicerContext) -> testing_api_pb2.AeadDecryptResponse: """Decrypts a message.""" keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) p = keyset_handle.primitive(aead.Aead) try: plaintext = p.decrypt(request.ciphertext, request.associated_data) return testing_api_pb2.AeadDecryptResponse(plaintext=plaintext) except tink.TinkError as e: return testing_api_pb2.AeadDecryptResponse(err=str(e)) class StreamingAeadServicer(testing_api_pb2_grpc.StreamingAeadServicer): """A service for testing StreamingAEAD encryption.""" def Create(self, request: testing_api_pb2.CreationRequest, context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: """Creates a Streaming Aead without using it.""" try: keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) keyset_handle.primitive(streaming_aead.StreamingAead) return testing_api_pb2.CreationResponse() except tink.TinkError as e: return testing_api_pb2.CreationResponse(err=str(e)) def Encrypt( self, request: testing_api_pb2.StreamingAeadEncryptRequest, context: grpc.ServicerContext ) -> testing_api_pb2.StreamingAeadEncryptResponse: """Encrypts a message.""" try: keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) p = keyset_handle.primitive(streaming_aead.StreamingAead) ciphertext_destination = bytes_io.BytesIOWithValueAfterClose() with p.new_encrypting_stream(ciphertext_destination, request.associated_data) as plaintext_stream: plaintext_stream.write(request.plaintext) return testing_api_pb2.StreamingAeadEncryptResponse( ciphertext=ciphertext_destination.value_after_close()) except tink.TinkError as e: return testing_api_pb2.StreamingAeadEncryptResponse(err=str(e)) def Decrypt( self, request: testing_api_pb2.StreamingAeadDecryptRequest, context: grpc.ServicerContext ) -> testing_api_pb2.StreamingAeadDecryptResponse: """Decrypts a message.""" try: keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) p = keyset_handle.primitive(streaming_aead.StreamingAead) stream = io.BytesIO(request.ciphertext) with p.new_decrypting_stream(stream, request.associated_data) as s: plaintext = s.read() return testing_api_pb2.StreamingAeadDecryptResponse(plaintext=plaintext) except tink.TinkError as e: return testing_api_pb2.StreamingAeadDecryptResponse(err=str(e)) class DeterministicAeadServicer(testing_api_pb2_grpc.DeterministicAeadServicer): """A service for testing Deterministic AEAD encryption.""" def Create(self, request: testing_api_pb2.CreationRequest, context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: """Creates a Deterministic AEAD without using it.""" try: keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) keyset_handle.primitive(daead.DeterministicAead) return testing_api_pb2.CreationResponse() except tink.TinkError as e: return testing_api_pb2.CreationResponse(err=str(e)) def EncryptDeterministically( self, request: testing_api_pb2.DeterministicAeadEncryptRequest, context: grpc.ServicerContext ) -> testing_api_pb2.DeterministicAeadEncryptResponse: """Encrypts a message.""" keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) p = keyset_handle.primitive(daead.DeterministicAead) try: ciphertext = p.encrypt_deterministically(request.plaintext, request.associated_data) return testing_api_pb2.DeterministicAeadEncryptResponse( ciphertext=ciphertext) except tink.TinkError as e: return testing_api_pb2.DeterministicAeadEncryptResponse(err=str(e)) def DecryptDeterministically( self, request: testing_api_pb2.DeterministicAeadDecryptRequest, context: grpc.ServicerContext ) -> testing_api_pb2.DeterministicAeadDecryptResponse: """Decrypts a message.""" keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) p = keyset_handle.primitive(daead.DeterministicAead) try: plaintext = p.decrypt_deterministically(request.ciphertext, request.associated_data) return testing_api_pb2.DeterministicAeadDecryptResponse( plaintext=plaintext) except tink.TinkError as e: return testing_api_pb2.DeterministicAeadDecryptResponse(err=str(e)) class MacServicer(testing_api_pb2_grpc.MacServicer): """A service for testing MACs.""" def Create(self, request: testing_api_pb2.CreationRequest, context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: """Creates a MAC without using it.""" try: keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) keyset_handle.primitive(mac.Mac) return testing_api_pb2.CreationResponse() except tink.TinkError as e: return testing_api_pb2.CreationResponse(err=str(e)) def ComputeMac( self, request: testing_api_pb2.ComputeMacRequest, context: grpc.ServicerContext) -> testing_api_pb2.ComputeMacResponse: """Computes a MAC.""" try: keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) p = keyset_handle.primitive(mac.Mac) mac_value = p.compute_mac(request.data) return testing_api_pb2.ComputeMacResponse(mac_value=mac_value) except tink.TinkError as e: return testing_api_pb2.ComputeMacResponse(err=str(e)) def VerifyMac( self, request: testing_api_pb2.VerifyMacRequest, context: grpc.ServicerContext) -> testing_api_pb2.VerifyMacResponse: """Verifies a MAC value.""" try: keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) p = keyset_handle.primitive(mac.Mac) p.verify_mac(request.mac_value, request.data) return testing_api_pb2.VerifyMacResponse() except tink.TinkError as e: return testing_api_pb2.VerifyMacResponse(err=str(e)) class HybridServicer(testing_api_pb2_grpc.HybridServicer): """A service for testing hybrid encryption and decryption.""" def CreateHybridEncrypt( self, request: testing_api_pb2.CreationRequest, context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: """Creates a HybridEncrypt without using it.""" try: keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) keyset_handle.primitive(hybrid.HybridEncrypt) return testing_api_pb2.CreationResponse() except tink.TinkError as e: return testing_api_pb2.CreationResponse(err=str(e)) def CreateHybridDecrypt( self, request: testing_api_pb2.CreationRequest, context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: """Creates a HybridDecrypt without using it.""" try: keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) keyset_handle.primitive(hybrid.HybridDecrypt) return testing_api_pb2.CreationResponse() except tink.TinkError as e: return testing_api_pb2.CreationResponse(err=str(e)) def Encrypt( self, request: testing_api_pb2.HybridEncryptRequest, context: grpc.ServicerContext) -> testing_api_pb2.HybridEncryptResponse: """Encrypts a message.""" try: public_keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader( request.public_annotated_keyset.serialized_keyset)) p = public_keyset_handle.primitive(hybrid.HybridEncrypt) ciphertext = p.encrypt(request.plaintext, request.context_info) return testing_api_pb2.HybridEncryptResponse(ciphertext=ciphertext) except tink.TinkError as e: return testing_api_pb2.HybridEncryptResponse(err=str(e)) def Decrypt( self, request: testing_api_pb2.HybridDecryptRequest, context: grpc.ServicerContext) -> testing_api_pb2.HybridDecryptResponse: """Decrypts a message.""" try: private_keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader( request.private_annotated_keyset.serialized_keyset)) p = private_keyset_handle.primitive(hybrid.HybridDecrypt) plaintext = p.decrypt(request.ciphertext, request.context_info) return testing_api_pb2.HybridDecryptResponse(plaintext=plaintext) except tink.TinkError as e: return testing_api_pb2.HybridDecryptResponse(err=str(e)) class SignatureServicer(testing_api_pb2_grpc.SignatureServicer): """A service for testing signatures.""" def CreatePublicKeySign( self, request: testing_api_pb2.CreationRequest, context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: """Creates a PublicKeySign without using it.""" try: keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) keyset_handle.primitive(signature.PublicKeySign) return testing_api_pb2.CreationResponse() except tink.TinkError as e: return testing_api_pb2.CreationResponse(err=str(e)) def CreatePublicKeyVerify( self, request: testing_api_pb2.CreationRequest, context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: """Creates a PublicKeyVerify without using it.""" try: keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) keyset_handle.primitive(signature.PublicKeyVerify) return testing_api_pb2.CreationResponse() except tink.TinkError as e: return testing_api_pb2.CreationResponse(err=str(e)) def Sign( self, request: testing_api_pb2.SignatureSignRequest, context: grpc.ServicerContext) -> testing_api_pb2.SignatureSignResponse: """Signs a message.""" try: private_keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader( request.private_annotated_keyset.serialized_keyset)) p = private_keyset_handle.primitive(signature.PublicKeySign) signature_value = p.sign(request.data) return testing_api_pb2.SignatureSignResponse(signature=signature_value) except tink.TinkError as e: return testing_api_pb2.SignatureSignResponse(err=str(e)) def Verify( self, request: testing_api_pb2.SignatureVerifyRequest, context: grpc.ServicerContext) -> testing_api_pb2.SignatureVerifyResponse: """Verifies a signature.""" try: public_keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader( request.public_annotated_keyset.serialized_keyset)) p = public_keyset_handle.primitive(signature.PublicKeyVerify) p.verify(request.signature, request.data) return testing_api_pb2.SignatureVerifyResponse() except tink.TinkError as e: return testing_api_pb2.SignatureVerifyResponse(err=str(e)) class PrfSetServicer(testing_api_pb2_grpc.PrfSetServicer): """A service for testing PrfSet.""" def Create(self, request: testing_api_pb2.CreationRequest, context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: """Creates a PrfSet without using it.""" try: keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) keyset_handle.primitive(prf.PrfSet) return testing_api_pb2.CreationResponse() except tink.TinkError as e: return testing_api_pb2.CreationResponse(err=str(e)) def KeyIds( self, request: testing_api_pb2.PrfSetKeyIdsRequest, context: grpc.ServicerContext) -> testing_api_pb2.PrfSetKeyIdsResponse: """Returns all key IDs and the primary key ID.""" try: keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) p = keyset_handle.primitive(prf.PrfSet) prfs = p.all() response = testing_api_pb2.PrfSetKeyIdsResponse() response.output.primary_key_id = p.primary_id() response.output.key_id.extend(prfs.keys()) return response except tink.TinkError as e: return testing_api_pb2.PrfSetKeyIdsResponse(err=str(e)) def Compute( self, request: testing_api_pb2.PrfSetComputeRequest, context: grpc.ServicerContext) -> testing_api_pb2.PrfSetComputeResponse: """Computes the output of one PRF.""" try: keyset_handle = cleartext_keyset_handle.read( tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) f = keyset_handle.primitive(prf.PrfSet).all()[request.key_id] return testing_api_pb2.PrfSetComputeResponse( output=f.compute(request.input_data, request.output_length)) except tink.TinkError as e: return testing_api_pb2.PrfSetComputeResponse(err=str(e))