1# Copyright (C) 2024 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# Lint as: python3 16 17"""Parses the snoop log to extract polling loop data and APDU exchanges.""" 18 19import base64 20import dataclasses 21import datetime 22import enum 23import os 24import zlib 25 26PREAMBLE_LENGTH = 9 27HEADER_LENGTH = 7 28SNOOP_LOG_START = "BEGIN:NFCSNOOP_" 29SNOOP_LOG_END = "END:NFCSNOOP_" 30 31# Bytes identifying the starts of polling loop and APDU transactions 32POLLING_LOOP_START_BYTES = bytes.fromhex("6f0c") 33APDU_START_BYTES = bytes.fromhex("6f02") 34 35# Index corresponding to the total length of the APDU data packet 36APDU_LEN_INDEX = 2 37 38# Size of the main APDU header, which precedes either a list of APDU exchanges 39# or a single APDU exchange 40APDU_MAIN_HEADER_SIZE = 5 41 42# Start of the APDU data, which follows the main header 43APDU_DATA_START_INDEX = 6 44 45# Minimum lengths of a valid APDU command and response 46APDU_COMMAND_MIN_LENGTH = 13 47APDU_RESPONSE_MIN_LENGTH = 10 48 49# Sequence of bytes that identifies an APDU transaction 50APDU_IDENTIFIER = bytes([0x20, 0x00]) 51 52# Byte that identifies an APDU command 53APDU_COMMAND_IDENTIFIER = 0x19 54 55# Byte that identifies an APDU response 56APDU_RESPONSE_IDENTIFIER = 0x08 57 58# Sequence of bytes that identifies whether the APDU command or response was 59# the first command or response in a longer list of APDU commands or responses 60APDU_ORDER_FIRST = 0x02 61APDU_ORDER_FIRST_ALT = bytes([0x0A, 0x00]) 62APDU_ORDER_SECOND = 0x03 63APDU_ORDER_SECOND_ALT = bytes([0x0B, 0x00]) 64 65# Sequence of bytes that identifies the start of a "SELECT AID" APDU command 66AID_START_BYTES = bytes.fromhex("00A40400") 67 68# AID groups that are used by the emulator app 69SELECT_AID_FIRST = bytes.fromhex("00A4040008A000000151000000") 70SELECT_AID_SECOND = bytes.fromhex("00A4040008A000000003000000") 71 72 73class NfcType(enum.Enum): 74 NFC_A = 0 75 NFC_B = 1 76 NFC_F = 2 77 NFC_V = 3 78 REMOTE_FIELD = 4 79 UNKNOWN = 5 80 81 82@dataclasses.dataclass 83class PollingLoopEntry: 84 ts: int = 0 85 type: NfcType = NfcType.UNKNOWN 86 data: bytes = b"" 87 error: str | None = None 88 89 90@dataclasses.dataclass 91class PartialApduEntry: 92 ts: int = 0 93 is_command: bool = False 94 data: bytes = b"" # data sent by the APDU command or response 95 is_first: bool = ( 96 True 97 # whether this is the first entry in the list of APDU commands or responses sent together 98 ) 99 100 101@dataclasses.dataclass 102class FullApduEntry: 103 ts: int = 0 104 command: list[bytes] = dataclasses.field(default_factory=lambda: []) 105 response: list[bytes] = dataclasses.field(default_factory=lambda: []) 106 error: str | None = None 107 108 109def replace_aids( 110 log: list[PollingLoopEntry | FullApduEntry], 111) -> list[PollingLoopEntry | FullApduEntry]: 112 """Replaces the AIDs in the log with the AIDs that are used by the emulator app.""" 113 new_log: list[PollingLoopEntry | FullApduEntry] = [] 114 for cur in log: 115 is_first_aid = True 116 if isinstance(cur, FullApduEntry): 117 new_apdu_entry = FullApduEntry( 118 ts=cur.ts, command=[], response=cur.response 119 ) 120 for cmd in cur.command: 121 if cmd.startswith(AID_START_BYTES): 122 if is_first_aid: 123 new_apdu_entry.command.append(SELECT_AID_FIRST) 124 is_first_aid = False 125 else: 126 new_apdu_entry.command.append(SELECT_AID_SECOND) 127 else: 128 new_apdu_entry.command.append(cmd) 129 new_log.append(new_apdu_entry) 130 else: 131 new_log.append(cur) 132 return new_log 133 134 135def parse_timeframe(log, start, end): 136 """Returns a subset of the log that falls within the given timeframe.""" 137 if start is None and end is None: 138 return log 139 parsed_log = log 140 if start is not None: 141 start_dt = datetime.datetime.strptime(start, "%Y-%m-%d %H:%M:%S.%f") 142 start_ts = int(float(datetime.datetime.timestamp(start_dt)) * 1000000) 143 parsed_log = list(filter(lambda x: x.ts >= start_ts, log)) 144 if end is not None: 145 end_dt = datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S.%f") 146 end_ts = int(float(datetime.datetime.timestamp(end_dt)) * 1000000) 147 parsed_log = list(filter(lambda x: x.ts <= end_ts, parsed_log)) 148 return parsed_log 149 150 151def standardize_log( 152 log: list[PollingLoopEntry | PartialApduEntry], 153) -> list[PollingLoopEntry | FullApduEntry]: 154 """Standardizes the log to ensure that it can be replayed by the PN 532 module. 155 156 This includes removing redundant calls to polling loop A and combining APDU 157 commands and responses into a single entry. 158 159 Args: 160 log: The log to be standardized. 161 162 Returns: 163 The standardized log. 164 """ 165 cmds = [] 166 rsps = [] 167 last_ts = 0 168 standardized: list[PollingLoopEntry | FullApduEntry] = [] 169 for cur in log: 170 if isinstance(cur, PollingLoopEntry): 171 if cur.type == NfcType.NFC_A or cur.type == NfcType.NFC_B: 172 standardized.append(cur) 173 elif cur.type == NfcType.UNKNOWN: 174 if not standardized: 175 standardized.append(cur) 176 else: 177 standardized[-1] = cur 178 elif isinstance(cur, PartialApduEntry): 179 if cur.is_command: 180 if len(cmds) == len(rsps) + 1: # extra command without response 181 rsps.append(b"") 182 if len(cmds) == len(rsps) != 0 and cur.data.startswith(AID_START_BYTES): 183 standardized.append(FullApduEntry(last_ts, cmds, rsps)) 184 cmds = [] 185 rsps = [] 186 cmds.append(cur.data) 187 else: 188 if len(cmds) == len(rsps): # extra response without command 189 continue 190 rsps.append(cur.data) 191 last_ts = cur.ts 192 # handle last command and response 193 if len(cmds) == len(rsps) + 1: 194 rsps.append(b"") 195 if len(cmds) == len(rsps) != 0: 196 standardized.append(FullApduEntry(last_ts, cmds, rsps)) 197 return standardized 198 199 200def parse_file(data: bytes) -> list[PollingLoopEntry | PartialApduEntry]: 201 """Parses the file to extract polling loop data and APDU exchanges.""" 202 if not data: 203 raise RuntimeError("No data found in file") 204 version = data[0] 205 if version != 1: 206 raise RuntimeError("Unsupported version: {}".format(version)) 207 208 offset = PREAMBLE_LENGTH 209 header_length = HEADER_LENGTH 210 pts_offset = 2 211 polling_list = [] 212 ts = calculate_timestamp(data) 213 while len(data) - offset > header_length: 214 215 # length of the current transaction in bytes 216 length = data[offset] + (data[offset + 1] << 8) 217 218 # duration between the last transaction and the current one 219 pts = bytearray(data[offset + pts_offset : offset + pts_offset + 4]) 220 pts_real = pts[0] + (pts[1] << 8) + (pts[2] << 16) + (pts[3] << 24) 221 ts += pts_real 222 223 transaction_type = (data[offset + header_length] & 0xE0) >> 5 224 if transaction_type == 3: # ST_NTF or NCI_NTF transactions 225 cur_data = data[offset + header_length : offset + header_length + length] 226 if cur_data.startswith(POLLING_LOOP_START_BYTES): 227 polling_list.extend(add_polling_data(cur_data, ts)) 228 elif cur_data.startswith(APDU_START_BYTES): 229 apdu_transactions = find_apdu_transactions(cur_data, ts) 230 polling_list.extend(apdu_transactions) 231 offset += header_length + length 232 return polling_list 233 234 235def open_and_parse_file( 236 file_path: str, 237) -> list[PollingLoopEntry | FullApduEntry]: 238 """Opens the file that contains the unparsed snoop log and parses it. 239 240 Args: 241 file_path: The path of the file containing the unparsed snoop log. 242 243 Returns: 244 A list of polling loop entries and APDU exchanges parsed from the file. 245 246 Raises: 247 RuntimeError: If the file cannot be found. 248 """ 249 snoop_file = open_read_file(file_path) 250 str_data = "" 251 found_log = False 252 while line := snoop_file.readline(): 253 if not found_log and SNOOP_LOG_START in line: 254 found_log = True 255 elif found_log: 256 if SNOOP_LOG_END in line: 257 break 258 str_data += line 259 snoop_bytes = inflate(base64.b64decode(str_data)) 260 parsed = parse_file(snoop_bytes) 261 return standardize_log(parsed) 262 263 264def find_apdu_transactions(data: bytes, ts: int) -> list[PartialApduEntry]: 265 """Finds all APDU transactions in the given data.""" 266 total_size = data[APDU_LEN_INDEX] 267 if total_size < APDU_MAIN_HEADER_SIZE or data[4:6] != APDU_IDENTIFIER: 268 return [] 269 270 apdus: list[PartialApduEntry] = [] 271 index = APDU_DATA_START_INDEX 272 while index < len(data): 273 cur_size = data[index + 1] 274 cur_data = data[index : index + cur_size + 2] 275 cmd, is_first = parse_apdu_command(cur_data) or (None, None) 276 if cmd is not None: 277 apdus.append( 278 PartialApduEntry(ts=ts, is_command=True, data=cmd, is_first=is_first) 279 ) 280 else: 281 rsp, is_first = parse_apdu_response(cur_data) or (None, None) 282 if rsp is not None: 283 apdus.append( 284 PartialApduEntry( 285 ts=ts, is_command=False, data=rsp, is_first=is_first 286 ) 287 ) 288 index += cur_size + 2 289 return apdus 290 291 292def parse_apdu_command(data: bytes): 293 """Isolate the bytes sent from the reader to the emulator. 294 295 Args: 296 data: The raw APDU command in bytes. 297 298 Returns: 299 the data sent by the APDU command, or none if it is not a valid APDU 300 command. 301 """ 302 if len(data) < APDU_COMMAND_MIN_LENGTH: 303 return None 304 if data[0] != APDU_COMMAND_IDENTIFIER: 305 return None 306 if data[1] != len(data) - 2: 307 return None 308 if data[5:7] != bytes.fromhex("0000"): 309 return None 310 if data[8] in [APDU_ORDER_FIRST, APDU_ORDER_SECOND]: 311 is_first = True if data[8] == APDU_ORDER_FIRST else False 312 return data[9:-4], is_first 313 elif data[8:10] in [APDU_ORDER_FIRST_ALT, APDU_ORDER_SECOND_ALT]: 314 is_first = True if data[8:10] == APDU_ORDER_FIRST_ALT else False 315 return data[10:-4], is_first 316 return None 317 318 319def parse_apdu_response(data: bytes): 320 """Isolates the data sent from the emulator to the reader. 321 322 Args: 323 data: The raw APDU response in bytes. 324 325 Returns: 326 the data sent by the APDU response, or none if it is not a valid APDU 327 response. 328 """ 329 if len(data) < APDU_RESPONSE_MIN_LENGTH: 330 return None 331 if data[0] != APDU_RESPONSE_IDENTIFIER: 332 return None 333 if data[1] != len(data) - 2: 334 return None 335 if data[5] in [APDU_ORDER_FIRST, APDU_ORDER_SECOND]: 336 is_first = True if data[5] == APDU_ORDER_FIRST else False 337 return data[6:-4], is_first 338 elif data[5:7] in [APDU_ORDER_FIRST_ALT, APDU_ORDER_SECOND_ALT]: 339 is_first = True if data[5:7] == APDU_ORDER_FIRST_ALT else False 340 return data[7:-4], is_first 341 elif data[7] in [APDU_ORDER_FIRST, APDU_ORDER_SECOND]: 342 is_first = True if data[7] == APDU_ORDER_FIRST else False 343 return data[8:-4], is_first 344 return None 345 346 347def add_polling_data(data: bytes, ts: int) -> list[PollingLoopEntry]: 348 """Adds polling data to the list of transactions. 349 350 Each entry may contain multiple polling data transactions. 351 352 Args: 353 data: The raw polling data in bytes. 354 ts: The timestamp of the polling transaction. 355 356 Returns: 357 A list of polling data transactions. 358 """ 359 transaction_list = [] 360 count = 4 361 while count < len(data): 362 flag = data[count] 363 match flag: 364 case 0: 365 entry_type = NfcType.REMOTE_FIELD 366 case 1: 367 entry_type = NfcType.NFC_A 368 case 2: 369 entry_type = NfcType.NFC_B 370 case _: 371 entry_type = NfcType.UNKNOWN 372 length = data[count + 2] - 5 373 polling_data = data[count + 8 : count + 8 + length] 374 transaction_list.append( 375 PollingLoopEntry( 376 ts=ts, 377 type=entry_type, 378 data=polling_data, 379 ) 380 ) 381 count += 8 + length 382 return transaction_list 383 384 385def calculate_timestamp(data: bytes) -> int: 386 """Calculates the timestamp of the first frame in the log.""" 387 ts = data[1:9] 388 ts_real = ( 389 ts[0] 390 + (ts[1] << 8) 391 + (ts[2] << 16) 392 + (ts[3] << 24) 393 + (ts[4] << 32) 394 + (ts[5] << 40) 395 + (ts[6] << 48) 396 + (ts[7] << 56) 397 ) 398 offset = PREAMBLE_LENGTH 399 while (len(data) - offset) > HEADER_LENGTH: 400 length = data[offset] + (data[offset + 1] << 8) 401 pts = bytearray(data[offset + 2 : offset + 6]) 402 pts_real = pts[0] + (pts[1] << 8) + (pts[2] << 16) + (pts[3] << 24) 403 ts_real -= pts_real 404 offset += HEADER_LENGTH + length 405 return ts_real 406 407 408def inflate(data: bytes) -> bytes: 409 """Inflates decompressed data.""" 410 decompressed = zlib.decompressobj().decompress(data[PREAMBLE_LENGTH:]) 411 return data[0:PREAMBLE_LENGTH] + decompressed 412 413 414def open_read_file(file_path: str): 415 """Opens the file at the given path. 416 417 Args: 418 file_path: The path of the file to be opened. This can be either a local 419 path or an absolute path. 420 421 Returns: 422 An object representing the opened file. 423 424 Raises: 425 RuntimeError: If the file cannot be opened. 426 """ 427 full_path = os.path.dirname(os.path.realpath(__file__)) + "/" + file_path 428 if os.path.exists(file_path): 429 file_to_open = file_path 430 elif os.path.exists(full_path): 431 file_to_open = full_path 432 else: 433 raise RuntimeError("File not found: {}".format(file_path)) 434 435 try: 436 return open(file_to_open, "rt") 437 except Exception as e: 438 raise RuntimeError( 439 "Error occurred while opening file: {}".format(file_path) 440 ) from e 441