1#  Copyright (C) 2024 The Android Open Source Project
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14
15# Lint as: python3
16
17"""Generates a Python test case from a snoop log."""
18
19import json
20import math
21import os
22
23from parse_log import FullApduEntry, NfcType, PollingLoopEntry
24
# Number of spaces per indentation level in the generated test source.
INDENT_SIZE = 4
26
27
def generate_test(
    log: list[FullApduEntry | PollingLoopEntry], name: str
) -> str:
  """Generates a Python test case from a snoop log parsed by the replay tool.

  Two files are written to the directory that contains this module:
  "<name>_test.py" (the generated test) and "<name>_apdus.json" (the APDU
  commands and responses replayed by the test). A python_test_host entry for
  the test is then appended to Android.bp via update_android_bp().

  Args:
    log: The parsed snoop log. An empty log yields a test with no steps and
      an empty JSON list.
    name: The name of the file containing the snoop log.

  Returns:
    The name of the JSON file containing APDUs needed to run the test.

  Raises:
    RuntimeError: If the test file cannot be opened for writing.
  """
  output_dir = os.path.dirname(os.path.realpath(__file__))

  # The name of the test file is based on the name of the snoop log
  python_local_file = name + "_test.py"
  file_path = os.path.join(output_dir, python_local_file)
  apdu_local_file = name + "_apdus.json"
  apdu_file_path = os.path.join(output_dir, apdu_local_file)

  try:
    test_file = open(file_path, "wt")
  except Exception as e:
    raise RuntimeError(
        "Error occurred while opening file: {}".format(file_path)
    ) from e

  json_list = []
  # Using the handle as a context manager guarantees that the generated source
  # is flushed and the file is closed, even if emitting a step raises.
  with test_file:
    test_file.write(create_imports())
    test_file.write(create_replace_aids_method())
    test_file.write(create_polling_loop_methods())
    test_file.write(create_apdu_exchange_method())
    test_file.write(create_setup())
    test_file.write(create_test_opening(name))

    # An empty log has no first entry; fall back to 0 so the surrounding
    # scaffolding is still emitted as a complete test file.
    last_timestamp = log[0].ts if log else 0
    for entry in log:
      if isinstance(entry, PollingLoopEntry):
        test_file.write(create_polling_loop_test(entry, last_timestamp))
      else:  # isinstance(entry, FullApduEntry):
        test_file.write(create_apdu_test(entry, last_timestamp))
        json_list.append(create_apdu_dict(entry))
      last_timestamp = entry.ts

    test_file.write(create_teardown_test())
    test_file.write(create_main_function())

  # The APDUs live in a separate JSON file that the generated test reads at
  # run time.
  with open(apdu_file_path, "wt") as apdu_file:
    apdu_file.write(json.dumps(json_list))

  print()
  print(
      "Test generated at {}. To run the test, copy the test file to"
      " packages/apps/Nfc/tests/testcases/multidevices/.".format(file_path)
  )
  update_android_bp(python_local_file, name)

  return apdu_local_file
90
91
def update_android_bp(local_file_path, test_name):
  """Appends a python_test_host entry for the generated test to Android.bp.

  Android.bp is opened in append mode using a path relative to the current
  working directory.

  Args:
    local_file_path: File name of the generated test; used as both the `main`
      and the single `srcs` entry of the module.
    test_name: Name given to the python_test_host module.

  Raises:
    RuntimeError: If Android.bp cannot be opened.
  """
  try:
    android_bp = open("Android.bp", "a")
  except Exception as e:
    raise RuntimeError("Error occurred while opening Android.bp") from e

  # The context manager flushes the appended entry and closes the handle.
  with android_bp:
    # Two leading blank lines separate the new module from existing content.
    s = create_line()
    s += create_line()
    s += create_line("python_test_host {")
    s += create_line('name: "{}",'.format(test_name), indent=1)
    s += create_line('main: "{}",'.format(local_file_path), indent=1)
    s += create_line('srcs: ["{}"],'.format(local_file_path), indent=1)
    s += create_line('test_config: "AndroidTest.xml",', indent=1)
    s += create_line("test_options: {", indent=1)
    s += create_line("unit_test: false,", indent=2)
    s += create_line('tags: ["mobly"],', indent=2)
    s += create_line("},", indent=1)
    s += create_line('defaults: ["GeneratedTestsPythonDefaults"],', indent=1)
    s += create_line("}")
    android_bp.write(s)
113
114
def create_apdu_dict(entry: FullApduEntry):
  """Builds the JSON-serializable form of one APDU exchange.

  Args:
    entry: The APDU exchange whose commands and responses are converted.

  Returns:
    A dict with "commands" and "responses" keys. Each value is a list of hex
    strings; responses that are already strings are passed through unchanged.
  """
  return {
      "commands": [command.hex() for command in entry.command],
      "responses": [
          response if isinstance(response, str) else response.hex()
          for response in entry.response
      ],
  }
