1#  Copyright (C) 2024 The Android Open Source Project
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14
15# Lint as: python3
16
17"""Parses the snoop log to extract polling loop data and APDU exchanges."""
18
19import base64
20import dataclasses
21import datetime
22import enum
23import os
24import zlib
25
26PREAMBLE_LENGTH = 9
27HEADER_LENGTH = 7
28SNOOP_LOG_START = "BEGIN:NFCSNOOP_"
29SNOOP_LOG_END = "END:NFCSNOOP_"
30
31# Bytes identifying the starts of polling loop and APDU transactions
32POLLING_LOOP_START_BYTES = bytes.fromhex("6f0c")
33APDU_START_BYTES = bytes.fromhex("6f02")
34
35# Index corresponding to the total length of the APDU data packet
36APDU_LEN_INDEX = 2
37
38# Size of the main APDU header, which precedes either a list of APDU exchanges
39# or a single APDU exchange
40APDU_MAIN_HEADER_SIZE = 5
41
42# Start of the APDU data, which follows the main header
43APDU_DATA_START_INDEX = 6
44
45# Minimum lengths of a valid APDU command and response
46APDU_COMMAND_MIN_LENGTH = 13
47APDU_RESPONSE_MIN_LENGTH = 10
48
49# Sequence of bytes that identifies an APDU transaction
50APDU_IDENTIFIER = bytes([0x20, 0x00])
51
52# Byte that identifies an APDU command
53APDU_COMMAND_IDENTIFIER = 0x19
54
55# Byte that identifies an APDU response
56APDU_RESPONSE_IDENTIFIER = 0x08
57
58# Sequence of bytes that identifies whether the APDU command or response was
59# the first command or response in a longer list of APDU commands or responses
60APDU_ORDER_FIRST = 0x02
61APDU_ORDER_FIRST_ALT = bytes([0x0A, 0x00])
62APDU_ORDER_SECOND = 0x03
63APDU_ORDER_SECOND_ALT = bytes([0x0B, 0x00])
64
65# Sequence of bytes that identifies the start of a "SELECT AID" APDU command
66AID_START_BYTES = bytes.fromhex("00A40400")
67
68# AID groups that are used by the emulator app
69SELECT_AID_FIRST = bytes.fromhex("00A4040008A000000151000000")
70SELECT_AID_SECOND = bytes.fromhex("00A4040008A000000003000000")
71
72
73class NfcType(enum.Enum):
74  NFC_A = 0
75  NFC_B = 1
76  NFC_F = 2
77  NFC_V = 3
78  REMOTE_FIELD = 4
79  UNKNOWN = 5
80
81
82@dataclasses.dataclass
83class PollingLoopEntry:
84  ts: int = 0
85  type: NfcType = NfcType.UNKNOWN
86  data: bytes = b""
87  error: str | None = None
88
89
90@dataclasses.dataclass
91class PartialApduEntry:
92  ts: int = 0
93  is_command: bool = False
94  data: bytes = b""  # data sent by the APDU command or response
95  is_first: bool = (
96      True
97      # whether this is the first entry in the list of APDU commands or responses sent together
98  )
99
100
101@dataclasses.dataclass
102class FullApduEntry:
103  ts: int = 0
104  command: list[bytes] = dataclasses.field(default_factory=lambda: [])
105  response: list[bytes] = dataclasses.field(default_factory=lambda: [])
106  error: str | None = None
107
108
109def replace_aids(
110    log: list[PollingLoopEntry | FullApduEntry],
111) -> list[PollingLoopEntry | FullApduEntry]:
112  """Replaces the AIDs in the log with the AIDs that are used by the emulator app."""
113  new_log: list[PollingLoopEntry | FullApduEntry] = []
114  for cur in log:
115    is_first_aid = True
116    if isinstance(cur, FullApduEntry):
117      new_apdu_entry = FullApduEntry(
118          ts=cur.ts, command=[], response=cur.response
119      )
120      for cmd in cur.command:
121        if cmd.startswith(AID_START_BYTES):
122          if is_first_aid:
123            new_apdu_entry.command.append(SELECT_AID_FIRST)
124            is_first_aid = False
125          else:
126            new_apdu_entry.command.append(SELECT_AID_SECOND)
127        else:
128          new_apdu_entry.command.append(cmd)
129      new_log.append(new_apdu_entry)
130    else:
131      new_log.append(cur)
132  return new_log
133
134
135def parse_timeframe(log, start, end):
136  """Returns a subset of the log that falls within the given timeframe."""
137  if start is None and end is None:
138    return log
139  parsed_log = log
140  if start is not None:
141    start_dt = datetime.datetime.strptime(start, "%Y-%m-%d %H:%M:%S.%f")
142    start_ts = int(float(datetime.datetime.timestamp(start_dt)) * 1000000)
143    parsed_log = list(filter(lambda x: x.ts >= start_ts, log))
144  if end is not None:
145    end_dt = datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S.%f")
146    end_ts = int(float(datetime.datetime.timestamp(end_dt)) * 1000000)
147    parsed_log = list(filter(lambda x: x.ts <= end_ts, parsed_log))
148  return parsed_log
149
150
151def standardize_log(
152    log: list[PollingLoopEntry | PartialApduEntry],
153) -> list[PollingLoopEntry | FullApduEntry]:
154  """Standardizes the log to ensure that it can be replayed by the PN 532 module.
155
156  This includes removing redundant calls to polling loop A and combining APDU
157  commands and responses into a single entry.
158
159  Args:
160    log: The log to be standardized.
161
162  Returns:
163    The standardized log.
164  """
165  cmds = []
166  rsps = []
167  last_ts = 0
168  standardized: list[PollingLoopEntry | FullApduEntry] = []
169  for cur in log:
170    if isinstance(cur, PollingLoopEntry):
171      if cur.type == NfcType.NFC_A or cur.type == NfcType.NFC_B:
172        standardized.append(cur)
173      elif cur.type == NfcType.UNKNOWN:
174        if not standardized:
175          standardized.append(cur)
176        else:
177          standardized[-1] = cur
178    elif isinstance(cur, PartialApduEntry):
179      if cur.is_command:
180        if len(cmds) == len(rsps) + 1:  # extra command without response
181          rsps.append(b"")
182        if len(cmds) == len(rsps) != 0 and cur.data.startswith(AID_START_BYTES):
183          standardized.append(FullApduEntry(last_ts, cmds, rsps))
184          cmds = []
185          rsps = []
186        cmds.append(cur.data)
187      else:
188        if len(cmds) == len(rsps):  # extra response without command
189          continue
190        rsps.append(cur.data)
191      last_ts = cur.ts
192  # handle last command and response
193  if len(cmds) == len(rsps) + 1:
194    rsps.append(b"")
195  if len(cmds) == len(rsps) != 0:
196    standardized.append(FullApduEntry(last_ts, cmds, rsps))
197  return standardized
198
199
200def parse_file(data: bytes) -> list[PollingLoopEntry | PartialApduEntry]:
201  """Parses the file to extract polling loop data and APDU exchanges."""
202  if not data:
203    raise RuntimeError("No data found in file")
204  version = data[0]
205  if version != 1:
206    raise RuntimeError("Unsupported version: {}".format(version))
207
208  offset = PREAMBLE_LENGTH
209  header_length = HEADER_LENGTH
210  pts_offset = 2
211  polling_list = []
212  ts = calculate_timestamp(data)
213  while len(data) - offset > header_length:
214
215    # length of the current transaction in bytes
216    length = data[offset] + (data[offset + 1] << 8)
217
218    # duration between the last transaction and the current one
219    pts = bytearray(data[offset + pts_offset : offset + pts_offset + 4])
220    pts_real = pts[0] + (pts[1] << 8) + (pts[2] << 16) + (pts[3] << 24)
221    ts += pts_real
222
223    transaction_type = (data[offset + header_length] & 0xE0) >> 5
224    if transaction_type == 3:  # ST_NTF or NCI_NTF transactions
225      cur_data = data[offset + header_length : offset + header_length + length]
226      if cur_data.startswith(POLLING_LOOP_START_BYTES):
227        polling_list.extend(add_polling_data(cur_data, ts))
228      elif cur_data.startswith(APDU_START_BYTES):
229        apdu_transactions = find_apdu_transactions(cur_data, ts)
230        polling_list.extend(apdu_transactions)
231    offset += header_length + length
232  return polling_list
233
234
235def open_and_parse_file(
236    file_path: str,
237) -> list[PollingLoopEntry | FullApduEntry]:
238  """Opens the file that contains the unparsed snoop log and parses it.
239
240  Args:
241    file_path: The path of the file containing the unparsed snoop log.
242
243  Returns:
244    A list of polling loop entries and APDU exchanges parsed from the file.
245
246  Raises:
247    RuntimeError: If the file cannot be found.
248  """
249  snoop_file = open_read_file(file_path)
250  str_data = ""
251  found_log = False
252  while line := snoop_file.readline():
253    if not found_log and SNOOP_LOG_START in line:
254      found_log = True
255    elif found_log:
256      if SNOOP_LOG_END in line:
257        break
258      str_data += line
259  snoop_bytes = inflate(base64.b64decode(str_data))
260  parsed = parse_file(snoop_bytes)
261  return standardize_log(parsed)
262
263
264def find_apdu_transactions(data: bytes, ts: int) -> list[PartialApduEntry]:
265  """Finds all APDU transactions in the given data."""
266  total_size = data[APDU_LEN_INDEX]
267  if total_size < APDU_MAIN_HEADER_SIZE or data[4:6] != APDU_IDENTIFIER:
268    return []
269
270  apdus: list[PartialApduEntry] = []
271  index = APDU_DATA_START_INDEX
272  while index < len(data):
273    cur_size = data[index + 1]
274    cur_data = data[index : index + cur_size + 2]
275    cmd, is_first = parse_apdu_command(cur_data) or (None, None)
276    if cmd is not None:
277      apdus.append(
278          PartialApduEntry(ts=ts, is_command=True, data=cmd, is_first=is_first)
279      )
280    else:
281      rsp, is_first = parse_apdu_response(cur_data) or (None, None)
282      if rsp is not None:
283        apdus.append(
284            PartialApduEntry(
285                ts=ts, is_command=False, data=rsp, is_first=is_first
286            )
287        )
288    index += cur_size + 2
289  return apdus
290
291
292def parse_apdu_command(data: bytes):
293  """Isolate the bytes sent from the reader to the emulator.
294
295  Args:
296    data: The raw APDU command in bytes.
297
298  Returns:
299    the data sent by the APDU command, or none if it is not a valid APDU
300    command.
301  """
302  if len(data) < APDU_COMMAND_MIN_LENGTH:
303    return None
304  if data[0] != APDU_COMMAND_IDENTIFIER:
305    return None
306  if data[1] != len(data) - 2:
307    return None
308  if data[5:7] != bytes.fromhex("0000"):
309    return None
310  if data[8] in [APDU_ORDER_FIRST, APDU_ORDER_SECOND]:
311    is_first = True if data[8] == APDU_ORDER_FIRST else False
312    return data[9:-4], is_first
313  elif data[8:10] in [APDU_ORDER_FIRST_ALT, APDU_ORDER_SECOND_ALT]:
314    is_first = True if data[8:10] == APDU_ORDER_FIRST_ALT else False
315    return data[10:-4], is_first
316  return None
317
318
319def parse_apdu_response(data: bytes):
320  """Isolates the data sent from the emulator to the reader.
321
322  Args:
323    data: The raw APDU response in bytes.
324
325  Returns:
326    the data sent by the APDU response, or none if it is not a valid APDU
327    response.
328  """
329  if len(data) < APDU_RESPONSE_MIN_LENGTH:
330    return None
331  if data[0] != APDU_RESPONSE_IDENTIFIER:
332    return None
333  if data[1] != len(data) - 2:
334    return None
335  if data[5] in [APDU_ORDER_FIRST, APDU_ORDER_SECOND]:
336    is_first = True if data[5] == APDU_ORDER_FIRST else False
337    return data[6:-4], is_first
338  elif data[5:7] in [APDU_ORDER_FIRST_ALT, APDU_ORDER_SECOND_ALT]:
339    is_first = True if data[5:7] == APDU_ORDER_FIRST_ALT else False
340    return data[7:-4], is_first
341  elif data[7] in [APDU_ORDER_FIRST, APDU_ORDER_SECOND]:
342    is_first = True if data[7] == APDU_ORDER_FIRST else False
343    return data[8:-4], is_first
344  return None
345
346
347def add_polling_data(data: bytes, ts: int) -> list[PollingLoopEntry]:
348  """Adds polling data to the list of transactions.
349
350  Each entry may contain multiple polling data transactions.
351
352  Args:
353    data: The raw polling data in bytes.
354    ts: The timestamp of the polling transaction.
355
356  Returns:
357    A list of polling data transactions.
358  """
359  transaction_list = []
360  count = 4
361  while count < len(data):
362    flag = data[count]
363    match flag:
364      case 0:
365        entry_type = NfcType.REMOTE_FIELD
366      case 1:
367        entry_type = NfcType.NFC_A
368      case 2:
369        entry_type = NfcType.NFC_B
370      case _:
371        entry_type = NfcType.UNKNOWN
372    length = data[count + 2] - 5
373    polling_data = data[count + 8 : count + 8 + length]
374    transaction_list.append(
375        PollingLoopEntry(
376            ts=ts,
377            type=entry_type,
378            data=polling_data,
379        )
380    )
381    count += 8 + length
382  return transaction_list
383
384
385def calculate_timestamp(data: bytes) -> int:
386  """Calculates the timestamp of the first frame in the log."""
387  ts = data[1:9]
388  ts_real = (
389      ts[0]
390      + (ts[1] << 8)
391      + (ts[2] << 16)
392      + (ts[3] << 24)
393      + (ts[4] << 32)
394      + (ts[5] << 40)
395      + (ts[6] << 48)
396      + (ts[7] << 56)
397  )
398  offset = PREAMBLE_LENGTH
399  while (len(data) - offset) > HEADER_LENGTH:
400    length = data[offset] + (data[offset + 1] << 8)
401    pts = bytearray(data[offset + 2 : offset + 6])
402    pts_real = pts[0] + (pts[1] << 8) + (pts[2] << 16) + (pts[3] << 24)
403    ts_real -= pts_real
404    offset += HEADER_LENGTH + length
405  return ts_real
406
407
408def inflate(data: bytes) -> bytes:
409  """Inflates decompressed data."""
410  decompressed = zlib.decompressobj().decompress(data[PREAMBLE_LENGTH:])
411  return data[0:PREAMBLE_LENGTH] + decompressed
412
413
414def open_read_file(file_path: str):
415  """Opens the file at the given path.
416
417  Args:
418    file_path: The path of the file to be opened. This can be either a local
419      path or an absolute path.
420
421  Returns:
422    An object representing the opened file.
423
424  Raises:
425    RuntimeError: If the file cannot be opened.
426  """
427  full_path = os.path.dirname(os.path.realpath(__file__)) + "/" + file_path
428  if os.path.exists(file_path):
429    file_to_open = file_path
430  elif os.path.exists(full_path):
431    file_to_open = full_path
432  else:
433    raise RuntimeError("File not found: {}".format(file_path))
434
435  try:
436    return open(file_to_open, "rt")
437  except Exception as e:
438    raise RuntimeError(
439        "Error occurred while opening file: {}".format(file_path)
440    ) from e
441