# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Lint as: python3
"""Parses an NFC snoop log and replays it through a PN532 reader.

Depending on the command-line flags, the parsed log is either written to a
file for the emulator app, turned into a generated test that is run with
`atest`, or replayed directly against the PN532 module.
"""

import argparse
import datetime
import os
import subprocess
import time

from generate_test import generate_test
from parse_log import (
    FullApduEntry,
    NfcType,
    PollingLoopEntry,
    open_and_parse_file,
    parse_timeframe,
    replace_aids,
)
from pn532 import PN532

# Minimum amount of time between successive NFC transactions, to prevent the
# reader from being overloaded.
_MIN_SLEEP_TIME_SECONDS = 0.5

# Maximum amount of time allowed between successive NFC transactions, to allow
# for ease of use.
_MAX_SLEEP_TIME_SECONDS = 5

_EXPECTED_ERROR_INDEX = 0
_ACTUAL_ERROR_INDEX = 1

# Number of times to try to obtain a tag before declaring failure.
_NUM_RETRIES = 50

# String templates for the output of a test case or snoop log.
_APDU_OUTPUT_STR = "{}: APDU exchange: sent {}, received {}"
_APDU_OUTPUT_TEST_STR = "{}: APDU exchange: sent {} APDUs, received {} APDUs"
_POLLING_OUTPUT_STR = "{}: sent NFC data of type {}"
_ERROR_STR = " ERROR: {}"

# Width of the column for outputting the results of a test case.
_COLUMN_WIDTH = 80

# Directory for generated test cases and files for the emulator app.
_EMULATOR_APP_PARSED_DIR = "src/com/android/nfc/emulatorapp/parsed_files/"


def send_nfc_a_data(reader: PN532) -> str | None:
    """Calls poll_a() on the reader.

    Args:
      reader: The PN532 reader.

    Returns:
      The error message if poll_a() raised, otherwise None.
    """
    try:
        reader.poll_a()
    except Exception as e:  # reported to the caller as text, never raised
        return str(e)
    return None


def send_nfc_b_data(reader: PN532) -> str | None:
    """Calls poll_b() on the reader.

    Args:
      reader: The PN532 reader.

    Returns:
      The error message if poll_b() raised, otherwise None.
    """
    try:
        reader.poll_b()
    except Exception as e:  # reported to the caller as text, never raised
        return str(e)
    return None


def send_unknown_data(reader: PN532, data: bytes) -> str | None:
    """Sends a custom polling frame to the reader.

    The reader is polled with poll_a(), the frame is sent with
    send_broadcast(), and the reader is then muted.

    Args:
      reader: The PN532 reader.
      data: The custom polling frame to be sent to the reader.

    Returns:
      The error message if any of the reader calls raised, otherwise None.
    """
    try:
        reader.poll_a()
        reader.send_broadcast(data)
        reader.mute()
    except Exception as e:  # reported to the caller as text, never raised
        return str(e)
    return None


def conduct_apdu_exchange(reader: PN532, current: FullApduEntry) -> str | None:
    """Conducts an APDU exchange between the emulator and the PN 532 module.

    Once the device is detected by the reader, the reader will send the APDU
    commands to the device and receive the responses. If an error occurs --
    for instance, if the response from the emulator differs from the expected
    response -- the error is returned so that it can be logged in the output.

    Args:
      reader: The PN532 reader.
      current: A data object containing the APDU commands to be sent to the
        emulator and the expected responses.

    Returns:
      The error message if one occurs, otherwise None. "No tag received" is
      returned when no tag was obtained within _NUM_RETRIES polls.
    """
    try:
        for _ in range(_NUM_RETRIES):
            tag = reader.poll_a()
            if tag is None:
                # No tag on this attempt: mute the reader and poll again.
                reader.mute()
                continue

            if tag.transact(current.command, current.response):
                return None
            # Include the expected responses in the message, hex-encoded the
            # same way output_line_for_snoop_log() prints them.
            return "Received incorrect response. Expected: {}".format(
                [response.hex() for response in current.response]
            )
        return "No tag received"
    except Exception as e:  # reported to the caller as text, never raised
        return str(e)