131
132
def create_test_opening(name: str):
  """Creates the opening of the generated test method.

  The emitted code defines `test_<name>` and, when the "file_path" user
  parameter is set, loads the APDU commands and responses from that JSON file
  into the `apdu_cmds` and `apdu_rsps` lists.

  Args:
    name: The name of the snoop log the test is generated from. Characters
      that cannot appear in a Python identifier are replaced with "_" in the
      method name; names that are already valid are used unchanged.

  Returns:
    The source text of the method header and the APDU-loading preamble.
  """
  # Snoop log names may contain characters such as "-" or "." that would make
  # the generated `def` line a syntax error. A character is kept only if it is
  # valid after the first position of an identifier ("_" + ch is a valid
  # identifier exactly in that case).
  method_suffix = "".join(
      ch if ("_" + ch).isidentifier() else "_" for ch in name
  )

  s = create_line("def test_{}(self):".format(method_suffix), indent=1)
  s += create_line("# Read in APDU commands and responses from file", indent=2)
  s += create_line(
      'file_path_name = self.user_params.get("file_path", "")', indent=2
  )
  s += create_line("apdu_cmds = []", indent=2)
  s += create_line("apdu_rsps = []", indent=2)
  s += create_line("if file_path_name:", indent=2)
  s += create_line('with open(file_path_name, "r") as json_data:', indent=3)
  s += create_line("d = json.load(json_data)", indent=4)
  s += create_line("for entry in d:", indent=4)
  s += create_line("apdu_cmds.append(", indent=5)
  s += create_line(
      '[bytearray.fromhex(cmd) for cmd in entry["commands"]]', indent=6
  )
  s += create_line(")", indent=5)
  s += create_line("apdu_rsps.append(", indent=5)
  s += create_line(
      '[bytearray.fromhex(rsp) for rsp in entry["responses"]]', indent=6
  )
  s += create_line(")", indent=5)
  s += create_line()
  return s
158
159
def create_polling_loop_test(entry: PollingLoopEntry, last_timestamp: int):
  """Emits the test step that replays one polling loop toward the emulator.

  The emitted code sleeps for the gap since the previous log entry, calls the
  polling helper matching the entry type (the custom helper, fed with the
  entry's data as hex, for anything other than NFC-A or NFC-B) and asserts
  that the polling loop was seen.

  Args:
    entry: The polling loop entry to replay.
    last_timestamp: Timestamp of the previous log entry.

  Returns:
    The source text of the test step, ending with a blank line.
  """
  parts = [
      create_line("# Sending {} polling loop".format(entry.type), indent=2)
  ]

  delay = calculate_time_to_sleep(entry.ts, last_timestamp)
  parts.append(create_line("time.sleep({})".format(delay), indent=2))

  if entry.type == NfcType.NFC_A:
    parts.append(
        create_line("saw_loop = send_polling_loop_a(self.reader)", indent=2)
    )
  elif entry.type == NfcType.NFC_B:
    parts.append(
        create_line("saw_loop = send_polling_loop_b(self.reader)", indent=2)
    )
  else:  # NfcType.UNKNOWN
    parts.append(
        create_line('custom_data = "{}"'.format(entry.data.hex()), indent=2)
    )
    parts.append(
        create_line(
            "saw_loop = send_custom_polling_loop(self.reader, custom_data)",
            indent=2,
        )
    )

  parts.append(
      create_line(
          'asserts.assert_true(saw_loop, "Did not see polling loop")', indent=2
      )
  )
  parts.append(create_line())
  return "".join(parts)
189
190
def create_apdu_test(entry: FullApduEntry, last_timestamp: int):
  """Emits the test step that runs one APDU exchange with the emulator.

  The emitted code sleeps for the gap since the previous log entry, takes the
  first pending command/response lists (swapping in the emulator app AIDs when
  `with_emulator_app` is set), runs the exchange, asserts that a tag was found
  and that the transaction succeeded, and finally drops the consumed lists.

  Args:
    entry: The APDU entry; only its timestamp is used here.
    last_timestamp: Timestamp of the previous log entry.

  Returns:
    The source text of the test step, ending with a blank line.
  """
  delay = calculate_time_to_sleep(entry.ts, last_timestamp)

  # (text, indent level) of every emitted line, in output order.
  template = (
      ("# Conducting APDU exchange", 2),
      ("time.sleep({})".format(delay), 2),
      ("commands = apdu_cmds[0]", 2),
      ("if self.with_emulator_app:", 2),
      ("commands = replace_aids(commands)", 3),
      ("responses = apdu_rsps[0]", 2),
      (
          "tag_found, transacted = conduct_apdu_exchange(self.reader, commands,"
          " responses)",
          2,
      ),
      ("", 0),
      ("asserts.assert_true(", 2),
      ('tag_found, "Reader did not detect tag, transaction not attempted."', 3),
      (")", 2),
      ("asserts.assert_true(", 2),
      ("transacted,", 3),
      ('"Transaction failed, check device logs for more information."', 3),
      (")", 2),
      ("", 0),
      ("apdu_cmds.pop(0)", 2),
      ("apdu_rsps.pop(0)", 2),
      ("", 0),
  )
  return "".join(create_line(text, indent=level) for text, level in template)
