1#  Copyright (C) 2024 The Android Open Source Project
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14
15# Lint as: python3
16
17import argparse
18import datetime
19import os
20import subprocess
21import time
22from generate_test import generate_test
23from parse_log import FullApduEntry, NfcType, PollingLoopEntry, open_and_parse_file, parse_timeframe, replace_aids
24from pn532 import PN532
25
# Lower bound, in seconds, on the pause that replay_transaction() inserts
# before each replayed log entry, to prevent the reader from being overloaded.
_MIN_SLEEP_TIME_SECONDS = 0.5

# Upper bound, in seconds, on the pause that replay_transaction() inserts
# before each replayed log entry, so that long gaps in the original log do not
# stall the replay.
_MAX_SLEEP_TIME_SECONDS = 5

# NOTE(review): presumably the positions of the expected and actual error in an
# (expected, actual) pair. Neither constant is referenced by the code in this
# file -- confirm usage before relying on (or removing) them.
_EXPECTED_ERROR_INDEX = 0
_ACTUAL_ERROR_INDEX = 1

# Number of times conduct_apdu_exchange() polls for a tag before declaring
# failure.
_NUM_RETRIES = 50

# String templates for the output of a test case or snoop log. The first
# placeholder of the APDU and polling templates is the timestamp string.
_APDU_OUTPUT_STR = "{}: APDU exchange: sent {}, received {}"
_APDU_OUTPUT_TEST_STR = "{}: APDU exchange: sent {} APDUs, received {} APDUs"
_POLLING_OUTPUT_STR = "{}: sent NFC data of type {}"
# Indented so that an error reads as a detail of the line printed above it.
_ERROR_STR = "     ERROR: {}"

# Width of the column for outputting the results of a test case.
_COLUMN_WIDTH = 80

# Directory for generated test cases and files for the emulator app. It is
# resolved relative to the directory containing this script (see
# create_file_for_emulator_app()).
_EMULATOR_APP_PARSED_DIR = "src/com/android/nfc/emulatorapp/parsed_files/"
51
52
def send_nfc_a_data(reader: PN532) -> str | None:
  """Issues a single poll_a() call on the reader.

  Any exception raised by the call is captured and reported as text rather than
  propagated to the caller.

  Args:
    reader: The PN532 reader.

  Returns:
    The message of the raised exception, or None if the call succeeded.
  """
  failure = None
  try:
    reader.poll_a()
  except Exception as err:
    failure = str(err)
  return failure
66
67
def send_nfc_b_data(reader: PN532) -> str | None:
  """Issues a single poll_b() call on the reader.

  Any exception raised by the call is captured and reported as text rather than
  propagated to the caller.

  Args:
    reader: The PN532 reader.

  Returns:
    The message of the raised exception, or None if the call succeeded.
  """
  try:
    reader.poll_b()
  except Exception as err:
    return str(err)
  else:
    return None
81
82
def send_unknown_data(reader: PN532, data: bytes) -> str | None:
  """Broadcasts a custom polling frame through the reader.

  The reader is polled with poll_a(), the frame is passed to send_broadcast(),
  and the reader is then muted. The sequence stops at the first call that
  raises, and that exception is reported as text.

  Args:
    reader: The PN532 reader.
    data: The custom polling frame to be sent to the reader.

  Returns:
    The message of the raised exception, or None if every call succeeded.
  """
  failure = None
  try:
    reader.poll_a()
    reader.send_broadcast(data)
    reader.mute()
  except Exception as err:
    failure = str(err)
  return failure
99
100
def conduct_apdu_exchange(reader: PN532, current: FullApduEntry) -> str | None:
  """Conducts an APDU exchange between the emulator and the PN 532 module.

  The reader is polled with poll_a() up to _NUM_RETRIES times. As soon as a tag
  is returned, the commands and expected responses in `current` are handed to
  tag.transact(); a falsy result from transact() is reported as a response
  mismatch. After every poll that returns no tag, the reader is muted before
  the next attempt.

  Args:
    reader: The PN532 reader.
    current: A data object containing the APDU commands to be sent to the
      emulator and the expected responses.

  Returns:
    None if the exchange succeeded. Otherwise a description of the failure: a
    mismatch message that includes the expected responses, "No tag received" if
    every poll came back empty, or the message of any exception raised along
    the way.
  """
  try:
    for _ in range(_NUM_RETRIES):
      tag = reader.poll_a()
      if tag is None:
        # Nothing was detected on this attempt; mute the reader and poll again.
        reader.mute()
        continue

      if tag.transact(current.command, current.response):
        return None
      # The "{}" placeholder is required for the expected responses to actually
      # appear in the message.
      return "Received incorrect response. Expected: {}".format(
          current.response
      )
    return "No tag received"
  except Exception as e:
    return str(e)
