1# Copyright (C) 2024 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# Lint as: python3 16 17"""Generates a Python test case from a snoop log.""" 18 19import json 20import math 21import os 22 23from parse_log import FullApduEntry, NfcType, PollingLoopEntry 24 25INDENT_SIZE = 4 26 27 28def generate_test( 29 log: list[FullApduEntry | PollingLoopEntry], name: str 30) -> str: 31 """Generates a Python test case from a snoop log parsed by the replay tool. 32 33 The generated test will be placed in the current directory. 34 35 Args: 36 log: The parsed snoop log. 37 name: The name of the file containing the snoop log. 38 39 Returns: 40 The name of the JSON file containing APDUs needed to run the test. 41 """ 42 # The name of the test file is based on the name of the snoop log 43 python_local_file = name + "_test.py" 44 file_path = ( 45 os.path.dirname(os.path.realpath(__file__)) + "/" + python_local_file 46 ) 47 48 try: 49 file = open(file_path, "wt") 50 except Exception as e: 51 raise RuntimeError( 52 "Error occurred while opening file: {}".format(file_path) 53 ) from e 54 file.write(create_imports()) 55 file.write(create_replace_aids_method()) 56 file.write(create_polling_loop_methods()) 57 file.write(create_apdu_exchange_method()) 58 file.write(create_setup()) 59 file.write(create_test_opening(name)) 60 61 last_timestamp = log[0].ts 62 json_list = [] 63 for entry in log: 64 if isinstance(entry, PollingLoopEntry): 65 file.write(create_polling_loop_test(entry, last_timestamp)) 66 else: # isinstance(entry, FullApduEntry): 67 file.write(create_apdu_test(entry, last_timestamp)) 68 json_list.append(create_apdu_dict(entry)) 69 last_timestamp = entry.ts 70 71 json_dump = json.dumps(json_list) 72 apdu_local_file = name + "_apdus.json" 73 apdu_file_path = ( 74 os.path.dirname(os.path.realpath(__file__)) + "/" + apdu_local_file 75 ) 76 apdu_file = open(apdu_file_path, "wt") 77 apdu_file.write(json_dump) 78 79 file.write(create_teardown_test()) 80 file.write(create_main_function()) 81 82 print() 83 print( 84 "Test generated at {}. To run the test, copy the test file to" 85 " packages/apps/Nfc/tests/testcases/multidevices/.".format(file_path) 86 ) 87 update_android_bp(python_local_file, name) 88 89 return apdu_local_file 90 91 92def update_android_bp(local_file_path, test_name): 93 """Creates a new python_test_host entry in Android.bp for the generated test.""" 94 try: 95 android_bp = open("Android.bp", "a") 96 except Exception as e: 97 raise RuntimeError("Error occurred while opening Android.bp") from e 98 99 s = create_line() 100 s += create_line() 101 s += create_line("python_test_host {") 102 s += create_line('name: "{}",'.format(test_name), indent=1) 103 s += create_line('main: "{}",'.format(local_file_path), indent=1) 104 s += create_line('srcs: ["{}"],'.format(local_file_path), indent=1) 105 s += create_line('test_config: "AndroidTest.xml",', indent=1) 106 s += create_line("test_options: {", indent=1) 107 s += create_line("unit_test: false,", indent=2) 108 s += create_line('tags: ["mobly"],', indent=2) 109 s += create_line("},", indent=1) 110 s += create_line('defaults: ["GeneratedTestsPythonDefaults"],', indent=1) 111 s += create_line("}") 112 android_bp.write(s) 113 114 115def create_apdu_dict(entry: FullApduEntry): 116 """Creates a dictionary representation of an APDU entry.""" 117 command_arr = [] 118 for cmd in entry.command: 119 command_arr.append(cmd.hex()) 120 response_arr = [] 121 for rsp in entry.response: 122 if isinstance(rsp, str): 123 response_arr.append(rsp) 124 else: 125 response_arr.append(rsp.hex()) 126 apdu_dict = { 127 "commands": command_arr, 128 "responses": response_arr, 129 } 130 return apdu_dict 131 132 133def create_test_opening(name: str): 134 """Creates the opening of the test file.""" 135 s = create_line("def test_{}(self):".format(name), indent=1) 136 s += create_line("# Read in APDU commands and responses from file", indent=2) 137 s += create_line( 138 'file_path_name = self.user_params.get("file_path", "")', indent=2 139 ) 140 s += create_line("apdu_cmds = []", indent=2) 141 s += create_line("apdu_rsps = []", indent=2) 142 s += create_line("if file_path_name:", indent=2) 143 s += create_line('with open(file_path_name, "r") as json_data:', indent=3) 144 s += create_line("d = json.load(json_data)", indent=4) 145 s += create_line("for entry in d:", indent=4) 146 s += create_line("apdu_cmds.append(", indent=5) 147 s += create_line( 148 '[bytearray.fromhex(cmd) for cmd in entry["commands"]]', indent=6 149 ) 150 s += create_line(")", indent=5) 151 s += create_line("apdu_rsps.append(", indent=5) 152 s += create_line( 153 '[bytearray.fromhex(rsp) for rsp in entry["responses"]]', indent=6 154 ) 155 s += create_line(")", indent=5) 156 s += create_line() 157 return s 158 159 160def create_polling_loop_test(entry: PollingLoopEntry, last_timestamp: int): 161 """Adds code to send a polling loop from the reader to the emulator. 162 163 The test will check to ensure that the polling loop is successfully received. 164 """ 165 s = create_line( 166 "# Sending {} polling loop".format(entry.type), 167 indent=2, 168 ) 169 170 sleep_time = calculate_time_to_sleep(entry.ts, last_timestamp) 171 s += create_line("time.sleep({})".format(sleep_time), indent=2) 172 173 match entry.type: 174 case NfcType.NFC_A: 175 s += create_line("saw_loop = send_polling_loop_a(self.reader)", indent=2) 176 case NfcType.NFC_B: 177 s += create_line("saw_loop = send_polling_loop_b(self.reader)", indent=2) 178 case _: # NfcType.UNKNOWN 179 s += create_line('custom_data = "{}"'.format(entry.data.hex()), indent=2) 180 s += create_line( 181 "saw_loop = send_custom_polling_loop(self.reader, custom_data)", 182 indent=2, 183 ) 184 s += create_line( 185 'asserts.assert_true(saw_loop, "Did not see polling loop")', indent=2 186 ) 187 s += create_line() 188 return s 189 190 191def create_apdu_test(entry: FullApduEntry, last_timestamp: int): 192 """Adds code to conduct an APDU exchange between the reader and emulator. 193 194 The test will check to ensure that the expected response is received from the 195 emulator. 196 """ 197 s = create_line("# Conducting APDU exchange", indent=2) 198 199 sleep_time = calculate_time_to_sleep(entry.ts, last_timestamp) 200 s += create_line("time.sleep({})".format(sleep_time), indent=2) 201 202 s += create_line("commands = apdu_cmds[0]", indent=2) 203 s += create_line("if self.with_emulator_app:", indent=2) 204 s += create_line("commands = replace_aids(commands)", indent=3) 205 s += create_line("responses = apdu_rsps[0]", indent=2) 206 s += create_line( 207 "tag_found, transacted = conduct_apdu_exchange(self.reader, commands," 208 " responses)", 209 indent=2, 210 ) 211 s += create_line() 212 s += create_line("asserts.assert_true(", indent=2) 213 s += create_line( 214 'tag_found, "Reader did not detect tag, transaction not attempted."', 215 indent=3, 216 ) 217 s += create_line(")", indent=2) 218 s += create_line("asserts.assert_true(", indent=2) 219 s += create_line("transacted,", indent=3) 220 s += create_line( 221 '"Transaction failed, check device logs for more information."', indent=3 222 ) 223 s += create_line(")", indent=2) 224 s += create_line() 225 s += create_line("apdu_cmds.pop(0)", indent=2) 226 s += create_line("apdu_rsps.pop(0)", indent=2) 227 s += create_line() 228 return s 229 230 231def create_imports(): 232 s = create_line('"""Test generated from the NFC Replay Tool."""') 233 s += create_line() 234 s += create_line("import json") 235 s += create_line("import time") 236 s += create_line("from mobly import asserts") 237 s += create_line("from mobly import base_test") 238 s += create_line("from mobly import test_runner") 239 s += create_line("from mobly.controllers import android_device") 240 s += create_line("import pn532") 241 s += create_line() 242 s += create_line("# Number of polling loops to perform.") 243 s += create_line("_NUM_POLLING_LOOPS = 50") 244 s += create_line() 245 return s 246 247 248def create_replace_aids_method(): 249 """Create a method that replaces the AIDs sent by the test with the ones 250 251 that are used by the emulator app. 252 """ 253 s = create_line("_EMULATOR_AIDS = [") 254 s += create_line('bytearray.fromhex("00a4040008a000000151000000"),', indent=1) 255 s += create_line('bytearray.fromhex("00a4040008a000000003000000"),', indent=1) 256 s += create_line("]") 257 s += create_line() 258 s += create_line() 259 s += create_line("def replace_aids(commands: list[bytearray]):") 260 s += create_line( 261 '"""Replaces SELECT AID commands with AIDs from the emulator app."""', 262 indent=1, 263 ) 264 s += create_line("new_commands = []", indent=1) 265 s += create_line("is_first_aid = True", indent=1) 266 s += create_line("for command in commands:", indent=1) 267 s += create_line( 268 'if command.startswith(bytearray.fromhex("00a40400")):', indent=2 269 ) 270 s += create_line("if is_first_aid:", indent=3) 271 s += create_line( 272 "new_commands.append(_EMULATOR_AIDS[0])", 273 indent=4, 274 ) 275 s += create_line("is_first_aid = False", indent=4) 276 s += create_line("else:", indent=3) 277 s += create_line("new_commands.append(_EMULATOR_AIDS[1])", indent=4) 278 s += create_line("else:", indent=2) 279 s += create_line("new_commands.append(command)", indent=3) 280 s += create_line("return new_commands", indent=1) 281 return s 282 283 284def create_polling_loop_methods(): 285 """Create methods that send polling loops to the reader. 286 287 Specifically, three methods are created: send_polling_loop_a(), 288 send_polling_loop_b(), and send_custom_polling_loop(). 289 """ 290 s = create_line() 291 s += create_line() 292 s += create_line("def send_polling_loop_a(reader: pn532.PN532) -> bool:") 293 s += create_line("saw_loop = False", indent=1) 294 s += create_line("for i in range(_NUM_POLLING_LOOPS):", indent=1) 295 s += create_line("tag = reader.poll_a()", indent=2) 296 s += create_line("if tag is not None:", indent=2) 297 s += create_line("saw_loop = True", indent=3) 298 s += create_line("break", indent=3) 299 s += create_line("reader.mute()", indent=2) 300 s += create_line("return saw_loop", indent=1) 301 s += create_line() 302 s += create_line() 303 s += create_line("def send_polling_loop_b(reader: pn532.PN532) -> bool:") 304 s += create_line("saw_loop = False", indent=1) 305 s += create_line("for i in range(_NUM_POLLING_LOOPS):", indent=1) 306 s += create_line("tag = reader.poll_b()", indent=2) 307 s += create_line("if tag is not None:", indent=2) 308 s += create_line("saw_loop = True", indent=3) 309 s += create_line("break", indent=3) 310 s += create_line("reader.mute()", indent=2) 311 s += create_line("return saw_loop", indent=1) 312 s += create_line() 313 s += create_line() 314 s += create_line( 315 "def send_custom_polling_loop(reader: pn532.PN532, custom_data_hex: str)" 316 " -> bool:" 317 ) 318 s += create_line("saw_loop = False", indent=1) 319 s += create_line("for i in range(_NUM_POLLING_LOOPS):", indent=1) 320 s += create_line("tag = reader.poll_a()", indent=2) 321 s += create_line("if tag is not None:", indent=2) 322 s += create_line( 323 "reader.send_broadcast(bytearray.fromhex(custom_data_hex))", indent=3 324 ) 325 s += create_line("saw_loop = True", indent=3) 326 s += create_line("break", indent=3) 327 s += create_line("reader.poll_b()", indent=2) 328 s += create_line("reader.mute()", indent=2) 329 s += create_line("return saw_loop", indent=1) 330 return s 331 332 333def create_apdu_exchange_method(): 334 """Creates method to conduct an APDU exchange between the emulator and reader.""" 335 s = create_line() 336 s += create_line() 337 s += create_line("def conduct_apdu_exchange(") 338 s += create_line( 339 "reader: pn532.PN532, commands: list[bytearray], responses:" 340 " list[bytearray]", 341 indent=2, 342 ) 343 s += create_line(") -> tuple[bool, bool]:") 344 s += create_line( 345 '"""Conducts an APDU exchange with the PN532 reader."""', indent=1 346 ) 347 s += create_line("transacted = False", indent=1) 348 s += create_line("tag = None", indent=1) 349 s += create_line("for _ in range(_NUM_POLLING_LOOPS):", indent=1) 350 s += create_line("tag = reader.poll_a()", indent=2) 351 s += create_line("if tag is not None:", indent=2) 352 s += create_line("transacted = tag.transact(commands, responses)", indent=3) 353 s += create_line("reader.mute()", indent=3) 354 s += create_line("break", indent=3) 355 s += create_line("reader.mute()", indent=2) 356 s += create_line("return tag, transacted", indent=1) 357 return s 358 359 360def create_setup(): 361 """Creates methods to prepare the PN532 reader and emulator before the test. 362 363 This involves checking to ensure that the raeder and emulator are both 364 present, and enabling NFC on the emulator. 365 366 Args: 367 name: The name of the original snoop log file. 368 """ 369 s = create_line() 370 s += create_line() 371 s += create_line( 372 "class GeneratedMultiDeviceTestCases(base_test.BaseTestClass):" 373 ) 374 s += create_line() 375 s += create_line("def setup_class(self):", indent=1) 376 s += create_line( 377 "self.emulator = self.register_controller(android_device)[0]", indent=2 378 ) 379 s += create_line('self.emulator.debug_tag = "emulator"', indent=2) 380 s += create_line( 381 'pn532_serial_path = self.user_params.get("pn532_serial_path", "")', 382 indent=2, 383 ) 384 s += create_line( 385 'self.with_emulator_app = self.user_params.get("with_emulator_app",' 386 " False)", 387 indent=2, 388 ) 389 s += create_line( 390 'self.emulator.adb.shell(["svc", "nfc", "disable"])', indent=2 391 ) 392 s += create_line( 393 'self.emulator.adb.shell(["svc", "nfc", "enable"])', indent=2 394 ) 395 s += create_line("self.reader = pn532.PN532(pn532_serial_path)", indent=2) 396 s += create_line("self.reader.mute()", indent=2) 397 s += create_line() 398 return s 399 400 401def create_teardown_test(): 402 s = create_line("def teardown_test(self):", indent=1) 403 s += create_line("self.reader.mute()", indent=2) 404 return s 405 406 407def create_main_function(): 408 s = create_line() 409 s += create_line('if __name__ == "__main__":') 410 s += create_line("test_runner.main()", indent=1) 411 s += create_line() 412 return s 413 414 415def create_line(s: str = "", indent: int = 0): 416 return "{}{}\n".format(create_indent(indent), s) 417 418 419def create_indent(multiplier: int): 420 return " " * multiplier * INDENT_SIZE 421 422 423def calculate_time_to_sleep(current_ts: int, last_ts: int) -> int: 424 num_seconds = math.ceil((current_ts - last_ts) / 1000000) 425 if num_seconds < 1: 426 return 1 427 return num_seconds 428