def replay_transaction(log, module_path: str):
    """Replays the given transaction log on the PN 532 module.

    Entries are replayed in order. Before each entry the function sleeps for
    the gap between that entry's timestamp and the previous one, clamped to
    [_MIN_SLEEP_TIME_SECONDS, _MAX_SLEEP_TIME_SECONDS]. A summary line (and
    any error) is printed for every entry.

    Args:
      log: The transaction log to be replayed.
      module_path: The serial path to the PN 532 module.
    """
    try:
        reader = PN532(module_path)
    except Exception:
        print("Could not connect to PN532 module")
        return

    if not log:
        return

    prev_time = log[0].ts

    try:
        for current in log:
            # The division by 1e6 implies timestamps are in microseconds --
            # TODO confirm against parse_log.
            num_seconds = (current.ts - prev_time) / 1000000
            time.sleep(
                min(
                    max(num_seconds, _MIN_SLEEP_TIME_SECONDS),
                    _MAX_SLEEP_TIME_SECONDS,
                )
            )

            error = None

            if isinstance(current, PollingLoopEntry):
                if current.type == NfcType.NFC_A:
                    error = send_nfc_a_data(reader)
                elif current.type == NfcType.NFC_B:
                    error = send_nfc_b_data(reader)
                elif current.type == NfcType.UNKNOWN:
                    error = send_unknown_data(reader, current.data)
            elif isinstance(current, FullApduEntry):
                error = conduct_apdu_exchange(reader, current)

            output_line_for_snoop_log(current, error)

            # adjust timestamp
            prev_time = current.ts
    finally:
        # Always mute the reader once replay stops, including when the loop
        # is interrupted (e.g. Ctrl-C during a sleep) or raises.
        reader.mute()


def parse_snoop_log(args: argparse.Namespace):
    """Parses the given snoop log file.

    If the file will be used for replaying a transaction with the emulator
    app, the AIDs will be replaced with the ones used by the app.
    Additionally, if the user specifies a start and end time, the log will be
    filtered to only include transactions that fall within that timeframe.

    Args:
      args: The parsed command-line arguments. The attributes read are `file`
        (local path to the snoop log), `replay_with_app`, `parse_only`,
        `start` and `end`.

    Returns:
      The parsed snoop log.
    """
    parsed = open_and_parse_file(args.file)

    # replace the AIDs with the ones used by the emulator app
    if args.replay_with_app or args.parse_only:
        parsed = replace_aids(parsed)

    return parse_timeframe(parsed, args.start, args.end)


def output_line_for_snoop_log(
    entry: PollingLoopEntry | FullApduEntry,
    error: str | None,
):
    """Outputs a summary of an interaction from a snoop log.

    The line is prefixed with the current local time, not the timestamp
    recorded in the log.

    Args:
      entry: The current interaction to be replayed from the snoop log.
      error: The error message from the replayed (actual) transaction, or
        None if no error occurred.
    """
    cur_time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    if isinstance(entry, FullApduEntry):
        print(
            _APDU_OUTPUT_STR.format(
                cur_time_str,
                [command.hex() for command in entry.command],
                [response.hex() for response in entry.response],
            )
        )
    else:  # isinstance(entry, PollingLoopEntry)
        print(_POLLING_OUTPUT_STR.format(cur_time_str, entry.type.name))

    if error is not None:
        print(_ERROR_STR.format(error))


def print_opening_sequence(
    file_name: str,
    start: str | None = None,
    end: str | None = None,
):
    """Prints the opening sequence for a test case or snoop log.

    The name of the file to be replayed is displayed, along with the
    timeframe that will be replayed, if specified by the user. When no
    timeframe is given, a blank line is printed instead.

    Args:
      file_name: The name of the file to be replayed.
      start: The start of the timeframe to be replayed.
      end: The end of the timeframe to be replayed.
    """
    print()
    print("Replaying transaction from snoop log: {}".format(file_name))
    if start is not None and end is not None:
        print("Timeframe: {} - {}".format(start, end))
    elif start is not None:
        print("Timeframe: {} - end".format(start))
    elif end is not None:
        print("Timeframe: start - {}".format(end))
    else:
        print()


