# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS-IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for tink.tools.testing.python.testing_server."""

from absl.testing import absltest
import grpc

import tink
from tink import aead
from tink import daead
from tink import hybrid
from tink import mac
from tink import prf
from tink import signature
from tink import streaming_aead


from protos import testing_api_pb2
import services


class DummyServicerContext(grpc.ServicerContext):
  """Inert `grpc.ServicerContext` handed to servicer methods by the tests.

  Every method is a stub: it ignores its arguments, has no side effects and
  returns None.
  """

  def is_active(self):
    """Stub; does nothing and returns None."""

  def time_remaining(self):
    """Stub; does nothing and returns None."""

  def cancel(self):
    """Stub; does nothing and returns None."""

  def add_callback(self, callback):
    """Stub; the callback is not stored or invoked."""

  def invocation_metadata(self):
    """Stub; does nothing and returns None."""

  def peer(self):
    """Stub; does nothing and returns None."""

  def peer_identities(self):
    """Stub; does nothing and returns None."""

  def peer_identity_key(self):
    """Stub; does nothing and returns None."""

  def auth_context(self):
    """Stub; does nothing and returns None."""

  def set_compression(self, compression):
    """Stub; the compression setting is ignored."""

  def send_initial_metadata(self, initial_metadata):
    """Stub; the metadata is ignored."""

  def set_trailing_metadata(self, trailing_metadata):
    """Stub; the metadata is ignored."""

  def abort(self, code, details):
    """Stub; the code and details are ignored and nothing is raised.

    NOTE(review): a real gRPC context is documented to raise from abort();
    confirm no servicer under test relies on that.
    """

  def abort_with_status(self, status):
    """Stub; the status is ignored and nothing is raised."""

  def set_code(self, code):
    """Stub; the status code is ignored."""

  def set_details(self, details):
    """Stub; the details are ignored."""

  def disable_next_message_compression(self):
    """Stub; does nothing and returns None."""