229
230
def create_imports():
  """Creates the header of the generated test file.

  Returns:
    Source text holding the module docstring, the import statements and the
    `_NUM_POLLING_LOOPS` constant of the generated test.
  """
  header_lines = (
      '"""Test generated from the NFC Replay Tool."""',
      "",
      "import json",
      "import time",
      "from mobly import asserts",
      "from mobly import base_test",
      "from mobly import test_runner",
      "from mobly.controllers import android_device",
      "import pn532",
      "",
      "# Number of polling loops to perform.",
      "_NUM_POLLING_LOOPS = 50",
      "",
  )
  return "".join(create_line(text) for text in header_lines)
246
247
def create_replace_aids_method():
  """Creates the `replace_aids` helper of the generated test.

  The emitted helper swaps every command starting with 00a40400 for one of
  the two `_EMULATOR_AIDS` entries: the first such command gets the first
  entry, every later one gets the second. Other commands are kept as they are.

  Returns:
    Source text defining `_EMULATOR_AIDS` and `replace_aids`.
  """
  # (text, indent level) of every emitted line, in output order.
  template = (
      ("_EMULATOR_AIDS = [", 0),
      ('bytearray.fromhex("00a4040008a000000151000000"),', 1),
      ('bytearray.fromhex("00a4040008a000000003000000"),', 1),
      ("]", 0),
      ("", 0),
      ("", 0),
      ("def replace_aids(commands: list[bytearray]):", 0),
      ('"""Replaces SELECT AID commands with AIDs from the emulator app."""', 1),
      ("new_commands = []", 1),
      ("is_first_aid = True", 1),
      ("for command in commands:", 1),
      ('if command.startswith(bytearray.fromhex("00a40400")):', 2),
      ("if is_first_aid:", 3),
      ("new_commands.append(_EMULATOR_AIDS[0])", 4),
      ("is_first_aid = False", 4),
      ("else:", 3),
      ("new_commands.append(_EMULATOR_AIDS[1])", 4),
      ("else:", 2),
      ("new_commands.append(command)", 3),
      ("return new_commands", 1),
  )
  return "".join(create_line(text, indent=level) for text, level in template)
282
283
def create_polling_loop_methods():
  """Creates the polling-loop helpers of the generated test.

  Three module-level functions are emitted: send_polling_loop_a(),
  send_polling_loop_b() and send_custom_polling_loop(). Each polls up to
  `_NUM_POLLING_LOOPS` times, muting the reader after every miss, and returns
  whether a tag was seen. The custom variant also broadcasts the given hex
  data once a tag answers.

  Returns:
    Source text of the three helpers, each preceded by two blank lines.
  """
  chunks = []

  def emit(text="", level=0):
    chunks.append(create_line(text, indent=level))

  emit()
  emit()
  emit("def send_polling_loop_a(reader: pn532.PN532) -> bool:")
  emit("saw_loop = False", 1)
  emit("for i in range(_NUM_POLLING_LOOPS):", 1)
  emit("tag = reader.poll_a()", 2)
  emit("if tag is not None:", 2)
  emit("saw_loop = True", 3)
  emit("break", 3)
  emit("reader.mute()", 2)
  emit("return saw_loop", 1)

  emit()
  emit()
  emit("def send_polling_loop_b(reader: pn532.PN532) -> bool:")
  emit("saw_loop = False", 1)
  emit("for i in range(_NUM_POLLING_LOOPS):", 1)
  emit("tag = reader.poll_b()", 2)
  emit("if tag is not None:", 2)
  emit("saw_loop = True", 3)
  emit("break", 3)
  emit("reader.mute()", 2)
  emit("return saw_loop", 1)

  emit()
  emit()
  emit(
      "def send_custom_polling_loop(reader: pn532.PN532, custom_data_hex: str)"
      " -> bool:"
  )
  emit("saw_loop = False", 1)
  emit("for i in range(_NUM_POLLING_LOOPS):", 1)
  emit("tag = reader.poll_a()", 2)
  emit("if tag is not None:", 2)
  emit("reader.send_broadcast(bytearray.fromhex(custom_data_hex))", 3)
  emit("saw_loop = True", 3)
  emit("break", 3)
  emit("reader.poll_b()", 2)
  emit("reader.mute()", 2)
  emit("return saw_loop", 1)

  return "".join(chunks)