def create_file_for_emulator_app(
    output: list[PollingLoopEntry | FullApduEntry], filename: str
):
    """Creates a file containing the parsed APDU exchanges from a snoop log.

    This will be used to replay the transaction with an Android app installed
    on the emulator. Only FullApduEntry items are written, one per line, as
    "<commands>;<responses>", where each side is a list of hex strings.

    Args:
      output: A list of polling loop entries and APDU exchanges parsed from
        the snoop log.
      filename: The name of the file to be created. This is near-identical to
        the name of the snoop log file ("/" is replaced with "_").

    Raises:
      RuntimeError: If the output file cannot be opened.
    """
    local_path = _EMULATOR_APP_PARSED_DIR + filename.replace("/", "_")
    full_path = os.path.dirname(os.path.realpath(__file__)) + "/" + local_path
    try:
        file = open(full_path, "wt")
    except Exception as e:
        raise RuntimeError(
            "Error occurred while opening file: {}".format(full_path)
        ) from e
    # The context manager guarantees the file is flushed and closed, even if
    # writing an entry fails part-way through.
    with file:
        for entry in output:
            if isinstance(entry, FullApduEntry):
                file.write(
                    "{};{}\n".format(
                        [command.hex() for command in entry.command],
                        [response.hex() for response in entry.response],
                    )
                )
    print()
    print("File for third party app generated at: {}".format(local_path))


def get_name_for_test_case(filename: str) -> str:
    """Derives a test case name from the snoop log file name.

    Args:
      filename: The path of the snoop log file.

    Returns:
      "Generated" followed by the file name with every "/" and ".txt"
      removed.
    """
    return "Generated" + filename.replace("/", "").replace(".txt", "")


def main():
    """Parses the command-line arguments and runs the selected scenario.

    With --parse_only, the parsed log is written to a file for the emulator
    app. Otherwise the transaction is replayed, either through a generated
    test run with `atest` (--generate_and_replay_test) or directly on the
    PN532 module.
    """
    parser = argparse.ArgumentParser(prog="pn532")
    parser.add_argument(
        "-p",
        "--path",
        action="store",
        help="Path to the PN532 serial device, e.g. /dev/ttyUSB0",
    )
    parser.add_argument(
        "-f",
        "--file",
        action="store",
        required=True,
        help="Path to the file of the snoop log",
    )
    parser.add_argument(
        "--start",
        action="store",
        help="Start of the timeframe to be replayed",
    )
    parser.add_argument(
        "--end",
        action="store",
        help="End of the timeframe to be replayed",
    )
    parser.add_argument(
        "--parse_only",
        action="store_true",
        help="Parse the log without replaying the transaction",
    )
    parser.add_argument(
        "--replay_with_app",
        action="store_true",
        help="Replay the transaction with the emulator app",
    )
    parser.add_argument(
        "--generate_and_replay_test",
        action="store_true",
        help="Generate a test case from the log and then immediately run it",
    )
    args = parser.parse_args()

    # The generated test needs the serial path as a string parameter. Reject a
    # missing --path up front instead of failing with a TypeError after the
    # test has already been generated.
    if (
        args.generate_and_replay_test
        and not args.parse_only
        and args.path is None
    ):
        parser.error("--path is required with --generate_and_replay_test")

    parsed_snoop_log = parse_snoop_log(args)
    if args.parse_only:  # scenario 1: parse snoop log for the emulator app
        create_file_for_emulator_app(parsed_snoop_log, args.file)
    else:  # scenario 2: replay transaction from a snoop log
        print_opening_sequence(
            file_name=args.file,
            start=args.start,
            end=args.end,
        )
        if args.generate_and_replay_test:
            # Replay the test that was just generated
            test_case_name = get_name_for_test_case(args.file)
            apdu_local_file = generate_test(parsed_snoop_log, test_case_name)
            test_command = [
                "atest",
                "-v",
                test_case_name,
                "--",
                "--testparam",
                "pn532_serial_path=" + args.path,
                "--testparam",
                "file_path=" + apdu_local_file,
            ]
            if args.replay_with_app:
                test_command += ["--testparam", "with_emulator_app=True"]
            subprocess.run(test_command)
        else:  # Default: replay the transaction
            replay_transaction(parsed_snoop_log, args.path)


if __name__ == "__main__":
    main()