1# Copyright 2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Cross-language tests for reading and writing encrypted keysets.""" 15 16from typing import Iterable, Tuple, Optional 17 18from absl.testing import absltest 19from absl.testing import parameterized 20import tink 21from tink import aead 22from tink.proto import tink_pb2 23from util import key_util 24from util import testing_servers 25from google.protobuf import json_format 26from google.protobuf import text_format 27 28# Contains keys with different status and output_prefix_type 29SYMMETRIC_KEYSET = r""" 30primary_key_id: 2178398476 31key { 32 key_data { 33 type_url: "type.googleapis.com/google.crypto.tink.HmacKey" 34 value: "\032 \216\300\2643\375\353)\347?\034q\006\325~\322\377\365\364\202\205\320m\005\327Y\3213\213\217i>\034\022\004\020\020\010\003" 35 key_material_type: SYMMETRIC 36 } 37 status: ENABLED 38 key_id: 2178398476 39 output_prefix_type: TINK 40} 41key { 42 key_data { 43 type_url: "type.googleapis.com/google.crypto.tink.HmacKey" 44 value: "\032@\212}\023kK\247.\300\030\377 \351\321\234}rFuJ\367\201\260b)0\271k\001v,\0346D\363mM\255\272\317\007\340M\225d\270[\210\262\362\352\3544&\037\005(\370\320\031\335}\311\374\n\022\004\020 \010\004" 45 key_material_type: SYMMETRIC 46 } 47 status: DISABLED 48 key_id: 1021124131 49 output_prefix_type: LEGACY 50} 51key { 52 key_data { 53 type_url: "type.googleapis.com/google.crypto.tink.HmacKey" 54 value: "\032 \312\272\026\243]t\023\024\310\"\2331\361c\r\202\372\363o\260\335\274\2726#\365\034yU\365)\264\022\004\020\020\010\003" 55 key_material_type: SYMMETRIC 56 } 57 status: ENABLED 58 key_id: 1531888792 59 output_prefix_type: CRUNCHY 60} 61key { 62 key_data { 63 type_url: "type.googleapis.com/google.crypto.tink.HmacKey" 64 value: "\032 \363q\0337,\254\303\215$\370yR\304`\206uf{V\243\271\367\254\351\034\020\247M\'\240+\320\022\004\020\020\010\003" 65 key_material_type: SYMMETRIC 66 } 67 status: DESTROYED 68 key_id: 3173753038 69 output_prefix_type: RAW 70} 71""" 72 73PRIVATE_KEYSET = r""" 74primary_key_id: 3858784341 75key { 76 key_data { 77 type_url: "type.googleapis.com/google.crypto.tink.EcdsaPrivateKey" 78 value: "\032 \021U\231BC\265\337\020$\351n\336di\245\245\371\004-\215k\214\262\344*\306\224\367\360I\317\330\022L\" e\356\202K\367I{\247T\314o\032\222\000\267\266\024\263u\234H\236<\374\340sDK<;6\242\032 c\264\n\200\340\317\001\351\352\372\305\345\371i\3625\200\305 \367\257\335\256\221\313\313\263\036!\270\305\020\022\006\030\002\020\002\010\003" 79 key_material_type: ASYMMETRIC_PRIVATE 80 } 81 status: ENABLED 82 key_id: 3858784341 83 output_prefix_type: TINK 84} 85key { 86 key_data { 87 type_url: "type.googleapis.com/google.crypto.tink.EcdsaPrivateKey" 88 value: "\032 :]\010\201.YK\214\372\302P}\250\354Q\246\322(\216\213\345T\316Lp\013\037#\347\316Sr\022L\" M\014\237i\213\010\252\246\216\342\222\374\026\303\334\010u4\357\323\332\227\250\177\336\216|\217\264\3424\207\032 \013\367.n\035\323\274\337\350\252\233\214)\007\347!\327\313B\223\336jp\251\035\371\247h\014\272\357\317\022\006\030\002\020\002\010\003" 89 key_material_type: ASYMMETRIC_PRIVATE 90 } 91 status: ENABLED 92 key_id: 465053161 93 output_prefix_type: RAW 94} 95""" 96 97PUBLIC_KEYSET = r""" 98primary_key_id: 768876193 99key { 100 key_data { 101 type_url: "type.googleapis.com/google.crypto.tink.EcdsaPublicKey" 102 value: "\" \336\302\324\330=\026.|\\\224\314A\301Ka\241\324{\035\210Tp\222\306\263\317\236\307\032q\010\252\032 }\261\033x\347Gx\224&V\314hx\000\217Q\272G\361b\302\346Fb?r\334\223w\304y\325\022\006\030\002\020\002\010\003" 103 key_material_type: ASYMMETRIC_PUBLIC 104 } 105 status: ENABLED 106 key_id: 768876193 107 output_prefix_type: TINK 108} 109""" 110 111TEST_KEYSETS = [ 112 ('symmetric', SYMMETRIC_KEYSET), 113 ('private', PRIVATE_KEYSET), 114 ('public', PUBLIC_KEYSET), 115] 116 117 118def setUpModule(): 119 testing_servers.start('keyset_read_write') 120 121 122def tearDownModule(): 123 testing_servers.stop() 124 125 126def read_write_encrypted_test_cases( 127) -> Iterable[Tuple[str, bytes, str, str, str, str, Optional[bytes]]]: 128 """Yields (test_name, test_parameters...) tuples to test.""" 129 for keyset_name, keyset_text_proto in TEST_KEYSETS: 130 keyset_proto = text_format.Parse(keyset_text_proto, tink_pb2.Keyset()) 131 keyset = keyset_proto.SerializeToString() 132 for write_lang in testing_servers.LANGUAGES: 133 for read_lang in testing_servers.LANGUAGES: 134 for associated_data in [None, b'', b'associated_data']: 135 yield ('_bin_%s, r in %s, w in %s, ad=%s' % 136 (keyset_name, read_lang, write_lang, associated_data), keyset, 137 read_lang, 'KEYSET_READER_BINARY', write_lang, 138 'KEYSET_WRITER_BINARY', associated_data) 139 yield ('_json_%s, r in %s, w in %s, ad=%s' % 140 (keyset_name, write_lang, read_lang, associated_data), keyset, 141 read_lang, 'KEYSET_READER_JSON', write_lang, 142 'KEYSET_WRITER_JSON', associated_data) 143 144 145class KeysetReadWriteTest(parameterized.TestCase): 146 147 @parameterized.named_parameters(TEST_KEYSETS) 148 def test_to_from_json(self, keyset_text_proto): 149 keyset_proto = text_format.Parse(keyset_text_proto, tink_pb2.Keyset()) 150 keyset = keyset_proto.SerializeToString() 151 for to_lang in testing_servers.LANGUAGES: 152 json_keyset = testing_servers.keyset_to_json(to_lang, keyset) 153 for from_lang in testing_servers.LANGUAGES: 154 keyset_from_json = testing_servers.keyset_from_json( 155 from_lang, json_keyset) 156 key_util.assert_tink_proto_equal( 157 self, 158 tink_pb2.Keyset.FromString(keyset), 159 tink_pb2.Keyset.FromString(keyset_from_json), 160 msg=('keysets are not equal when converting to JSON in ' 161 '%s and back in %s' % (to_lang, from_lang))) 162 163 @parameterized.named_parameters(read_write_encrypted_test_cases()) 164 def test_read_write_encrypted_keyset(self, keyset, read_lang, reader_type, 165 write_lang, writer_type, 166 associated_data): 167 # Use an arbitrary AEAD template that's supported in all languages, 168 # and use an arbitrary language to generate the keyset_encryption_keyset. 169 keyset_encryption_keyset = testing_servers.new_keyset( 170 'cc', aead.aead_key_templates.AES128_GCM) 171 172 encrypted_keyset = testing_servers.keyset_write_encrypted( 173 write_lang, keyset, keyset_encryption_keyset, associated_data, 174 writer_type) 175 decrypted_keyset = testing_servers.keyset_read_encrypted( 176 read_lang, encrypted_keyset, keyset_encryption_keyset, 177 associated_data, reader_type) 178 # Both keyset and decrypted_keyset are serialized tink_pb2.Keyset. 179 key_util.assert_tink_proto_equal( 180 self, tink_pb2.Keyset.FromString(keyset), 181 tink_pb2.Keyset.FromString(decrypted_keyset)) 182 183 with self.assertRaises(tink.TinkError): 184 testing_servers.keyset_read_encrypted(read_lang, encrypted_keyset, 185 keyset_encryption_keyset, 186 b'invalid_associated_data', 187 reader_type) 188 189 @parameterized.parameters(testing_servers.LANGUAGES) 190 def test_read_encrypted_ignores_keyset_info_binary(self, lang): 191 # Use an arbitrary AEAD template that's supported in all languages, 192 # and use an arbitrary language to generate the keyset_encryption_keyset. 193 keyset_encryption_keyset = testing_servers.new_keyset( 194 'cc', aead.aead_key_templates.AES128_GCM) 195 # Also, generate an arbitrary keyset. 196 keyset = testing_servers.new_keyset('cc', 197 aead.aead_key_templates.AES128_GCM) 198 associated_data = b'associated_data' 199 200 encrypted_keyset = testing_servers.keyset_write_encrypted( 201 lang, keyset, keyset_encryption_keyset, associated_data, 202 'KEYSET_WRITER_BINARY') 203 204 # encrypted_keyset is a serialized tink_pb2.EncryptedKeyset 205 parsed_encrypted_keyset = tink_pb2.EncryptedKeyset.FromString( 206 encrypted_keyset) 207 208 # Note that some implementations (currently C++) do not set keyset_info. 209 # But we require that values are correct when they are set. 210 if parsed_encrypted_keyset.HasField('keyset_info'): 211 self.assertLen(parsed_encrypted_keyset.keyset_info.key_info, 1) 212 self.assertEqual(parsed_encrypted_keyset.keyset_info.primary_key_id, 213 parsed_encrypted_keyset.keyset_info.key_info[0].key_id) 214 215 # keyset_info should be ignored when reading a keyset. 216 # to test this, we add something invalid and check that read still works. 217 parsed_encrypted_keyset.keyset_info.key_info.append( 218 tink_pb2.KeysetInfo.KeyInfo(type_url='invalid', key_id=123)) 219 modified_encrypted_keyset = parsed_encrypted_keyset.SerializeToString() 220 221 decrypted_keyset = testing_servers.keyset_read_encrypted( 222 lang, modified_encrypted_keyset, keyset_encryption_keyset, 223 associated_data, 'KEYSET_READER_BINARY') 224 # Both keyset and decrypted_keyset are serialized tink_pb2.Keyset. 225 key_util.assert_tink_proto_equal( 226 self, tink_pb2.Keyset.FromString(keyset), 227 tink_pb2.Keyset.FromString(decrypted_keyset)) 228 229 @parameterized.parameters(testing_servers.LANGUAGES) 230 def test_read_encrypted_ignores_keyset_info_json(self, lang): 231 # Use an arbitrary AEAD template that's supported in all languages, 232 # and use an arbitrary language to generate the keyset_encryption_keyset. 233 keyset_encryption_keyset = testing_servers.new_keyset( 234 'cc', aead.aead_key_templates.AES128_GCM) 235 # Also, generate an arbitrary keyset. 236 keyset = testing_servers.new_keyset('cc', 237 aead.aead_key_templates.AES128_GCM) 238 associated_data = b'associated_data' 239 240 encrypted_keyset = testing_servers.keyset_write_encrypted( 241 lang, keyset, keyset_encryption_keyset, associated_data, 242 'KEYSET_WRITER_JSON') 243 244 # encrypted_keyset is a JSON serialized tink_pb2.EncryptedKeyset 245 parsed_encrypted_keyset = json_format.Parse(encrypted_keyset, 246 tink_pb2.EncryptedKeyset()) 247 248 # Note that some implementations (currently C++) do not set keyset_info. 249 # But we require that values are correct when they are set. 250 if parsed_encrypted_keyset.HasField('keyset_info'): 251 self.assertLen(parsed_encrypted_keyset.keyset_info.key_info, 1) 252 self.assertEqual(parsed_encrypted_keyset.keyset_info.primary_key_id, 253 parsed_encrypted_keyset.keyset_info.key_info[0].key_id) 254 255 # keyset_info should be ignored when reading a keyset. 256 # To test this, we add something invalid and check that read still works. 257 # Some languages (C++ and Java) however do check that the fields of 258 # keyset_info are present. So we have to set all required fields here. 259 parsed_encrypted_keyset.keyset_info.key_info.append( 260 tink_pb2.KeysetInfo.KeyInfo( 261 type_url='invalid', 262 status=tink_pb2.ENABLED, 263 key_id=123, 264 output_prefix_type=tink_pb2.LEGACY)) 265 parsed_encrypted_keyset.keyset_info.primary_key_id = 123 266 modified_encrypted_keyset = json_format.MessageToJson( 267 parsed_encrypted_keyset).encode('utf8') 268 269 decrypted_keyset = testing_servers.keyset_read_encrypted( 270 lang, modified_encrypted_keyset, keyset_encryption_keyset, 271 associated_data, 'KEYSET_READER_JSON') 272 # Both keyset and decrypted_keyset are serialized tink_pb2.Keyset. 273 key_util.assert_tink_proto_equal( 274 self, tink_pb2.Keyset.FromString(keyset), 275 tink_pb2.Keyset.FromString(decrypted_keyset)) 276 277if __name__ == '__main__': 278 absltest.main() 279