132
133
def replay_transaction(log, module_path: str):
  """Replays the given transaction log on the PN 532 module.

  A reader is opened on `module_path`; if that fails, a message is printed and
  nothing is replayed. Each entry is then replayed in order: polling loop
  entries are dispatched on their NFC type, APDU entries go through
  conduct_apdu_exchange(), and a summary line is printed for every entry. The
  pause before each entry follows the gap between consecutive `ts` values,
  clamped to [_MIN_SLEEP_TIME_SECONDS, _MAX_SLEEP_TIME_SECONDS]. The reader is
  muted once the whole log has been replayed.

  Args:
    log: The transaction log to be replayed.
    module_path: The serial path to the PN 532 module.
  """
  try:
    reader = PN532(module_path)
  except Exception:
    print("Could not connect to PN532 module")
    return

  if not log:
    return

  previous_ts = log[0].ts

  for entry in log:
    # NOTE(review): dividing by 1000000 suggests `ts` is in microseconds --
    # confirm against the log parser.
    elapsed_seconds = (entry.ts - previous_ts) / 1000000
    # Clamp the original gap into the allowed pause window.
    time.sleep(
        min(
            max(elapsed_seconds, _MIN_SLEEP_TIME_SECONDS),
            _MAX_SLEEP_TIME_SECONDS,
        )
    )

    failure = None

    if isinstance(entry, PollingLoopEntry):
      nfc_type = entry.type
      if nfc_type == NfcType.NFC_A:
        failure = send_nfc_a_data(reader)
      elif nfc_type == NfcType.NFC_B:
        failure = send_nfc_b_data(reader)
      elif nfc_type == NfcType.UNKNOWN:
        failure = send_unknown_data(reader, entry.data)
    elif isinstance(entry, FullApduEntry):
      failure = conduct_apdu_exchange(reader, entry)

    output_line_for_snoop_log(entry, failure)

    # The next pause is measured from this entry's timestamp.
    previous_ts = entry.ts

  reader.mute()
179
180
def parse_snoop_log(args: argparse.Namespace):
  """Parses the snoop log selected by the command-line arguments.

  The file is parsed with open_and_parse_file(). When the log is destined for
  the emulator app (either --replay_with_app or --parse_only is set), the
  parsed entries are passed through replace_aids(). The entries are finally
  handed to parse_timeframe() together with the requested start and end.

  Args:
    args: The parsed command-line arguments. The attributes read are `file`,
      `replay_with_app`, `parse_only`, `start` and `end`.

  Returns:
    The parsed snoop log, as returned by parse_timeframe().
  """
  entries = open_and_parse_file(args.file)

  # The emulator app uses its own AIDs, so swap them in when it is the target.
  targets_emulator_app = args.replay_with_app or args.parse_only
  if targets_emulator_app:
    entries = replace_aids(entries)

  return parse_timeframe(entries, args.start, args.end)
202
203
def output_line_for_snoop_log(
    entry: PollingLoopEntry | FullApduEntry,
    error: str | None,
):
  """Outputs a summary of an interaction from a snoop log.

  One line is printed, stamped with the current local time: APDU exchanges list
  their commands and responses as hex strings, and any other entry is reported
  as a polling frame with the name of its NFC type. If an error is supplied, it
  is printed on a second, indented line.

  Args:
    entry: The current interaction to be replayed from the snoop log.
    error: The error message produced while replaying the entry, or None if the
      replay succeeded.
  """
  # Format the current time directly; converting it to epoch microseconds and
  # back before formatting adds nothing.
  timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")

  if isinstance(entry, FullApduEntry):
    summary = _APDU_OUTPUT_STR.format(
        timestamp,
        [command.hex() for command in entry.command],
        [response.hex() for response in entry.response],
    )
  else:  # isinstance(entry, PollingLoopEntry)
    summary = _POLLING_OUTPUT_STR.format(timestamp, entry.type.name)
  print(summary)

  if error is not None:
    print(_ERROR_STR.format(error))
232
233
def print_opening_sequence(
    file_name: str,
    start: str | None = None,
    end: str | None = None,
):
  """Prints the banner shown before a test case or snoop log is replayed.

  A blank line and the name of the file are always printed. They are followed
  by the timeframe when at least one bound was given (a missing bound is shown
  as "start" or "end"), or by another blank line when neither was given.

  Args:
    file_name: The name of the file to be replayed.
    start: The start of the timeframe to be replayed.
    end: The end of the timeframe to be replayed.
  """
  print()
  print("Replaying transaction from snoop log: {}".format(file_name))

  if start is None and end is None:
    print()
    return

  lower_bound = "start" if start is None else start
  upper_bound = "end" if end is None else end
  print("Timeframe: {} - {}".format(lower_bound, upper_bound))