class ServicesTest(absltest.TestCase):
  """Calls the `services` servicers in-process with a dummy gRPC context."""

  _ctx = DummyServicerContext()

  @classmethod
  def setUpClass(cls):
    """Registers the Tink primitives once before any test runs."""
    super().setUpClass()
    for primitive in (aead, daead, mac, hybrid, prf, signature,
                      streaming_aead):
      primitive.register()

  def test_from_json(self):
    """FromJson returns a binary keyset with primary key id 42 and one key."""
    json_keyset = """
        {
          "primaryKeyId": 42,
          "key": [
            {
              "keyData": {
                "typeUrl": "type.googleapis.com/google.crypto.tink.AesGcmKey",
                "keyMaterialType": "SYMMETRIC",
                "value": "AFakeTestKeyValue1234567"

              },
              "outputPrefixType": "TINK",
              "keyId": 42,
              "status": "ENABLED"
            }
          ]
        }"""
    servicer = services.KeysetServicer()
    response = servicer.FromJson(
        testing_api_pb2.KeysetFromJsonRequest(json_keyset=json_keyset),
        self._ctx)
    self.assertEqual(response.WhichOneof('result'), 'keyset')
    parsed = tink.BinaryKeysetReader(response.keyset).read()
    self.assertEqual(parsed.primary_key_id, 42)
    self.assertLen(parsed.key, 1)

  def test_from_json_fail(self):
    """FromJson fills `err` when the input is not a JSON keyset."""
    servicer = services.KeysetServicer()
    response = servicer.FromJson(
        testing_api_pb2.KeysetFromJsonRequest(json_keyset='bad json'),
        self._ctx)
    self.assertEqual(response.WhichOneof('result'), 'err')
    self.assertNotEmpty(response.err)

  def test_generate_to_from_json(self):
    """A generated keyset is unchanged by a ToJson / FromJson round trip."""
    servicer = services.KeysetServicer()

    generated = servicer.Generate(
        testing_api_pb2.KeysetGenerateRequest(
            template=aead.aead_key_templates.AES128_GCM.SerializeToString()),
        self._ctx)
    self.assertEqual(generated.WhichOneof('result'), 'keyset')
    binary_keyset = generated.keyset

    as_json = servicer.ToJson(
        testing_api_pb2.KeysetToJsonRequest(keyset=binary_keyset), self._ctx)
    self.assertEqual(as_json.WhichOneof('result'), 'json_keyset')

    restored = servicer.FromJson(
        testing_api_pb2.KeysetFromJsonRequest(
            json_keyset=as_json.json_keyset),
        self._ctx)
    self.assertEqual(restored.WhichOneof('result'), 'keyset')
    self.assertEqual(restored.keyset, binary_keyset)

  def test_to_json_fail(self):
    """ToJson fills `err` when the bytes are not a serialized keyset."""
    servicer = services.KeysetServicer()
    response = servicer.ToJson(
        testing_api_pb2.KeysetToJsonRequest(keyset=b'bad keyset'), self._ctx)
    self.assertEqual(response.WhichOneof('result'), 'err')
    self.assertNotEmpty(response.err)

  def test_generate_keyset_write_read_encrypted(self):
    """WriteEncrypted followed by ReadEncrypted yields the original keyset."""
    servicer = services.KeysetServicer()
    gen_request = testing_api_pb2.KeysetGenerateRequest(
        template=aead.aead_key_templates.AES128_GCM.SerializeToString())

    # The first generated keyset is the master keyset, the second one is the
    # keyset that gets written and read back.
    master_response = servicer.Generate(gen_request, self._ctx)
    self.assertEqual(master_response.WhichOneof('result'), 'keyset')
    master_keyset = master_response.keyset

    payload_response = servicer.Generate(gen_request, self._ctx)
    self.assertEqual(payload_response.WhichOneof('result'), 'keyset')
    payload_keyset = payload_response.keyset

    written = servicer.WriteEncrypted(
        testing_api_pb2.KeysetWriteEncryptedRequest(
            keyset=payload_keyset,
            master_keyset=master_keyset,
            keyset_writer_type=testing_api_pb2.KEYSET_WRITER_BINARY),
        self._ctx)
    self.assertEqual(written.WhichOneof('result'), 'encrypted_keyset')

    read_back = servicer.ReadEncrypted(
        testing_api_pb2.KeysetReadEncryptedRequest(
            encrypted_keyset=written.encrypted_keyset,
            master_keyset=master_keyset,
            keyset_reader_type=testing_api_pb2.KEYSET_READER_BINARY),
        self._ctx)
    self.assertEqual(read_back.WhichOneof('result'), 'keyset')
    self.assertEqual(read_back.keyset, payload_keyset)

  def test_generate_keyset_write_read_encrypted_with_associated_data(self):
    """Reading back needs the associated data that was used for writing."""
    servicer = services.KeysetServicer()
    gen_request = testing_api_pb2.KeysetGenerateRequest(
        template=aead.aead_key_templates.AES128_GCM.SerializeToString())

    master_response = servicer.Generate(gen_request, self._ctx)
    self.assertEqual(master_response.WhichOneof('result'), 'keyset')
    master_keyset = master_response.keyset

    payload_response = servicer.Generate(gen_request, self._ctx)
    self.assertEqual(payload_response.WhichOneof('result'), 'keyset')
    payload_keyset = payload_response.keyset

    associated_data = b'associated_data'

    written = servicer.WriteEncrypted(
        testing_api_pb2.KeysetWriteEncryptedRequest(
            keyset=payload_keyset,
            master_keyset=master_keyset,
            associated_data=testing_api_pb2.BytesValue(value=associated_data),
            keyset_writer_type=testing_api_pb2.KEYSET_WRITER_BINARY),
        self._ctx)
    self.assertEqual(written.WhichOneof('result'), 'encrypted_keyset')
    encrypted_keyset = written.encrypted_keyset

    # Matching associated data: the keyset comes back unchanged.
    matching = servicer.ReadEncrypted(
        testing_api_pb2.KeysetReadEncryptedRequest(
            encrypted_keyset=encrypted_keyset,
            master_keyset=master_keyset,
            associated_data=testing_api_pb2.BytesValue(value=associated_data),
            keyset_reader_type=testing_api_pb2.KEYSET_READER_BINARY),
        self._ctx)
    self.assertEqual(matching.WhichOneof('result'), 'keyset')
    self.assertEqual(matching.keyset, payload_keyset)

    # Using the wrong associated_data fails
    mismatching = servicer.ReadEncrypted(
        testing_api_pb2.KeysetReadEncryptedRequest(
            encrypted_keyset=encrypted_keyset,
            master_keyset=master_keyset,
            associated_data=testing_api_pb2.BytesValue(value=b'wrong ad'),
            keyset_reader_type=testing_api_pb2.KEYSET_READER_BINARY),
        self._ctx)
    self.assertEqual(mismatching.WhichOneof('result'), 'err')

  def test_keyset_write_encrypted_fails_when_keyset_is_invalid(self):
    """WriteEncrypted reports `err` when `keyset` is not a valid keyset."""
    servicer = services.KeysetServicer()

    master_response = servicer.Generate(
        testing_api_pb2.KeysetGenerateRequest(
            template=aead.aead_key_templates.AES128_GCM.SerializeToString()),
        self._ctx)
    self.assertEqual(master_response.WhichOneof('result'), 'keyset')

    response = servicer.WriteEncrypted(
        testing_api_pb2.KeysetWriteEncryptedRequest(
            keyset=b'invalid',
            master_keyset=master_response.keyset,
            keyset_writer_type=testing_api_pb2.KEYSET_WRITER_BINARY),
        self._ctx)
    self.assertEqual(response.WhichOneof('result'), 'err')

  def test_keyset_read_encrypted_fails_when_encrypted_keyset_is_invalid(self):
    """ReadEncrypted reports `err` when `encrypted_keyset` is invalid."""
    servicer = services.KeysetServicer()

    master_response = servicer.Generate(
        testing_api_pb2.KeysetGenerateRequest(
            template=aead.aead_key_templates.AES128_GCM.SerializeToString()),
        self._ctx)
    self.assertEqual(master_response.WhichOneof('result'), 'keyset')

    response = servicer.ReadEncrypted(
        testing_api_pb2.KeysetReadEncryptedRequest(
            encrypted_keyset=b'invalid',
            master_keyset=master_response.keyset,
            keyset_reader_type=testing_api_pb2.KEYSET_READER_BINARY),
        self._ctx)
    self.assertEqual(response.WhichOneof('result'), 'err')

  def test_create_aead(self):
    """AeadServicer.Create leaves `err` empty for an AES128_GCM keyset."""
    keyset_servicer = services.KeysetServicer()
    aead_servicer = services.AeadServicer()

    generated = keyset_servicer.Generate(
        testing_api_pb2.KeysetGenerateRequest(
            template=aead.aead_key_templates.AES128_GCM.SerializeToString()),
        self._ctx)
    self.assertEqual(generated.WhichOneof('result'), 'keyset')

    created = aead_servicer.Create(
        testing_api_pb2.CreationRequest(
            annotated_keyset=testing_api_pb2.AnnotatedKeyset(
                serialized_keyset=generated.keyset)),
        self._ctx)
    self.assertEmpty(created.err)

  def test_create_aead_broken_keyset(self):
    """AeadServicer.Create fills `err` for an unparsable keyset."""
    aead_servicer = services.AeadServicer()

    created = aead_servicer.Create(
        testing_api_pb2.CreationRequest(
            annotated_keyset=testing_api_pb2.AnnotatedKeyset(
                serialized_keyset=b'\x80')),
        self._ctx)
    self.assertNotEmpty(created.err)

  def test_encrypt_decrypt_wrong_keyset(self):
    """Encrypt and Decrypt raise TinkError when handed an HMAC keyset."""
    aead_servicer = services.AeadServicer()
    keyset_servicer = services.KeysetServicer()
    # HMAC keysets will not allow creation of an AEAD.
    generated = keyset_servicer.Generate(
        testing_api_pb2.KeysetGenerateRequest(
            template=(
                mac.mac_key_templates.HMAC_SHA256_128BITTAG
                .SerializeToString())),
        self._ctx)
    self.assertEqual(generated.WhichOneof('result'), 'keyset')
    mac_keyset = generated.keyset

    with self.assertRaises(tink.TinkError):
      enc_request = testing_api_pb2.AeadEncryptRequest(
          annotated_keyset=testing_api_pb2.AnnotatedKeyset(
              serialized_keyset=mac_keyset))
      aead_servicer.Encrypt(enc_request, self._ctx)

    with self.assertRaises(tink.TinkError):
      dec_request = testing_api_pb2.AeadDecryptRequest(
          annotated_keyset=testing_api_pb2.AnnotatedKeyset(
              serialized_keyset=mac_keyset))
      aead_servicer.Decrypt(dec_request, self._ctx)

  def test_generate_encrypt_decrypt(self):
    """Decrypt recovers what Encrypt produced with the same keyset."""
    keyset_servicer = services.KeysetServicer()
    aead_servicer = services.AeadServicer()

    generated = keyset_servicer.Generate(
        testing_api_pb2.KeysetGenerateRequest(
            template=aead.aead_key_templates.AES128_GCM.SerializeToString()),
        self._ctx)
    self.assertEqual(generated.WhichOneof('result'), 'keyset')
    serialized = generated.keyset
    message = b'The quick brown fox jumps over the lazy dog'
    aad = b'associated_data'

    encrypted = aead_servicer.Encrypt(
        testing_api_pb2.AeadEncryptRequest(
            annotated_keyset=testing_api_pb2.AnnotatedKeyset(
                serialized_keyset=serialized),
            plaintext=message,
            associated_data=aad),
        self._ctx)
    self.assertEqual(encrypted.WhichOneof('result'), 'ciphertext')

    decrypted = aead_servicer.Decrypt(
        testing_api_pb2.AeadDecryptRequest(
            annotated_keyset=testing_api_pb2.AnnotatedKeyset(
                serialized_keyset=serialized),
            ciphertext=encrypted.ciphertext,
            associated_data=aad),
        self._ctx)
    self.assertEqual(decrypted.WhichOneof('result'), 'plaintext')
    self.assertEqual(decrypted.plaintext, message)

