1# Copyright 2020 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS-IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Testing service API implementations in Python.""" 15 16import io 17 18import grpc 19import tink 20from tink import aead 21from tink import cleartext_keyset_handle 22from tink import daead 23from tink import hybrid 24from tink import jwt 25from tink import mac 26from tink import prf 27from tink import signature 28from tink import streaming_aead 29from tink.proto import tink_pb2 30from tink.testing import bytes_io 31from protos import testing_api_pb2 32from protos import testing_api_pb2_grpc 33 34 35# All KeyTemplate (as Protobuf) defined in the Python API. 36_KEY_TEMPLATE = { 37 'AES128_EAX': 38 aead.aead_key_templates.AES128_EAX, 39 'AES128_EAX_RAW': 40 aead.aead_key_templates.AES128_EAX_RAW, 41 'AES256_EAX': 42 aead.aead_key_templates.AES256_EAX, 43 'AES256_EAX_RAW': 44 aead.aead_key_templates.AES256_EAX_RAW, 45 'AES128_GCM': 46 aead.aead_key_templates.AES128_GCM, 47 'AES128_GCM_RAW': 48 aead.aead_key_templates.AES128_GCM_RAW, 49 'AES256_GCM': 50 aead.aead_key_templates.AES256_GCM, 51 'AES256_GCM_RAW': 52 aead.aead_key_templates.AES256_GCM_RAW, 53 'AES128_GCM_SIV': 54 aead.aead_key_templates.AES128_GCM_SIV, 55 'AES128_GCM_SIV_RAW': 56 aead.aead_key_templates.AES128_GCM_SIV_RAW, 57 'AES256_GCM_SIV': 58 aead.aead_key_templates.AES256_GCM_SIV, 59 'AES256_GCM_SIV_RAW': 60 aead.aead_key_templates.AES256_GCM_SIV_RAW, 61 'AES128_CTR_HMAC_SHA256': 62 aead.aead_key_templates.AES128_CTR_HMAC_SHA256, 63 'AES128_CTR_HMAC_SHA256_RAW': 64 aead.aead_key_templates.AES128_CTR_HMAC_SHA256_RAW, 65 'AES256_CTR_HMAC_SHA256': 66 aead.aead_key_templates.AES256_CTR_HMAC_SHA256, 67 'AES256_CTR_HMAC_SHA256_RAW': 68 aead.aead_key_templates.AES256_CTR_HMAC_SHA256_RAW, 69 'XCHACHA20_POLY1305': 70 aead.aead_key_templates.XCHACHA20_POLY1305, 71 'XCHACHA20_POLY1305_RAW': 72 aead.aead_key_templates.XCHACHA20_POLY1305_RAW, 73 'AES256_SIV': 74 daead.deterministic_aead_key_templates.AES256_SIV, 75 'AES128_CTR_HMAC_SHA256_4KB': 76 streaming_aead.streaming_aead_key_templates.AES128_CTR_HMAC_SHA256_4KB, 77 'AES128_CTR_HMAC_SHA256_1MB': 78 streaming_aead.streaming_aead_key_templates.AES128_CTR_HMAC_SHA256_1MB, 79 'AES256_CTR_HMAC_SHA256_4KB': 80 streaming_aead.streaming_aead_key_templates.AES256_CTR_HMAC_SHA256_4KB, 81 'AES256_CTR_HMAC_SHA256_1MB': 82 streaming_aead.streaming_aead_key_templates.AES256_CTR_HMAC_SHA256_1MB, 83 'AES128_GCM_HKDF_4KB': 84 streaming_aead.streaming_aead_key_templates.AES128_GCM_HKDF_4KB, 85 'AES128_GCM_HKDF_1MB': 86 streaming_aead.streaming_aead_key_templates.AES128_GCM_HKDF_1MB, 87 'AES256_GCM_HKDF_4KB': 88 streaming_aead.streaming_aead_key_templates.AES256_GCM_HKDF_4KB, 89 'AES256_GCM_HKDF_1MB': 90 streaming_aead.streaming_aead_key_templates.AES256_GCM_HKDF_1MB, 91 'ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM': 92 hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM, 93 'ECIES_P256_COMPRESSED_HKDF_HMAC_SHA256_AES128_GCM': 94 hybrid.hybrid_key_templates 95 .ECIES_P256_COMPRESSED_HKDF_HMAC_SHA256_AES128_GCM, 96 'ECIES_P256_HKDF_HMAC_SHA256_AES128_CTR_HMAC_SHA256': 97 hybrid.hybrid_key_templates 98 .ECIES_P256_HKDF_HMAC_SHA256_AES128_CTR_HMAC_SHA256, 99 'ECIES_P256_COMPRESSED_HKDF_HMAC_SHA256_AES128_CTR_HMAC_SHA256': 100 hybrid.hybrid_key_templates 101 .ECIES_P256_COMPRESSED_HKDF_HMAC_SHA256_AES128_CTR_HMAC_SHA256, 102 'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_128_GCM': 103 hybrid.hybrid_key_templates 104 .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_128_GCM, 105 'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_128_GCM_RAW': 106 hybrid.hybrid_key_templates 107 .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_128_GCM_RAW, 108 'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_256_GCM': 109 hybrid.hybrid_key_templates 110 .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_256_GCM, 111 'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_256_GCM_RAW': 112 hybrid.hybrid_key_templates 113 .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_AES_256_GCM_RAW, 114 'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_CHACHA20_POLY1305': 115 hybrid.hybrid_key_templates 116 .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_CHACHA20_POLY1305, 117 'DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_CHACHA20_POLY1305_RAW': 118 hybrid.hybrid_key_templates 119 .DHKEM_X25519_HKDF_SHA256_HKDF_SHA256_CHACHA20_POLY1305_RAW, 120 'AES_CMAC': 121 mac.mac_key_templates.AES_CMAC, 122 'HMAC_SHA256_128BITTAG': 123 mac.mac_key_templates.HMAC_SHA256_128BITTAG, 124 'HMAC_SHA256_256BITTAG': 125 mac.mac_key_templates.HMAC_SHA256_256BITTAG, 126 'HMAC_SHA512_256BITTAG': 127 mac.mac_key_templates.HMAC_SHA512_256BITTAG, 128 'HMAC_SHA512_512BITTAG': 129 mac.mac_key_templates.HMAC_SHA512_512BITTAG, 130 'ECDSA_P256': 131 signature.signature_key_templates.ECDSA_P256, 132 'ECDSA_P256_RAW': 133 signature.signature_key_templates.ECDSA_P256_RAW, 134 'ECDSA_P384': 135 signature.signature_key_templates.ECDSA_P384, 136 'ECDSA_P384_SHA384': 137 signature.signature_key_templates.ECDSA_P384_SHA384, 138 'ECDSA_P384_SHA512': 139 signature.signature_key_templates.ECDSA_P384_SHA512, 140 'ECDSA_P521': 141 signature.signature_key_templates.ECDSA_P521, 142 'ECDSA_P256_IEEE_P1363': 143 signature.signature_key_templates.ECDSA_P256_IEEE_P1363, 144 'ECDSA_P384_IEEE_P1363': 145 signature.signature_key_templates.ECDSA_P384_IEEE_P1363, 146 'ECDSA_P384_SHA384_IEEE_P1363': 147 signature.signature_key_templates.ECDSA_P384_SHA384_IEEE_P1363, 148 'ECDSA_P521_IEEE_P1363': 149 signature.signature_key_templates.ECDSA_P521_IEEE_P1363, 150 'ED25519': 151 signature.signature_key_templates.ED25519, 152 'RSA_SSA_PKCS1_3072_SHA256_F4': 153 signature.signature_key_templates.RSA_SSA_PKCS1_3072_SHA256_F4, 154 'RSA_SSA_PKCS1_4096_SHA512_F4': 155 signature.signature_key_templates.RSA_SSA_PKCS1_4096_SHA512_F4, 156 'RSA_SSA_PSS_3072_SHA256_SHA256_32_F4': 157 signature.signature_key_templates.RSA_SSA_PSS_3072_SHA256_SHA256_32_F4, 158 'RSA_SSA_PSS_4096_SHA512_SHA512_64_F4': 159 signature.signature_key_templates.RSA_SSA_PSS_4096_SHA512_SHA512_64_F4, 160 'AES_CMAC_PRF': 161 prf.prf_key_templates.AES_CMAC, 162 'HMAC_SHA256_PRF': 163 prf.prf_key_templates.HMAC_SHA256, 164 'HMAC_SHA512_PRF': 165 prf.prf_key_templates.HMAC_SHA512, 166 'HKDF_SHA256': 167 prf.prf_key_templates.HKDF_SHA256, 168 'JWT_HS256': 169 jwt.jwt_hs256_template(), 170 'JWT_HS256_RAW': 171 jwt.raw_jwt_hs256_template(), 172 'JWT_HS384': 173 jwt.jwt_hs384_template(), 174 'JWT_HS384_RAW': 175 jwt.raw_jwt_hs384_template(), 176 'JWT_HS512': 177 jwt.jwt_hs512_template(), 178 'JWT_HS512_RAW': 179 jwt.raw_jwt_hs512_template(), 180 'JWT_ES256': 181 jwt.jwt_es256_template(), 182 'JWT_ES256_RAW': 183 jwt.raw_jwt_es256_template(), 184 'JWT_ES384': 185 jwt.jwt_es384_template(), 186 'JWT_ES384_RAW': 187 jwt.raw_jwt_es384_template(), 188 'JWT_ES512': 189 jwt.jwt_es512_template(), 190 'JWT_ES512_RAW': 191 jwt.raw_jwt_es512_template(), 192 'JWT_RS256_2048_F4': 193 jwt.jwt_rs256_2048_f4_template(), 194 'JWT_RS256_2048_F4_RAW': 195 jwt.raw_jwt_rs256_2048_f4_template(), 196 'JWT_RS256_3072_F4': 197 jwt.jwt_rs256_3072_f4_template(), 198 'JWT_RS256_3072_F4_RAW': 199 jwt.raw_jwt_rs256_3072_f4_template(), 200 'JWT_RS384_3072_F4': 201 jwt.jwt_rs384_3072_f4_template(), 202 'JWT_RS384_3072_F4_RAW': 203 jwt.raw_jwt_rs384_3072_f4_template(), 204 'JWT_RS512_4096_F4': 205 jwt.jwt_rs512_4096_f4_template(), 206 'JWT_RS512_4096_F4_RAW': 207 jwt.raw_jwt_rs512_4096_f4_template(), 208 'JWT_PS256_2048_F4': 209 jwt.jwt_ps256_2048_f4_template(), 210 'JWT_PS256_2048_F4_RAW': 211 jwt.raw_jwt_ps256_2048_f4_template(), 212 'JWT_PS256_3072_F4': 213 jwt.jwt_ps256_3072_f4_template(), 214 'JWT_PS256_3072_F4_RAW': 215 jwt.raw_jwt_ps256_3072_f4_template(), 216 'JWT_PS384_3072_F4': 217 jwt.jwt_ps384_3072_f4_template(), 218 'JWT_PS384_3072_F4_RAW': 219 jwt.raw_jwt_ps384_3072_f4_template(), 220 'JWT_PS512_4096_F4': 221 jwt.jwt_ps512_4096_f4_template(), 222 'JWT_PS512_4096_F4_RAW': 223 jwt.raw_jwt_ps512_4096_f4_template(), 224} 225 226 227class MetadataServicer(testing_api_pb2_grpc.MetadataServicer): 228 """A service with metadata about the server.""" 229 230 def GetServerInfo( 231 self, request: testing_api_pb2.ServerInfoRequest, 232 context: grpc.ServicerContext) -> testing_api_pb2.ServerInfoResponse: 233 """Returns information about the server.""" 234 return testing_api_pb2.ServerInfoResponse(language='python') 235 236 237class KeysetServicer(testing_api_pb2_grpc.KeysetServicer): 238 """A service for testing Keyset operations.""" 239 240 def GetTemplate( 241 self, request: testing_api_pb2.KeysetTemplateRequest, 242 context: grpc.ServicerContext) -> testing_api_pb2.KeysetTemplateResponse: 243 """Returns the key template for the given template name.""" 244 if request.template_name not in _KEY_TEMPLATE: 245 return testing_api_pb2.KeysetTemplateResponse( 246 err='template %s not found' % request.template_name) 247 return testing_api_pb2.KeysetTemplateResponse( 248 key_template=_KEY_TEMPLATE[request.template_name].SerializeToString()) 249 250 def Generate( 251 self, request: testing_api_pb2.KeysetGenerateRequest, 252 context: grpc.ServicerContext) -> testing_api_pb2.KeysetGenerateResponse: 253 """Generates a keyset.""" 254 try: 255 template = tink_pb2.KeyTemplate() 256 template.ParseFromString(request.template) 257 keyset_handle = tink.new_keyset_handle(template) 258 keyset = io.BytesIO() 259 cleartext_keyset_handle.write( 260 tink.BinaryKeysetWriter(keyset), keyset_handle) 261 return testing_api_pb2.KeysetGenerateResponse(keyset=keyset.getvalue()) 262 except tink.TinkError as e: 263 return testing_api_pb2.KeysetGenerateResponse(err=str(e)) 264 265 def Public( 266 self, request: testing_api_pb2.KeysetPublicRequest, 267 context: grpc.ServicerContext) -> testing_api_pb2.KeysetPublicResponse: 268 """Generates a public-key keyset from a private-key keyset.""" 269 try: 270 private_keyset_handle = cleartext_keyset_handle.read( 271 tink.BinaryKeysetReader(request.private_keyset)) 272 public_keyset_handle = private_keyset_handle.public_keyset_handle() 273 public_keyset = io.BytesIO() 274 cleartext_keyset_handle.write( 275 tink.BinaryKeysetWriter(public_keyset), public_keyset_handle) 276 return testing_api_pb2.KeysetPublicResponse( 277 public_keyset=public_keyset.getvalue()) 278 except tink.TinkError as e: 279 return testing_api_pb2.KeysetPublicResponse(err=str(e)) 280 281 def ToJson( 282 self, request: testing_api_pb2.KeysetToJsonRequest, 283 context: grpc.ServicerContext) -> testing_api_pb2.KeysetToJsonResponse: 284 """Converts a keyset from binary to JSON format.""" 285 try: 286 keyset_handle = cleartext_keyset_handle.read( 287 tink.BinaryKeysetReader(request.keyset)) 288 json_keyset = io.StringIO() 289 cleartext_keyset_handle.write( 290 tink.JsonKeysetWriter(json_keyset), keyset_handle) 291 return testing_api_pb2.KeysetToJsonResponse( 292 json_keyset=json_keyset.getvalue()) 293 except tink.TinkError as e: 294 return testing_api_pb2.KeysetToJsonResponse(err=str(e)) 295 296 def FromJson( 297 self, request: testing_api_pb2.KeysetFromJsonRequest, 298 context: grpc.ServicerContext) -> testing_api_pb2.KeysetFromJsonResponse: 299 """Converts a keyset from JSON to binary format.""" 300 try: 301 keyset_handle = cleartext_keyset_handle.read( 302 tink.JsonKeysetReader(request.json_keyset)) 303 keyset = io.BytesIO() 304 cleartext_keyset_handle.write( 305 tink.BinaryKeysetWriter(keyset), keyset_handle) 306 return testing_api_pb2.KeysetFromJsonResponse(keyset=keyset.getvalue()) 307 except tink.TinkError as e: 308 return testing_api_pb2.KeysetFromJsonResponse(err=str(e)) 309 310 def ReadEncrypted( 311 self, request: testing_api_pb2.KeysetReadEncryptedRequest, 312 context: grpc.ServicerContext 313 ) -> testing_api_pb2.KeysetReadEncryptedResponse: 314 """Reads an encrypted keyset.""" 315 try: 316 master_keyset_handle = cleartext_keyset_handle.read( 317 tink.BinaryKeysetReader(request.master_keyset)) 318 master_aead = master_keyset_handle.primitive(aead.Aead) 319 320 if request.keyset_reader_type == testing_api_pb2.KEYSET_READER_BINARY: 321 reader = tink.BinaryKeysetReader(request.encrypted_keyset) 322 elif request.keyset_reader_type == testing_api_pb2.KEYSET_READER_JSON: 323 reader = tink.JsonKeysetReader(request.encrypted_keyset.decode('utf8')) 324 else: 325 raise ValueError('unknown keyset reader type') 326 if request.HasField('associated_data'): 327 keyset_handle = tink.read_keyset_handle_with_associated_data( 328 reader, master_aead, request.associated_data.value) 329 else: 330 keyset_handle = tink.read_keyset_handle(reader, master_aead) 331 332 keyset = io.BytesIO() 333 cleartext_keyset_handle.write( 334 tink.BinaryKeysetWriter(keyset), keyset_handle) 335 return testing_api_pb2.KeysetReadEncryptedResponse( 336 keyset=keyset.getvalue()) 337 except tink.TinkError as e: 338 return testing_api_pb2.KeysetReadEncryptedResponse(err=str(e)) 339 340 def WriteEncrypted( 341 self, request: testing_api_pb2.KeysetWriteEncryptedRequest, 342 context: grpc.ServicerContext 343 ) -> testing_api_pb2.KeysetWriteEncryptedResponse: 344 """Writes an encrypted keyset.""" 345 try: 346 master_keyset_handle = cleartext_keyset_handle.read( 347 tink.BinaryKeysetReader(request.master_keyset)) 348 keyset_handle = cleartext_keyset_handle.read( 349 tink.BinaryKeysetReader(request.keyset)) 350 master_aead = master_keyset_handle.primitive(aead.Aead) 351 352 if request.keyset_writer_type == testing_api_pb2.KEYSET_WRITER_BINARY: 353 encrypted_keyset = io.BytesIO() 354 writer = tink.BinaryKeysetWriter(encrypted_keyset) 355 if request.HasField('associated_data'): 356 keyset_handle.write_with_associated_data( 357 writer, master_aead, request.associated_data.value) 358 else: 359 keyset_handle.write(writer, master_aead) 360 return testing_api_pb2.KeysetWriteEncryptedResponse( 361 encrypted_keyset=encrypted_keyset.getvalue()) 362 elif request.keyset_writer_type == testing_api_pb2.KEYSET_WRITER_JSON: 363 encrypted_keyset = io.StringIO() 364 writer = tink.JsonKeysetWriter(encrypted_keyset) 365 if request.HasField('associated_data'): 366 keyset_handle.write_with_associated_data( 367 writer, master_aead, request.associated_data.value) 368 else: 369 keyset_handle.write(writer, master_aead) 370 return testing_api_pb2.KeysetWriteEncryptedResponse( 371 encrypted_keyset=encrypted_keyset.getvalue().encode('utf8')) 372 else: 373 raise ValueError('unknown keyset writer type') 374 except tink.TinkError as e: 375 return testing_api_pb2.KeysetWriteEncryptedResponse(err=str(e)) 376 377 378class AeadServicer(testing_api_pb2_grpc.AeadServicer): 379 """A service for testing AEAD encryption.""" 380 381 def Create(self, request: testing_api_pb2.CreationRequest, 382 context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: 383 """Creates an AEAD without using it.""" 384 try: 385 keyset_handle = cleartext_keyset_handle.read( 386 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 387 keyset_handle.primitive(aead.Aead) 388 return testing_api_pb2.CreationResponse() 389 except tink.TinkError as e: 390 return testing_api_pb2.CreationResponse(err=str(e)) 391 392 def Encrypt( 393 self, request: testing_api_pb2.AeadEncryptRequest, 394 context: grpc.ServicerContext) -> testing_api_pb2.AeadEncryptResponse: 395 """Encrypts a message.""" 396 keyset_handle = cleartext_keyset_handle.read( 397 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 398 p = keyset_handle.primitive(aead.Aead) 399 try: 400 ciphertext = p.encrypt(request.plaintext, request.associated_data) 401 return testing_api_pb2.AeadEncryptResponse(ciphertext=ciphertext) 402 except tink.TinkError as e: 403 return testing_api_pb2.AeadEncryptResponse(err=str(e)) 404 405 def Decrypt( 406 self, request: testing_api_pb2.AeadDecryptRequest, 407 context: grpc.ServicerContext) -> testing_api_pb2.AeadDecryptResponse: 408 """Decrypts a message.""" 409 keyset_handle = cleartext_keyset_handle.read( 410 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 411 p = keyset_handle.primitive(aead.Aead) 412 try: 413 plaintext = p.decrypt(request.ciphertext, request.associated_data) 414 return testing_api_pb2.AeadDecryptResponse(plaintext=plaintext) 415 except tink.TinkError as e: 416 return testing_api_pb2.AeadDecryptResponse(err=str(e)) 417 418 419class StreamingAeadServicer(testing_api_pb2_grpc.StreamingAeadServicer): 420 """A service for testing StreamingAEAD encryption.""" 421 422 def Create(self, request: testing_api_pb2.CreationRequest, 423 context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: 424 """Creates a Streaming Aead without using it.""" 425 try: 426 keyset_handle = cleartext_keyset_handle.read( 427 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 428 keyset_handle.primitive(streaming_aead.StreamingAead) 429 return testing_api_pb2.CreationResponse() 430 except tink.TinkError as e: 431 return testing_api_pb2.CreationResponse(err=str(e)) 432 433 def Encrypt( 434 self, request: testing_api_pb2.StreamingAeadEncryptRequest, 435 context: grpc.ServicerContext 436 ) -> testing_api_pb2.StreamingAeadEncryptResponse: 437 """Encrypts a message.""" 438 try: 439 keyset_handle = cleartext_keyset_handle.read( 440 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 441 p = keyset_handle.primitive(streaming_aead.StreamingAead) 442 ciphertext_destination = bytes_io.BytesIOWithValueAfterClose() 443 with p.new_encrypting_stream(ciphertext_destination, 444 request.associated_data) as plaintext_stream: 445 plaintext_stream.write(request.plaintext) 446 return testing_api_pb2.StreamingAeadEncryptResponse( 447 ciphertext=ciphertext_destination.value_after_close()) 448 except tink.TinkError as e: 449 return testing_api_pb2.StreamingAeadEncryptResponse(err=str(e)) 450 451 def Decrypt( 452 self, request: testing_api_pb2.StreamingAeadDecryptRequest, 453 context: grpc.ServicerContext 454 ) -> testing_api_pb2.StreamingAeadDecryptResponse: 455 """Decrypts a message.""" 456 try: 457 keyset_handle = cleartext_keyset_handle.read( 458 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 459 p = keyset_handle.primitive(streaming_aead.StreamingAead) 460 stream = io.BytesIO(request.ciphertext) 461 with p.new_decrypting_stream(stream, request.associated_data) as s: 462 plaintext = s.read() 463 return testing_api_pb2.StreamingAeadDecryptResponse(plaintext=plaintext) 464 except tink.TinkError as e: 465 return testing_api_pb2.StreamingAeadDecryptResponse(err=str(e)) 466 467 468class DeterministicAeadServicer(testing_api_pb2_grpc.DeterministicAeadServicer): 469 """A service for testing Deterministic AEAD encryption.""" 470 471 def Create(self, request: testing_api_pb2.CreationRequest, 472 context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: 473 """Creates a Deterministic AEAD without using it.""" 474 try: 475 keyset_handle = cleartext_keyset_handle.read( 476 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 477 keyset_handle.primitive(daead.DeterministicAead) 478 return testing_api_pb2.CreationResponse() 479 except tink.TinkError as e: 480 return testing_api_pb2.CreationResponse(err=str(e)) 481 482 def EncryptDeterministically( 483 self, request: testing_api_pb2.DeterministicAeadEncryptRequest, 484 context: grpc.ServicerContext 485 ) -> testing_api_pb2.DeterministicAeadEncryptResponse: 486 """Encrypts a message.""" 487 keyset_handle = cleartext_keyset_handle.read( 488 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 489 p = keyset_handle.primitive(daead.DeterministicAead) 490 try: 491 ciphertext = p.encrypt_deterministically(request.plaintext, 492 request.associated_data) 493 return testing_api_pb2.DeterministicAeadEncryptResponse( 494 ciphertext=ciphertext) 495 except tink.TinkError as e: 496 return testing_api_pb2.DeterministicAeadEncryptResponse(err=str(e)) 497 498 def DecryptDeterministically( 499 self, request: testing_api_pb2.DeterministicAeadDecryptRequest, 500 context: grpc.ServicerContext 501 ) -> testing_api_pb2.DeterministicAeadDecryptResponse: 502 """Decrypts a message.""" 503 keyset_handle = cleartext_keyset_handle.read( 504 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 505 p = keyset_handle.primitive(daead.DeterministicAead) 506 try: 507 plaintext = p.decrypt_deterministically(request.ciphertext, 508 request.associated_data) 509 return testing_api_pb2.DeterministicAeadDecryptResponse( 510 plaintext=plaintext) 511 except tink.TinkError as e: 512 return testing_api_pb2.DeterministicAeadDecryptResponse(err=str(e)) 513 514 515class MacServicer(testing_api_pb2_grpc.MacServicer): 516 """A service for testing MACs.""" 517 518 def Create(self, request: testing_api_pb2.CreationRequest, 519 context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: 520 """Creates a MAC without using it.""" 521 try: 522 keyset_handle = cleartext_keyset_handle.read( 523 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 524 keyset_handle.primitive(mac.Mac) 525 return testing_api_pb2.CreationResponse() 526 except tink.TinkError as e: 527 return testing_api_pb2.CreationResponse(err=str(e)) 528 529 def ComputeMac( 530 self, request: testing_api_pb2.ComputeMacRequest, 531 context: grpc.ServicerContext) -> testing_api_pb2.ComputeMacResponse: 532 """Computes a MAC.""" 533 try: 534 keyset_handle = cleartext_keyset_handle.read( 535 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 536 p = keyset_handle.primitive(mac.Mac) 537 mac_value = p.compute_mac(request.data) 538 return testing_api_pb2.ComputeMacResponse(mac_value=mac_value) 539 except tink.TinkError as e: 540 return testing_api_pb2.ComputeMacResponse(err=str(e)) 541 542 def VerifyMac( 543 self, request: testing_api_pb2.VerifyMacRequest, 544 context: grpc.ServicerContext) -> testing_api_pb2.VerifyMacResponse: 545 """Verifies a MAC value.""" 546 try: 547 keyset_handle = cleartext_keyset_handle.read( 548 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 549 p = keyset_handle.primitive(mac.Mac) 550 p.verify_mac(request.mac_value, request.data) 551 return testing_api_pb2.VerifyMacResponse() 552 except tink.TinkError as e: 553 return testing_api_pb2.VerifyMacResponse(err=str(e)) 554 555 556class HybridServicer(testing_api_pb2_grpc.HybridServicer): 557 """A service for testing hybrid encryption and decryption.""" 558 559 def CreateHybridEncrypt( 560 self, request: testing_api_pb2.CreationRequest, 561 context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: 562 """Creates a HybridEncrypt without using it.""" 563 try: 564 keyset_handle = cleartext_keyset_handle.read( 565 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 566 keyset_handle.primitive(hybrid.HybridEncrypt) 567 return testing_api_pb2.CreationResponse() 568 except tink.TinkError as e: 569 return testing_api_pb2.CreationResponse(err=str(e)) 570 571 def CreateHybridDecrypt( 572 self, request: testing_api_pb2.CreationRequest, 573 context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: 574 """Creates a HybridDecrypt without using it.""" 575 try: 576 keyset_handle = cleartext_keyset_handle.read( 577 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 578 keyset_handle.primitive(hybrid.HybridDecrypt) 579 return testing_api_pb2.CreationResponse() 580 except tink.TinkError as e: 581 return testing_api_pb2.CreationResponse(err=str(e)) 582 583 def Encrypt( 584 self, request: testing_api_pb2.HybridEncryptRequest, 585 context: grpc.ServicerContext) -> testing_api_pb2.HybridEncryptResponse: 586 """Encrypts a message.""" 587 try: 588 public_keyset_handle = cleartext_keyset_handle.read( 589 tink.BinaryKeysetReader( 590 request.public_annotated_keyset.serialized_keyset)) 591 p = public_keyset_handle.primitive(hybrid.HybridEncrypt) 592 ciphertext = p.encrypt(request.plaintext, request.context_info) 593 return testing_api_pb2.HybridEncryptResponse(ciphertext=ciphertext) 594 except tink.TinkError as e: 595 return testing_api_pb2.HybridEncryptResponse(err=str(e)) 596 597 def Decrypt( 598 self, request: testing_api_pb2.HybridDecryptRequest, 599 context: grpc.ServicerContext) -> testing_api_pb2.HybridDecryptResponse: 600 """Decrypts a message.""" 601 try: 602 private_keyset_handle = cleartext_keyset_handle.read( 603 tink.BinaryKeysetReader( 604 request.private_annotated_keyset.serialized_keyset)) 605 p = private_keyset_handle.primitive(hybrid.HybridDecrypt) 606 plaintext = p.decrypt(request.ciphertext, request.context_info) 607 return testing_api_pb2.HybridDecryptResponse(plaintext=plaintext) 608 except tink.TinkError as e: 609 return testing_api_pb2.HybridDecryptResponse(err=str(e)) 610 611 612class SignatureServicer(testing_api_pb2_grpc.SignatureServicer): 613 """A service for testing signatures.""" 614 615 def CreatePublicKeySign( 616 self, request: testing_api_pb2.CreationRequest, 617 context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: 618 """Creates a PublicKeySign without using it.""" 619 try: 620 keyset_handle = cleartext_keyset_handle.read( 621 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 622 keyset_handle.primitive(signature.PublicKeySign) 623 return testing_api_pb2.CreationResponse() 624 except tink.TinkError as e: 625 return testing_api_pb2.CreationResponse(err=str(e)) 626 627 def CreatePublicKeyVerify( 628 self, request: testing_api_pb2.CreationRequest, 629 context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: 630 """Creates a PublicKeyVerify without using it.""" 631 try: 632 keyset_handle = cleartext_keyset_handle.read( 633 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 634 keyset_handle.primitive(signature.PublicKeyVerify) 635 return testing_api_pb2.CreationResponse() 636 except tink.TinkError as e: 637 return testing_api_pb2.CreationResponse(err=str(e)) 638 639 def Sign( 640 self, request: testing_api_pb2.SignatureSignRequest, 641 context: grpc.ServicerContext) -> testing_api_pb2.SignatureSignResponse: 642 """Signs a message.""" 643 try: 644 private_keyset_handle = cleartext_keyset_handle.read( 645 tink.BinaryKeysetReader( 646 request.private_annotated_keyset.serialized_keyset)) 647 p = private_keyset_handle.primitive(signature.PublicKeySign) 648 signature_value = p.sign(request.data) 649 return testing_api_pb2.SignatureSignResponse(signature=signature_value) 650 except tink.TinkError as e: 651 return testing_api_pb2.SignatureSignResponse(err=str(e)) 652 653 def Verify( 654 self, request: testing_api_pb2.SignatureVerifyRequest, 655 context: grpc.ServicerContext) -> testing_api_pb2.SignatureVerifyResponse: 656 """Verifies a signature.""" 657 try: 658 public_keyset_handle = cleartext_keyset_handle.read( 659 tink.BinaryKeysetReader( 660 request.public_annotated_keyset.serialized_keyset)) 661 p = public_keyset_handle.primitive(signature.PublicKeyVerify) 662 p.verify(request.signature, request.data) 663 return testing_api_pb2.SignatureVerifyResponse() 664 except tink.TinkError as e: 665 return testing_api_pb2.SignatureVerifyResponse(err=str(e)) 666 667 668class PrfSetServicer(testing_api_pb2_grpc.PrfSetServicer): 669 """A service for testing PrfSet.""" 670 671 def Create(self, request: testing_api_pb2.CreationRequest, 672 context: grpc.ServicerContext) -> testing_api_pb2.CreationResponse: 673 """Creates a PrfSet without using it.""" 674 try: 675 keyset_handle = cleartext_keyset_handle.read( 676 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 677 keyset_handle.primitive(prf.PrfSet) 678 return testing_api_pb2.CreationResponse() 679 except tink.TinkError as e: 680 return testing_api_pb2.CreationResponse(err=str(e)) 681 682 def KeyIds( 683 self, request: testing_api_pb2.PrfSetKeyIdsRequest, 684 context: grpc.ServicerContext) -> testing_api_pb2.PrfSetKeyIdsResponse: 685 """Returns all key IDs and the primary key ID.""" 686 try: 687 keyset_handle = cleartext_keyset_handle.read( 688 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 689 p = keyset_handle.primitive(prf.PrfSet) 690 prfs = p.all() 691 response = testing_api_pb2.PrfSetKeyIdsResponse() 692 response.output.primary_key_id = p.primary_id() 693 response.output.key_id.extend(prfs.keys()) 694 return response 695 except tink.TinkError as e: 696 return testing_api_pb2.PrfSetKeyIdsResponse(err=str(e)) 697 698 def Compute( 699 self, request: testing_api_pb2.PrfSetComputeRequest, 700 context: grpc.ServicerContext) -> testing_api_pb2.PrfSetComputeResponse: 701 """Computes the output of one PRF.""" 702 try: 703 keyset_handle = cleartext_keyset_handle.read( 704 tink.BinaryKeysetReader(request.annotated_keyset.serialized_keyset)) 705 f = keyset_handle.primitive(prf.PrfSet).all()[request.key_id] 706 return testing_api_pb2.PrfSetComputeResponse( 707 output=f.compute(request.input_data, request.output_length)) 708 except tink.TinkError as e: 709 return testing_api_pb2.PrfSetComputeResponse(err=str(e)) 710