331
332
def create_apdu_exchange_method():
  """Creates the `conduct_apdu_exchange` helper of the generated test.

  The emitted helper polls up to `_NUM_POLLING_LOOPS` times; on the first tag
  it runs `tag.transact(commands, responses)`, mutes the reader and stops. It
  returns the tag (None if none was found) together with the transaction
  result.

  Returns:
    Source text of the helper, preceded by two blank lines.
  """
  # (text, indent level) of every emitted line, in output order.
  template = (
      ("", 0),
      ("", 0),
      ("def conduct_apdu_exchange(", 0),
      (
          "reader: pn532.PN532, commands: list[bytearray], responses:"
          " list[bytearray]",
          2,
      ),
      (") -> tuple[bool, bool]:", 0),
      ('"""Conducts an APDU exchange with the PN532 reader."""', 1),
      ("transacted = False", 1),
      ("tag = None", 1),
      ("for _ in range(_NUM_POLLING_LOOPS):", 1),
      ("tag = reader.poll_a()", 2),
      ("if tag is not None:", 2),
      ("transacted = tag.transact(commands, responses)", 3),
      ("reader.mute()", 3),
      ("break", 3),
      ("reader.mute()", 2),
      ("return tag, transacted", 1),
  )
  return "".join(create_line(text, indent=level) for text, level in template)
358
359
def create_setup():
  """Creates the test class header and its `setup_class` method.

  The emitted `setup_class` registers the first Android device as the
  emulator, reads the "pn532_serial_path" and "with_emulator_app" user
  parameters, toggles NFC off and on again on the emulator, and opens and
  mutes the PN532 reader.

  Returns:
    Source text of the class statement and `setup_class`, preceded by two
    blank lines and followed by one.
  """
  # (text, indent level) of every emitted line, in output order.
  template = (
      ("", 0),
      ("", 0),
      ("class GeneratedMultiDeviceTestCases(base_test.BaseTestClass):", 0),
      ("", 0),
      ("def setup_class(self):", 1),
      ("self.emulator = self.register_controller(android_device)[0]", 2),
      ('self.emulator.debug_tag = "emulator"', 2),
      ('pn532_serial_path = self.user_params.get("pn532_serial_path", "")', 2),
      (
          'self.with_emulator_app = self.user_params.get("with_emulator_app",'
          " False)",
          2,
      ),
      ('self.emulator.adb.shell(["svc", "nfc", "disable"])', 2),
      ('self.emulator.adb.shell(["svc", "nfc", "enable"])', 2),
      ("self.reader = pn532.PN532(pn532_serial_path)", 2),
      ("self.reader.mute()", 2),
      ("", 0),
  )
  return "".join(create_line(text, indent=level) for text, level in template)
399
400
def create_teardown_test():
  """Creates a `teardown_test` method that mutes the reader."""
  return "".join((
      create_line("def teardown_test(self):", indent=1),
      create_line("self.reader.mute()", indent=2),
  ))
405
406
def create_main_function():
  """Creates the `__main__` guard that starts the Mobly test runner."""
  return "".join((
      create_line(),
      create_line('if __name__ == "__main__":'),
      create_line("test_runner.main()", indent=1),
      create_line(),
  ))
413
414
def create_line(s: str = "", indent: int = 0):
  """Returns `s` prefixed by `indent` indentation levels, newline-terminated.

  Args:
    s: The text of the line; defaults to an empty line.
    indent: Number of indentation levels to put in front of the text.
  """
  prefix = create_indent(indent)
  return f"{prefix}{s}\n"
417
418
def create_indent(multiplier: int):
  """Returns the whitespace for `multiplier` levels of INDENT_SIZE spaces."""
  one_level = " " * INDENT_SIZE
  return one_level * multiplier
421
422
def calculate_time_to_sleep(current_ts: int, last_ts: int) -> int:
  """Returns the number of whole seconds to sleep between two log entries.

  The timestamp difference is divided by 1,000,000 and rounded up, which
  suggests the timestamps are in microseconds (TODO confirm against the log
  parser). The result is never less than one second, including when the
  timestamps are equal or out of order.

  Args:
    current_ts: Timestamp of the entry about to be replayed.
    last_ts: Timestamp of the previously replayed entry.
  """
  elapsed_seconds = (current_ts - last_ts) / 1000000
  return max(1, math.ceil(elapsed_seconds))
428