348*e7b1675dSTing-Kang Chang def test_generate_decrypt_fail(self): 349*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 350*e7b1675dSTing-Kang Chang aead_servicer = services.AeadServicer() 351*e7b1675dSTing-Kang Chang 352*e7b1675dSTing-Kang Chang template = aead.aead_key_templates.AES128_GCM.SerializeToString() 353*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 354*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 355*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 356*e7b1675dSTing-Kang Chang keyset = gen_response.keyset 357*e7b1675dSTing-Kang Chang 358*e7b1675dSTing-Kang Chang ciphertext = b'some invalid ciphertext' 359*e7b1675dSTing-Kang Chang associated_data = b'associated_data' 360*e7b1675dSTing-Kang Chang dec_request = testing_api_pb2.AeadDecryptRequest( 361*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 362*e7b1675dSTing-Kang Chang serialized_keyset=keyset), 363*e7b1675dSTing-Kang Chang ciphertext=ciphertext, 364*e7b1675dSTing-Kang Chang associated_data=associated_data) 365*e7b1675dSTing-Kang Chang dec_response = aead_servicer.Decrypt(dec_request, self._ctx) 366*e7b1675dSTing-Kang Chang self.assertEqual(dec_response.WhichOneof('result'), 'err') 367*e7b1675dSTing-Kang Chang self.assertNotEmpty(dec_response.err) 368*e7b1675dSTing-Kang Chang 369*e7b1675dSTing-Kang Chang def test_server_info(self): 370*e7b1675dSTing-Kang Chang metadata_servicer = services.MetadataServicer() 371*e7b1675dSTing-Kang Chang request = testing_api_pb2.ServerInfoRequest() 372*e7b1675dSTing-Kang Chang response = metadata_servicer.GetServerInfo(request, self._ctx) 373*e7b1675dSTing-Kang Chang self.assertEqual(response.language, 'python') 374*e7b1675dSTing-Kang Chang 375*e7b1675dSTing-Kang Chang def test_create_deterministic_aead(self): 376*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 
377*e7b1675dSTing-Kang Chang daead_servicer = services.DeterministicAeadServicer() 378*e7b1675dSTing-Kang Chang 379*e7b1675dSTing-Kang Chang template_proto = daead.deterministic_aead_key_templates.AES256_SIV 380*e7b1675dSTing-Kang Chang template = template_proto.SerializeToString() 381*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 382*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 383*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 384*e7b1675dSTing-Kang Chang 385*e7b1675dSTing-Kang Chang creation_request = testing_api_pb2.CreationRequest( 386*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 387*e7b1675dSTing-Kang Chang serialized_keyset=gen_response.keyset)) 388*e7b1675dSTing-Kang Chang creation_response = daead_servicer.Create( 389*e7b1675dSTing-Kang Chang creation_request, self._ctx) 390*e7b1675dSTing-Kang Chang self.assertEmpty(creation_response.err) 391*e7b1675dSTing-Kang Chang 392*e7b1675dSTing-Kang Chang def test_create_deterministic_aead_broken_keyset(self): 393*e7b1675dSTing-Kang Chang daead_servicer = services.DeterministicAeadServicer() 394*e7b1675dSTing-Kang Chang 395*e7b1675dSTing-Kang Chang creation_request = testing_api_pb2.CreationRequest( 396*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 397*e7b1675dSTing-Kang Chang serialized_keyset=b'\x80')) 398*e7b1675dSTing-Kang Chang creation_response = daead_servicer.Create(creation_request, self._ctx) 399*e7b1675dSTing-Kang Chang self.assertNotEmpty(creation_response.err) 400*e7b1675dSTing-Kang Chang 401*e7b1675dSTing-Kang Chang def test_encrypt_decrypt_deterministic_aead_broken_keyset(self): 402*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 403*e7b1675dSTing-Kang Chang daead_servicer = services.DeterministicAeadServicer() 404*e7b1675dSTing-Kang Chang 405*e7b1675dSTing-Kang Chang # AES128_GCM keysets 
will not allow creation of an Deterministic AEAD. 406*e7b1675dSTing-Kang Chang template = aead.aead_key_templates.AES128_GCM.SerializeToString() 407*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 408*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 409*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 410*e7b1675dSTing-Kang Chang keyset = gen_response.keyset 411*e7b1675dSTing-Kang Chang 412*e7b1675dSTing-Kang Chang enc_request = testing_api_pb2.DeterministicAeadEncryptRequest( 413*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 414*e7b1675dSTing-Kang Chang serialized_keyset=keyset)) 415*e7b1675dSTing-Kang Chang with self.assertRaises(tink.TinkError): 416*e7b1675dSTing-Kang Chang daead_servicer.EncryptDeterministically(enc_request, self._ctx) 417*e7b1675dSTing-Kang Chang dec_request = testing_api_pb2.DeterministicAeadDecryptRequest( 418*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 419*e7b1675dSTing-Kang Chang serialized_keyset=keyset)) 420*e7b1675dSTing-Kang Chang with self.assertRaises(tink.TinkError): 421*e7b1675dSTing-Kang Chang daead_servicer.DecryptDeterministically(dec_request, self._ctx) 422*e7b1675dSTing-Kang Chang 423*e7b1675dSTing-Kang Chang def test_generate_encrypt_decrypt_deterministically(self): 424*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 425*e7b1675dSTing-Kang Chang daead_servicer = services.DeterministicAeadServicer() 426*e7b1675dSTing-Kang Chang 427*e7b1675dSTing-Kang Chang template_proto = daead.deterministic_aead_key_templates.AES256_SIV 428*e7b1675dSTing-Kang Chang template = template_proto.SerializeToString() 429*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 430*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 431*e7b1675dSTing-Kang Chang 
self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 432*e7b1675dSTing-Kang Chang keyset = gen_response.keyset 433*e7b1675dSTing-Kang Chang plaintext = b'The quick brown fox jumps over the lazy dog' 434*e7b1675dSTing-Kang Chang associated_data = b'associated_data' 435*e7b1675dSTing-Kang Chang enc_request = testing_api_pb2.DeterministicAeadEncryptRequest( 436*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 437*e7b1675dSTing-Kang Chang serialized_keyset=keyset), 438*e7b1675dSTing-Kang Chang plaintext=plaintext, 439*e7b1675dSTing-Kang Chang associated_data=associated_data) 440*e7b1675dSTing-Kang Chang enc_response = daead_servicer.EncryptDeterministically(enc_request, 441*e7b1675dSTing-Kang Chang self._ctx) 442*e7b1675dSTing-Kang Chang self.assertEqual(enc_response.WhichOneof('result'), 'ciphertext') 443*e7b1675dSTing-Kang Chang enc_response2 = daead_servicer.EncryptDeterministically(enc_request, 444*e7b1675dSTing-Kang Chang self._ctx) 445*e7b1675dSTing-Kang Chang self.assertEqual(enc_response2.WhichOneof('result'), 'ciphertext') 446*e7b1675dSTing-Kang Chang self.assertEqual(enc_response2.ciphertext, enc_response.ciphertext) 447*e7b1675dSTing-Kang Chang ciphertext = enc_response.ciphertext 448*e7b1675dSTing-Kang Chang dec_request = testing_api_pb2.DeterministicAeadDecryptRequest( 449*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 450*e7b1675dSTing-Kang Chang serialized_keyset=keyset), 451*e7b1675dSTing-Kang Chang ciphertext=ciphertext, 452*e7b1675dSTing-Kang Chang associated_data=associated_data) 453*e7b1675dSTing-Kang Chang dec_response = daead_servicer.DecryptDeterministically(dec_request, 454*e7b1675dSTing-Kang Chang self._ctx) 455*e7b1675dSTing-Kang Chang self.assertEqual(dec_response.WhichOneof('result'), 'plaintext') 456*e7b1675dSTing-Kang Chang self.assertEqual(dec_response.plaintext, plaintext) 457*e7b1675dSTing-Kang Chang 458*e7b1675dSTing-Kang Chang def 
test_generate_decrypt_deterministically_fail(self): 459*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 460*e7b1675dSTing-Kang Chang daead_servicer = services.DeterministicAeadServicer() 461*e7b1675dSTing-Kang Chang 462*e7b1675dSTing-Kang Chang template_proto = daead.deterministic_aead_key_templates.AES256_SIV 463*e7b1675dSTing-Kang Chang template = template_proto.SerializeToString() 464*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 465*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 466*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 467*e7b1675dSTing-Kang Chang keyset = gen_response.keyset 468*e7b1675dSTing-Kang Chang 469*e7b1675dSTing-Kang Chang ciphertext = b'some invalid ciphertext' 470*e7b1675dSTing-Kang Chang associated_data = b'associated_data' 471*e7b1675dSTing-Kang Chang dec_request = testing_api_pb2.DeterministicAeadDecryptRequest( 472*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 473*e7b1675dSTing-Kang Chang serialized_keyset=keyset), 474*e7b1675dSTing-Kang Chang ciphertext=ciphertext, 475*e7b1675dSTing-Kang Chang associated_data=associated_data) 476*e7b1675dSTing-Kang Chang dec_response = daead_servicer.DecryptDeterministically(dec_request, 477*e7b1675dSTing-Kang Chang self._ctx) 478*e7b1675dSTing-Kang Chang self.assertEqual(dec_response.WhichOneof('result'), 'err') 479*e7b1675dSTing-Kang Chang self.assertNotEmpty(dec_response.err) 480*e7b1675dSTing-Kang Chang 481*e7b1675dSTing-Kang Chang def test_create_mac(self): 482*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 483*e7b1675dSTing-Kang Chang mac_servicer = services.MacServicer() 484*e7b1675dSTing-Kang Chang 485*e7b1675dSTing-Kang Chang template = mac.mac_key_templates.HMAC_SHA256_128BITTAG.SerializeToString() 486*e7b1675dSTing-Kang Chang gen_request = 
testing_api_pb2.KeysetGenerateRequest(template=template) 487*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 488*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 489*e7b1675dSTing-Kang Chang 490*e7b1675dSTing-Kang Chang creation_request = testing_api_pb2.CreationRequest( 491*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 492*e7b1675dSTing-Kang Chang serialized_keyset=gen_response.keyset)) 493*e7b1675dSTing-Kang Chang creation_response = mac_servicer.Create( 494*e7b1675dSTing-Kang Chang creation_request, self._ctx) 495*e7b1675dSTing-Kang Chang self.assertEmpty(creation_response.err) 496*e7b1675dSTing-Kang Chang 497*e7b1675dSTing-Kang Chang def test_create_mac_broken_keyset(self): 498*e7b1675dSTing-Kang Chang mac_servicer = services.MacServicer() 499*e7b1675dSTing-Kang Chang 500*e7b1675dSTing-Kang Chang creation_request = testing_api_pb2.CreationRequest( 501*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 502*e7b1675dSTing-Kang Chang serialized_keyset=b'\x80')) 503*e7b1675dSTing-Kang Chang creation_response = mac_servicer.Create(creation_request, self._ctx) 504*e7b1675dSTing-Kang Chang self.assertNotEmpty(creation_response.err) 505*e7b1675dSTing-Kang Chang 506*e7b1675dSTing-Kang Chang def test_generate_compute_verify_mac(self): 507*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 508*e7b1675dSTing-Kang Chang mac_servicer = services.MacServicer() 509*e7b1675dSTing-Kang Chang 510*e7b1675dSTing-Kang Chang template = mac.mac_key_templates.HMAC_SHA256_128BITTAG.SerializeToString() 511*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 512*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 513*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 514*e7b1675dSTing-Kang Chang keyset = gen_response.keyset 
515*e7b1675dSTing-Kang Chang data = b'The quick brown fox jumps over the lazy dog' 516*e7b1675dSTing-Kang Chang comp_request = testing_api_pb2.ComputeMacRequest( 517*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 518*e7b1675dSTing-Kang Chang serialized_keyset=keyset), 519*e7b1675dSTing-Kang Chang data=data) 520*e7b1675dSTing-Kang Chang comp_response = mac_servicer.ComputeMac(comp_request, self._ctx) 521*e7b1675dSTing-Kang Chang self.assertEqual(comp_response.WhichOneof('result'), 'mac_value') 522*e7b1675dSTing-Kang Chang mac_value = comp_response.mac_value 523*e7b1675dSTing-Kang Chang verify_request = testing_api_pb2.VerifyMacRequest( 524*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 525*e7b1675dSTing-Kang Chang serialized_keyset=keyset), 526*e7b1675dSTing-Kang Chang mac_value=mac_value, 527*e7b1675dSTing-Kang Chang data=data) 528*e7b1675dSTing-Kang Chang verify_response = mac_servicer.VerifyMac(verify_request, self._ctx) 529*e7b1675dSTing-Kang Chang self.assertEmpty(verify_response.err) 530*e7b1675dSTing-Kang Chang 531*e7b1675dSTing-Kang Chang def test_generate_compute_verify_mac_fail(self): 532*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 533*e7b1675dSTing-Kang Chang mac_servicer = services.MacServicer() 534*e7b1675dSTing-Kang Chang 535*e7b1675dSTing-Kang Chang template = mac.mac_key_templates.HMAC_SHA256_128BITTAG.SerializeToString() 536*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 537*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 538*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 539*e7b1675dSTing-Kang Chang keyset = gen_response.keyset 540*e7b1675dSTing-Kang Chang 541*e7b1675dSTing-Kang Chang verify_request = testing_api_pb2.VerifyMacRequest( 542*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 543*e7b1675dSTing-Kang Chang 
serialized_keyset=keyset), 544*e7b1675dSTing-Kang Chang mac_value=b'invalid mac_value', 545*e7b1675dSTing-Kang Chang data=b'data') 546*e7b1675dSTing-Kang Chang verify_response = mac_servicer.VerifyMac(verify_request, self._ctx) 547*e7b1675dSTing-Kang Chang self.assertNotEmpty(verify_response.err) 548*e7b1675dSTing-Kang Chang 549*e7b1675dSTing-Kang Chang def test_create_hybrid_decrypt(self): 550*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 551*e7b1675dSTing-Kang Chang hybrid_servicer = services.HybridServicer() 552*e7b1675dSTing-Kang Chang 553*e7b1675dSTing-Kang Chang tp = hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM 554*e7b1675dSTing-Kang Chang template = tp.SerializeToString() 555*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 556*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 557*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 558*e7b1675dSTing-Kang Chang 559*e7b1675dSTing-Kang Chang creation_request = testing_api_pb2.CreationRequest( 560*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 561*e7b1675dSTing-Kang Chang serialized_keyset=gen_response.keyset)) 562*e7b1675dSTing-Kang Chang creation_response = hybrid_servicer.CreateHybridDecrypt( 563*e7b1675dSTing-Kang Chang creation_request, self._ctx) 564*e7b1675dSTing-Kang Chang self.assertEmpty(creation_response.err) 565*e7b1675dSTing-Kang Chang 566*e7b1675dSTing-Kang Chang def test_create_hybrid_decrypt_bad_keyset(self): 567*e7b1675dSTing-Kang Chang hybrid_servicer = services.HybridServicer() 568*e7b1675dSTing-Kang Chang 569*e7b1675dSTing-Kang Chang creation_request = testing_api_pb2.CreationRequest( 570*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 571*e7b1675dSTing-Kang Chang serialized_keyset=b'\x80')) 572*e7b1675dSTing-Kang Chang creation_response = hybrid_servicer.CreateHybridDecrypt( 
573*e7b1675dSTing-Kang Chang creation_request, self._ctx) 574*e7b1675dSTing-Kang Chang self.assertNotEmpty(creation_response.err) 575*e7b1675dSTing-Kang Chang 576*e7b1675dSTing-Kang Chang def test_create_hybrid_encrypt(self): 577*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 578*e7b1675dSTing-Kang Chang hybrid_servicer = services.HybridServicer() 579*e7b1675dSTing-Kang Chang 580*e7b1675dSTing-Kang Chang tp = hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM 581*e7b1675dSTing-Kang Chang template = tp.SerializeToString() 582*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 583*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 584*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 585*e7b1675dSTing-Kang Chang pub_request = testing_api_pb2.KeysetPublicRequest( 586*e7b1675dSTing-Kang Chang private_keyset=gen_response.keyset) 587*e7b1675dSTing-Kang Chang pub_response = keyset_servicer.Public(pub_request, self._ctx) 588*e7b1675dSTing-Kang Chang self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset') 589*e7b1675dSTing-Kang Chang 590*e7b1675dSTing-Kang Chang creation_request = testing_api_pb2.CreationRequest( 591*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 592*e7b1675dSTing-Kang Chang serialized_keyset=pub_response.public_keyset)) 593*e7b1675dSTing-Kang Chang creation_response = hybrid_servicer.CreateHybridEncrypt( 594*e7b1675dSTing-Kang Chang creation_request, self._ctx) 595*e7b1675dSTing-Kang Chang self.assertEmpty(creation_response.err) 596*e7b1675dSTing-Kang Chang 597*e7b1675dSTing-Kang Chang def test_create_hybrid_encrypt_bad_keyset(self): 598*e7b1675dSTing-Kang Chang hybrid_servicer = services.HybridServicer() 599*e7b1675dSTing-Kang Chang 600*e7b1675dSTing-Kang Chang creation_request = testing_api_pb2.CreationRequest( 601*e7b1675dSTing-Kang Chang 
annotated_keyset=testing_api_pb2.AnnotatedKeyset( 602*e7b1675dSTing-Kang Chang serialized_keyset=b'\x80')) 603*e7b1675dSTing-Kang Chang creation_response = hybrid_servicer.CreateHybridEncrypt( 604*e7b1675dSTing-Kang Chang creation_request, self._ctx) 605*e7b1675dSTing-Kang Chang self.assertNotEmpty(creation_response.err) 606*e7b1675dSTing-Kang Chang 607*e7b1675dSTing-Kang Chang def test_generate_hybrid_encrypt_decrypt(self): 608*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 609*e7b1675dSTing-Kang Chang hybrid_servicer = services.HybridServicer() 610*e7b1675dSTing-Kang Chang 611*e7b1675dSTing-Kang Chang tp = hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM 612*e7b1675dSTing-Kang Chang template = tp.SerializeToString() 613*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 614*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 615*e7b1675dSTing-Kang Chang self.assertEmpty(gen_response.err) 616*e7b1675dSTing-Kang Chang private_keyset = gen_response.keyset 617*e7b1675dSTing-Kang Chang 618*e7b1675dSTing-Kang Chang pub_request = testing_api_pb2.KeysetPublicRequest( 619*e7b1675dSTing-Kang Chang private_keyset=private_keyset) 620*e7b1675dSTing-Kang Chang pub_response = keyset_servicer.Public(pub_request, self._ctx) 621*e7b1675dSTing-Kang Chang self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset') 622*e7b1675dSTing-Kang Chang public_keyset = pub_response.public_keyset 623*e7b1675dSTing-Kang Chang 624*e7b1675dSTing-Kang Chang plaintext = b'The quick brown fox jumps over the lazy dog' 625*e7b1675dSTing-Kang Chang context_info = b'context_info' 626*e7b1675dSTing-Kang Chang enc_request = testing_api_pb2.HybridEncryptRequest( 627*e7b1675dSTing-Kang Chang public_annotated_keyset=testing_api_pb2.AnnotatedKeyset( 628*e7b1675dSTing-Kang Chang serialized_keyset=public_keyset), 629*e7b1675dSTing-Kang Chang plaintext=plaintext, 
630*e7b1675dSTing-Kang Chang context_info=context_info) 631*e7b1675dSTing-Kang Chang enc_response = hybrid_servicer.Encrypt(enc_request, self._ctx) 632*e7b1675dSTing-Kang Chang self.assertEqual(enc_response.WhichOneof('result'), 'ciphertext') 633*e7b1675dSTing-Kang Chang ciphertext = enc_response.ciphertext 634*e7b1675dSTing-Kang Chang 635*e7b1675dSTing-Kang Chang dec_request = testing_api_pb2.HybridDecryptRequest( 636*e7b1675dSTing-Kang Chang private_annotated_keyset=testing_api_pb2.AnnotatedKeyset( 637*e7b1675dSTing-Kang Chang serialized_keyset=private_keyset), 638*e7b1675dSTing-Kang Chang ciphertext=ciphertext, 639*e7b1675dSTing-Kang Chang context_info=context_info) 640*e7b1675dSTing-Kang Chang dec_response = hybrid_servicer.Decrypt(dec_request, self._ctx) 641*e7b1675dSTing-Kang Chang self.assertEqual(dec_response.WhichOneof('result'), 'plaintext') 642*e7b1675dSTing-Kang Chang self.assertEqual(dec_response.plaintext, plaintext) 643*e7b1675dSTing-Kang Chang 644*e7b1675dSTing-Kang Chang def test_generate_hybrid_encrypt_decrypt_fail(self): 645*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 646*e7b1675dSTing-Kang Chang hybrid_servicer = services.HybridServicer() 647*e7b1675dSTing-Kang Chang 648*e7b1675dSTing-Kang Chang tp = hybrid.hybrid_key_templates.ECIES_P256_HKDF_HMAC_SHA256_AES128_GCM 649*e7b1675dSTing-Kang Chang template = tp.SerializeToString() 650*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 651*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 652*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 653*e7b1675dSTing-Kang Chang private_keyset = gen_response.keyset 654*e7b1675dSTing-Kang Chang 655*e7b1675dSTing-Kang Chang dec_request = testing_api_pb2.HybridDecryptRequest( 656*e7b1675dSTing-Kang Chang private_annotated_keyset=testing_api_pb2.AnnotatedKeyset( 657*e7b1675dSTing-Kang Chang 
serialized_keyset=private_keyset), 658*e7b1675dSTing-Kang Chang ciphertext=b'invalid ciphertext', 659*e7b1675dSTing-Kang Chang context_info=b'context_info') 660*e7b1675dSTing-Kang Chang dec_response = hybrid_servicer.Decrypt(dec_request, self._ctx) 661*e7b1675dSTing-Kang Chang self.assertEqual(dec_response.WhichOneof('result'), 'err') 662*e7b1675dSTing-Kang Chang self.assertNotEmpty(dec_response.err) 663*e7b1675dSTing-Kang Chang 664*e7b1675dSTing-Kang Chang def test_create_public_key_sign(self): 665*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 666*e7b1675dSTing-Kang Chang signature_servicer = services.SignatureServicer() 667*e7b1675dSTing-Kang Chang 668*e7b1675dSTing-Kang Chang template = signature.signature_key_templates.ECDSA_P256.SerializeToString() 669*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 670*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 671*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 672*e7b1675dSTing-Kang Chang 673*e7b1675dSTing-Kang Chang creation_request = testing_api_pb2.CreationRequest( 674*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 675*e7b1675dSTing-Kang Chang serialized_keyset=gen_response.keyset)) 676*e7b1675dSTing-Kang Chang creation_response = signature_servicer.CreatePublicKeySign( 677*e7b1675dSTing-Kang Chang creation_request, self._ctx) 678*e7b1675dSTing-Kang Chang self.assertEmpty(creation_response.err) 679*e7b1675dSTing-Kang Chang 680*e7b1675dSTing-Kang Chang def test_create_public_key_sign_bad_keyset(self): 681*e7b1675dSTing-Kang Chang signature_servicer = services.SignatureServicer() 682*e7b1675dSTing-Kang Chang 683*e7b1675dSTing-Kang Chang creation_request = testing_api_pb2.CreationRequest( 684*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 685*e7b1675dSTing-Kang Chang serialized_keyset=b'\x80')) 
686*e7b1675dSTing-Kang Chang creation_response = signature_servicer.CreatePublicKeySign( 687*e7b1675dSTing-Kang Chang creation_request, self._ctx) 688*e7b1675dSTing-Kang Chang self.assertNotEmpty(creation_response.err) 689*e7b1675dSTing-Kang Chang 690*e7b1675dSTing-Kang Chang def test_create_public_key_verify(self): 691*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 692*e7b1675dSTing-Kang Chang signature_servicer = services.SignatureServicer() 693*e7b1675dSTing-Kang Chang 694*e7b1675dSTing-Kang Chang template = signature.signature_key_templates.ECDSA_P256.SerializeToString() 695*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 696*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 697*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 698*e7b1675dSTing-Kang Chang pub_request = testing_api_pb2.KeysetPublicRequest( 699*e7b1675dSTing-Kang Chang private_keyset=gen_response.keyset) 700*e7b1675dSTing-Kang Chang pub_response = keyset_servicer.Public(pub_request, self._ctx) 701*e7b1675dSTing-Kang Chang self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset') 702*e7b1675dSTing-Kang Chang 703*e7b1675dSTing-Kang Chang creation_request = testing_api_pb2.CreationRequest( 704*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 705*e7b1675dSTing-Kang Chang serialized_keyset=pub_response.public_keyset)) 706*e7b1675dSTing-Kang Chang creation_response = signature_servicer.CreatePublicKeyVerify( 707*e7b1675dSTing-Kang Chang creation_request, self._ctx) 708*e7b1675dSTing-Kang Chang self.assertEmpty(creation_response.err) 709*e7b1675dSTing-Kang Chang 710*e7b1675dSTing-Kang Chang def test_create_public_key_verify_bad_keyset(self): 711*e7b1675dSTing-Kang Chang signature_servicer = services.SignatureServicer() 712*e7b1675dSTing-Kang Chang 713*e7b1675dSTing-Kang Chang creation_request = 
testing_api_pb2.CreationRequest( 714*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 715*e7b1675dSTing-Kang Chang serialized_keyset=b'\x80')) 716*e7b1675dSTing-Kang Chang creation_response = signature_servicer.CreatePublicKeyVerify( 717*e7b1675dSTing-Kang Chang creation_request, self._ctx) 718*e7b1675dSTing-Kang Chang self.assertNotEmpty(creation_response.err) 719*e7b1675dSTing-Kang Chang 720*e7b1675dSTing-Kang Chang def test_sign_verify(self): 721*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 722*e7b1675dSTing-Kang Chang signature_servicer = services.SignatureServicer() 723*e7b1675dSTing-Kang Chang 724*e7b1675dSTing-Kang Chang template = signature.signature_key_templates.ECDSA_P256.SerializeToString() 725*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 726*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 727*e7b1675dSTing-Kang Chang 728*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 729*e7b1675dSTing-Kang Chang private_keyset = gen_response.keyset 730*e7b1675dSTing-Kang Chang 731*e7b1675dSTing-Kang Chang pub_request = testing_api_pb2.KeysetPublicRequest( 732*e7b1675dSTing-Kang Chang private_keyset=private_keyset) 733*e7b1675dSTing-Kang Chang pub_response = keyset_servicer.Public(pub_request, self._ctx) 734*e7b1675dSTing-Kang Chang self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset') 735*e7b1675dSTing-Kang Chang public_keyset = pub_response.public_keyset 736*e7b1675dSTing-Kang Chang 737*e7b1675dSTing-Kang Chang data = b'The quick brown fox jumps over the lazy dog' 738*e7b1675dSTing-Kang Chang 739*e7b1675dSTing-Kang Chang sign_request = testing_api_pb2.SignatureSignRequest( 740*e7b1675dSTing-Kang Chang private_annotated_keyset=testing_api_pb2.AnnotatedKeyset( 741*e7b1675dSTing-Kang Chang serialized_keyset=private_keyset), 742*e7b1675dSTing-Kang Chang data=data) 
743*e7b1675dSTing-Kang Chang sign_response = signature_servicer.Sign(sign_request, self._ctx) 744*e7b1675dSTing-Kang Chang self.assertEqual(sign_response.WhichOneof('result'), 'signature') 745*e7b1675dSTing-Kang Chang a_signature = sign_response.signature 746*e7b1675dSTing-Kang Chang 747*e7b1675dSTing-Kang Chang verify_request = testing_api_pb2.SignatureVerifyRequest( 748*e7b1675dSTing-Kang Chang public_annotated_keyset=testing_api_pb2.AnnotatedKeyset( 749*e7b1675dSTing-Kang Chang serialized_keyset=public_keyset), 750*e7b1675dSTing-Kang Chang signature=a_signature, 751*e7b1675dSTing-Kang Chang data=data) 752*e7b1675dSTing-Kang Chang verify_response = signature_servicer.Verify(verify_request, self._ctx) 753*e7b1675dSTing-Kang Chang self.assertEmpty(verify_response.err) 754*e7b1675dSTing-Kang Chang 755*e7b1675dSTing-Kang Chang def test_sign_verify_fail(self): 756*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 757*e7b1675dSTing-Kang Chang signature_servicer = services.SignatureServicer() 758*e7b1675dSTing-Kang Chang 759*e7b1675dSTing-Kang Chang template = signature.signature_key_templates.ECDSA_P256.SerializeToString() 760*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 761*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 762*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 763*e7b1675dSTing-Kang Chang self.assertEmpty(gen_response.err) 764*e7b1675dSTing-Kang Chang private_keyset = gen_response.keyset 765*e7b1675dSTing-Kang Chang 766*e7b1675dSTing-Kang Chang pub_request = testing_api_pb2.KeysetPublicRequest( 767*e7b1675dSTing-Kang Chang private_keyset=private_keyset) 768*e7b1675dSTing-Kang Chang pub_response = keyset_servicer.Public(pub_request, self._ctx) 769*e7b1675dSTing-Kang Chang self.assertEqual(pub_response.WhichOneof('result'), 'public_keyset') 770*e7b1675dSTing-Kang Chang public_keyset = pub_response.public_keyset 
771*e7b1675dSTing-Kang Chang 772*e7b1675dSTing-Kang Chang invalid_request = testing_api_pb2.SignatureVerifyRequest( 773*e7b1675dSTing-Kang Chang public_annotated_keyset=testing_api_pb2.AnnotatedKeyset( 774*e7b1675dSTing-Kang Chang serialized_keyset=public_keyset), 775*e7b1675dSTing-Kang Chang signature=b'invalid signature', 776*e7b1675dSTing-Kang Chang data=b'The quick brown fox jumps over the lazy dog') 777*e7b1675dSTing-Kang Chang invalid_response = signature_servicer.Verify(invalid_request, self._ctx) 778*e7b1675dSTing-Kang Chang self.assertNotEmpty(invalid_response.err) 779*e7b1675dSTing-Kang Chang 780*e7b1675dSTing-Kang Chang def test_create_prf_set(self): 781*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 782*e7b1675dSTing-Kang Chang prf_set_servicer = services.PrfSetServicer() 783*e7b1675dSTing-Kang Chang 784*e7b1675dSTing-Kang Chang template = prf.prf_key_templates.HMAC_SHA256.SerializeToString() 785*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 786*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 787*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 788*e7b1675dSTing-Kang Chang 789*e7b1675dSTing-Kang Chang creation_request = testing_api_pb2.CreationRequest( 790*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 791*e7b1675dSTing-Kang Chang serialized_keyset=gen_response.keyset)) 792*e7b1675dSTing-Kang Chang creation_response = prf_set_servicer.Create(creation_request, self._ctx) 793*e7b1675dSTing-Kang Chang self.assertEmpty(creation_response.err) 794*e7b1675dSTing-Kang Chang 795*e7b1675dSTing-Kang Chang def test_create_prf_set_wrong_keyset(self): 796*e7b1675dSTing-Kang Chang prf_set_servicer = services.PrfSetServicer() 797*e7b1675dSTing-Kang Chang 798*e7b1675dSTing-Kang Chang creation_request = testing_api_pb2.CreationRequest( 799*e7b1675dSTing-Kang Chang 
annotated_keyset=testing_api_pb2.AnnotatedKeyset( 800*e7b1675dSTing-Kang Chang serialized_keyset=b'\x80')) 801*e7b1675dSTing-Kang Chang creation_response = prf_set_servicer.Create(creation_request, self._ctx) 802*e7b1675dSTing-Kang Chang self.assertNotEmpty(creation_response.err) 803*e7b1675dSTing-Kang Chang 804*e7b1675dSTing-Kang Chang def test_compute_prf(self): 805*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 806*e7b1675dSTing-Kang Chang prf_set_servicer = services.PrfSetServicer() 807*e7b1675dSTing-Kang Chang template = prf.prf_key_templates.HMAC_SHA256.SerializeToString() 808*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 809*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 810*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 811*e7b1675dSTing-Kang Chang keyset = gen_response.keyset 812*e7b1675dSTing-Kang Chang 813*e7b1675dSTing-Kang Chang key_ids_request = testing_api_pb2.PrfSetKeyIdsRequest( 814*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 815*e7b1675dSTing-Kang Chang serialized_keyset=keyset)) 816*e7b1675dSTing-Kang Chang key_ids_response = prf_set_servicer.KeyIds(key_ids_request, self._ctx) 817*e7b1675dSTing-Kang Chang self.assertEqual(key_ids_response.WhichOneof('result'), 'output') 818*e7b1675dSTing-Kang Chang self.assertLen(key_ids_response.output.key_id, 1) 819*e7b1675dSTing-Kang Chang self.assertEqual(key_ids_response.output.key_id[0], 820*e7b1675dSTing-Kang Chang key_ids_response.output.primary_key_id) 821*e7b1675dSTing-Kang Chang 822*e7b1675dSTing-Kang Chang output_length = 31 823*e7b1675dSTing-Kang Chang compute_request = testing_api_pb2.PrfSetComputeRequest( 824*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 825*e7b1675dSTing-Kang Chang serialized_keyset=keyset), 826*e7b1675dSTing-Kang Chang key_id=key_ids_response.output.primary_key_id, 
827*e7b1675dSTing-Kang Chang input_data=b'input_data', 828*e7b1675dSTing-Kang Chang output_length=output_length) 829*e7b1675dSTing-Kang Chang compute_response = prf_set_servicer.Compute(compute_request, self._ctx) 830*e7b1675dSTing-Kang Chang self.assertEqual(compute_response.WhichOneof('result'), 'output') 831*e7b1675dSTing-Kang Chang self.assertLen(compute_response.output, output_length) 832*e7b1675dSTing-Kang Chang 833*e7b1675dSTing-Kang Chang def test_key_ids_prf_fail(self): 834*e7b1675dSTing-Kang Chang prf_set_servicer = services.PrfSetServicer() 835*e7b1675dSTing-Kang Chang invalid_key_ids_response = prf_set_servicer.KeyIds( 836*e7b1675dSTing-Kang Chang testing_api_pb2.PrfSetKeyIdsRequest( 837*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 838*e7b1675dSTing-Kang Chang serialized_keyset=b'badkeyset')), self._ctx) 839*e7b1675dSTing-Kang Chang self.assertNotEmpty(invalid_key_ids_response.err) 840*e7b1675dSTing-Kang Chang 841*e7b1675dSTing-Kang Chang def test_compute_prf_fail(self): 842*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 843*e7b1675dSTing-Kang Chang prf_set_servicer = services.PrfSetServicer() 844*e7b1675dSTing-Kang Chang template = prf.prf_key_templates.HMAC_SHA256.SerializeToString() 845*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 846*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 847*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 848*e7b1675dSTing-Kang Chang keyset = gen_response.keyset 849*e7b1675dSTing-Kang Chang key_ids_request = testing_api_pb2.PrfSetKeyIdsRequest( 850*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 851*e7b1675dSTing-Kang Chang serialized_keyset=keyset)) 852*e7b1675dSTing-Kang Chang key_ids_response = prf_set_servicer.KeyIds(key_ids_request, self._ctx) 853*e7b1675dSTing-Kang Chang 
self.assertEqual(key_ids_response.WhichOneof('result'), 'output') 854*e7b1675dSTing-Kang Chang primary_key_id = key_ids_response.output.primary_key_id 855*e7b1675dSTing-Kang Chang 856*e7b1675dSTing-Kang Chang invalid_output_length = 123456 857*e7b1675dSTing-Kang Chang invalid_compute_request = testing_api_pb2.PrfSetComputeRequest( 858*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 859*e7b1675dSTing-Kang Chang serialized_keyset=keyset), 860*e7b1675dSTing-Kang Chang key_id=primary_key_id, 861*e7b1675dSTing-Kang Chang input_data=b'input_data', 862*e7b1675dSTing-Kang Chang output_length=invalid_output_length) 863*e7b1675dSTing-Kang Chang invalid_compute_response = prf_set_servicer.Compute(invalid_compute_request, 864*e7b1675dSTing-Kang Chang self._ctx) 865*e7b1675dSTing-Kang Chang self.assertEqual(invalid_compute_response.WhichOneof('result'), 'err') 866*e7b1675dSTing-Kang Chang self.assertNotEmpty(invalid_compute_response.err) 867*e7b1675dSTing-Kang Chang 868*e7b1675dSTing-Kang Chang def test_create_streaming_aead(self): 869*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 870*e7b1675dSTing-Kang Chang streaming_aead_servicer = services.StreamingAeadServicer() 871*e7b1675dSTing-Kang Chang 872*e7b1675dSTing-Kang Chang templates = streaming_aead.streaming_aead_key_templates 873*e7b1675dSTing-Kang Chang template = templates.AES128_CTR_HMAC_SHA256_4KB.SerializeToString() 874*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 875*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 876*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 877*e7b1675dSTing-Kang Chang 878*e7b1675dSTing-Kang Chang creation_request = testing_api_pb2.CreationRequest( 879*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 880*e7b1675dSTing-Kang Chang serialized_keyset=gen_response.keyset)) 881*e7b1675dSTing-Kang 
Chang creation_response = streaming_aead_servicer.Create( 882*e7b1675dSTing-Kang Chang creation_request, self._ctx) 883*e7b1675dSTing-Kang Chang self.assertEmpty(creation_response.err) 884*e7b1675dSTing-Kang Chang 885*e7b1675dSTing-Kang Chang def test_create_streaming_aead_broken_keyset(self): 886*e7b1675dSTing-Kang Chang streaming_aead_servicer = services.StreamingAeadServicer() 887*e7b1675dSTing-Kang Chang 888*e7b1675dSTing-Kang Chang creation_request = testing_api_pb2.CreationRequest( 889*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 890*e7b1675dSTing-Kang Chang serialized_keyset=b'\x80')) 891*e7b1675dSTing-Kang Chang creation_response = streaming_aead_servicer.Create(creation_request, 892*e7b1675dSTing-Kang Chang self._ctx) 893*e7b1675dSTing-Kang Chang self.assertNotEmpty(creation_response.err) 894*e7b1675dSTing-Kang Chang 895*e7b1675dSTing-Kang Chang def test_generate_streaming_encrypt_decrypt(self): 896*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 897*e7b1675dSTing-Kang Chang streaming_aead_servicer = services.StreamingAeadServicer() 898*e7b1675dSTing-Kang Chang 899*e7b1675dSTing-Kang Chang templates = streaming_aead.streaming_aead_key_templates 900*e7b1675dSTing-Kang Chang template = templates.AES128_CTR_HMAC_SHA256_4KB.SerializeToString() 901*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 902*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 903*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 904*e7b1675dSTing-Kang Chang keyset = gen_response.keyset 905*e7b1675dSTing-Kang Chang plaintext = b'The quick brown fox jumps over the lazy dog' 906*e7b1675dSTing-Kang Chang associated_data = b'associated_data' 907*e7b1675dSTing-Kang Chang 908*e7b1675dSTing-Kang Chang enc_request = testing_api_pb2.StreamingAeadEncryptRequest( 909*e7b1675dSTing-Kang Chang 
annotated_keyset=testing_api_pb2.AnnotatedKeyset( 910*e7b1675dSTing-Kang Chang serialized_keyset=keyset), 911*e7b1675dSTing-Kang Chang plaintext=plaintext, 912*e7b1675dSTing-Kang Chang associated_data=associated_data) 913*e7b1675dSTing-Kang Chang enc_response = streaming_aead_servicer.Encrypt(enc_request, self._ctx) 914*e7b1675dSTing-Kang Chang self.assertEqual(enc_response.WhichOneof('result'), 'ciphertext') 915*e7b1675dSTing-Kang Chang ciphertext = enc_response.ciphertext 916*e7b1675dSTing-Kang Chang 917*e7b1675dSTing-Kang Chang dec_request = testing_api_pb2.StreamingAeadDecryptRequest( 918*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 919*e7b1675dSTing-Kang Chang serialized_keyset=keyset), 920*e7b1675dSTing-Kang Chang ciphertext=ciphertext, 921*e7b1675dSTing-Kang Chang associated_data=associated_data) 922*e7b1675dSTing-Kang Chang dec_response = streaming_aead_servicer.Decrypt(dec_request, self._ctx) 923*e7b1675dSTing-Kang Chang self.assertEqual(dec_response.WhichOneof('result'), 'plaintext') 924*e7b1675dSTing-Kang Chang 925*e7b1675dSTing-Kang Chang self.assertEqual(dec_response.plaintext, plaintext) 926*e7b1675dSTing-Kang Chang 927*e7b1675dSTing-Kang Chang def test_generate_streaming_decrypt_fail(self): 928*e7b1675dSTing-Kang Chang keyset_servicer = services.KeysetServicer() 929*e7b1675dSTing-Kang Chang streaming_aead_servicer = services.StreamingAeadServicer() 930*e7b1675dSTing-Kang Chang 931*e7b1675dSTing-Kang Chang templates = streaming_aead.streaming_aead_key_templates 932*e7b1675dSTing-Kang Chang template = templates.AES128_CTR_HMAC_SHA256_4KB.SerializeToString() 933*e7b1675dSTing-Kang Chang gen_request = testing_api_pb2.KeysetGenerateRequest(template=template) 934*e7b1675dSTing-Kang Chang gen_response = keyset_servicer.Generate(gen_request, self._ctx) 935*e7b1675dSTing-Kang Chang self.assertEqual(gen_response.WhichOneof('result'), 'keyset') 936*e7b1675dSTing-Kang Chang keyset = gen_response.keyset 937*e7b1675dSTing-Kang Chang 
938*e7b1675dSTing-Kang Chang ciphertext = b'some invalid ciphertext' 939*e7b1675dSTing-Kang Chang associated_data = b'associated_data' 940*e7b1675dSTing-Kang Chang dec_request = testing_api_pb2.StreamingAeadDecryptRequest( 941*e7b1675dSTing-Kang Chang annotated_keyset=testing_api_pb2.AnnotatedKeyset( 942*e7b1675dSTing-Kang Chang serialized_keyset=keyset), 943*e7b1675dSTing-Kang Chang ciphertext=ciphertext, 944*e7b1675dSTing-Kang Chang associated_data=associated_data) 945*e7b1675dSTing-Kang Chang dec_response = streaming_aead_servicer.Decrypt(dec_request, self._ctx) 946*e7b1675dSTing-Kang Chang self.assertEqual(dec_response.WhichOneof('result'), 'err') 947*e7b1675dSTing-Kang Chang self.assertNotEmpty(dec_response.err) 948*e7b1675dSTing-Kang Chang 949*e7b1675dSTing-Kang Chang 950*e7b1675dSTing-Kang Changif __name__ == '__main__': 951*e7b1675dSTing-Kang Chang absltest.main() 952