259
260
def create_file_for_emulator_app(
    output: list[PollingLoopEntry | FullApduEntry], filename: str
):
  """Creates a file containing the parsed APDU exchanges from a snoop log.

  This will be used to replay the transaction with an Android app installed on
  the emulator. Only APDU exchanges are written, one per line, as the list of
  command hex strings and the list of response hex strings separated by ";".
  Polling loop entries are skipped.

  Args:
    output: A list of polling loop entries and APDU exchanges parsed from the
      snoop log.
    filename: The name of the file to be created. This is near-identical to the
      name of the snoop log file: every "/" is replaced with "_".

  Raises:
    RuntimeError: If the output file cannot be opened for writing.
  """
  local_path = _EMULATOR_APP_PARSED_DIR + filename.replace("/", "_")
  # The output directory is resolved relative to this script, not to the
  # current working directory.
  full_path = os.path.join(
      os.path.dirname(os.path.realpath(__file__)), local_path
  )
  try:
    file = open(full_path, "wt")
  except Exception as e:
    raise RuntimeError(
        "Error occurred while opening file: {}".format(full_path)
    ) from e

  # Closing via the context manager guarantees the data is flushed to disk
  # before the success message is printed, even if a write raises.
  with file:
    for entry in output:
      if isinstance(entry, FullApduEntry):
        file.write(
            "{};{}\n".format(
                [command.hex() for command in entry.command],
                [response.hex() for response in entry.response],
            )
        )
  print()
  print("File for third party app generated at: {}".format(local_path))
294
295
def get_name_for_test_case(filename: str) -> str:
  """Derives the name of the generated test case from a snoop log file name.

  Args:
    filename: The path of the snoop log file.

  Returns:
    "Generated" followed by `filename` with every "/" and every ".txt" removed.
  """
  stem = filename
  # Order matters: slashes are stripped before ".txt" occurrences.
  for fragment in ("/", ".txt"):
    stem = stem.replace(fragment, "")
  return "Generated" + stem
298
299
def main():
  """Parses the command line and runs the requested scenario.

  Three scenarios are supported:
    1. --parse_only: parse the snoop log and write the file consumed by the
       emulator app, without replaying anything.
    2. --generate_and_replay_test: generate a test case from the snoop log and
       immediately run it with `atest`. Requires --path.
    3. Default: replay the parsed snoop log on the PN532 module.

  Invalid argument combinations terminate the program through argparse's
  standard error path (usage message on stderr, exit status 2).
  """
  parser = argparse.ArgumentParser(prog="pn532")
  parser.add_argument(
      "-p",
      "--path",
      action="store",
      help="Path to the PN532 serial device, e.g. /dev/ttyUSB0",
  )
  parser.add_argument(
      "-f",
      "--file",
      action="store",
      required=True,
      help="Path to the file of the snoop log",
  )
  parser.add_argument(
      "--start",
      action="store",
      help="Start of the timeframe to be replayed",
  )
  parser.add_argument(
      "--end",
      action="store",
      help="End of the timeframe to be replayed",
  )
  parser.add_argument(
      "--parse_only",
      action="store_true",
      help="Parse the log without replaying the transaction",
  )
  parser.add_argument(
      "--replay_with_app",
      action="store_true",
      help="Replay the transaction with the emulator app",
  )
  parser.add_argument(
      "--generate_and_replay_test",
      action="store_true",
      help="Generate a test case from the log and then immediately run it",
  )
  args = parser.parse_args()

  # Fail fast: the atest command below concatenates --path into a test
  # parameter, which would raise a TypeError on None after the test case has
  # already been generated. --parse_only takes precedence and needs no reader.
  if (
      args.generate_and_replay_test
      and not args.parse_only
      and args.path is None
  ):
    parser.error("--path is required with --generate_and_replay_test")

  parsed_snoop_log = parse_snoop_log(args)
  if args.parse_only:  # scenario 1: parse snoop log for the emulator app
    create_file_for_emulator_app(parsed_snoop_log, args.file)
  else:  # scenario 2: replay transaction from a snoop log
    print_opening_sequence(
        file_name=args.file,
        start=args.start,
        end=args.end,
    )
    if args.generate_and_replay_test:  # Replay the test that was just generated
      test_case_name = get_name_for_test_case(args.file)
      apdu_local_file = generate_test(parsed_snoop_log, test_case_name)
      test_command = [
          "atest",
          "-v",
          test_case_name,
          "--",
          "--testparam",
          "pn532_serial_path=" + args.path,
          "--testparam",
          "file_path=" + apdu_local_file,
      ]
      if args.replay_with_app:
        test_command += ["--testparam", "with_emulator_app=True"]
      # Passed as an argument list (no shell), so the values need no quoting.
      subprocess.run(test_command)
    else:  # Default: replay the transaction
      replay_transaction(parsed_snoop_log, args.path)
369
370
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
  main()
373