#  Copyright (C) 2024 The Android Open Source Project
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

# Lint as: python3
"""Driver for a PN532 NFC reader attached over a serial port."""

import logging
import struct
from typing import Dict, Optional, Sequence
from enum import IntEnum
from binascii import hexlify

import serial

from serial.tools.list_ports import comports
from mobly import logger as mobly_logger

from .tag import TypeATag, TypeBTag
from .nfcutils import ByteStruct, snake_to_camel, s_to_us
from .nfcutils.reader import Reader, CONFIGURATION_A_LONG


# Written in front of frames while ``PN532.write_long_preamble`` is set
_LONG_PREAMBLE = bytes(20)
_ACK_FRAME = bytes.fromhex("0000ff00ff00")
# Preamble + start code expected at the beginning of every response frame
_SOF = bytes.fromhex("0000ff")


_BITRATE = {106: 0b000, 212: 0b001, 424: 0b010, 848: 0b011}
# Framing values defined in PN532_C1, 8.6.23
_FRAMING = {"A": 0b00, "DEP": 0b01, "F": 0b10, "B": 0b11}
# Timeout values defined in UM0701-02, Table 17,
# from 100 µs (n=1) up to 3.28 sec (n=16).
# Only n=1..15 are stored here; n=16 (0x10) is used as the fallback
# by transceive_raw when no stored entry is long enough.
_TIMEOUT = {n: 100 * 2 ** (n - 1) for n in range(0x01, 0x10)}


# Picked manually, might not be the best combinations
_POWER_LEVELS_TO_P_N_DRIVER_CONFIGS = {
    # No frames should be detected
    0: (0b000000, 0b0000),
    # A, F detected with gain 1-3
    20: (0b000001, 0b0001),
    # A, F detected with gain 4-5
    40: (0b000010, 0b0010),
    # A, F detected with gain 5-6
    60: (0b000011, 0b0100),
    # A, F, detected with gain 7-8
    80: (0b001000, 0b1000),
    # A, B, F detected with gain 9
    100: (0b111111, 0b1111)
}


class Command(IntEnum):
    """https://www.nxp.com/docs/en/user-guide/141520.pdf
    UM0701-02
    """

    DIAGNOSE = 0x00
    GET_FIRMWARE_VERSION = 0x02
    GET_GENERAL_STATUS = 0x04

    READ_REGISTER = 0x06
    WRITE_REGISTER = 0x08

    SAM_CONFIGURATION = 0x14
    POWER_DOWN = 0x16

    RF_CONFIGURATION = 0x32

    IN_JUMP_FOR_DEP = 0x56
    IN_JUMP_FOR_PSL = 0x46
    IN_LIST_PASSIVE_TARGET = 0x4A

    IN_DATA_EXCHANGE = 0x40
    IN_COMMUNICATE_THRU = 0x42

    IN_DESELECT = 0x44
    IN_RELEASE = 0x52
    IN_SELECT = 0x54

    IN_AUTO_POLL = 0x60

    TG_INIT_AS_TARGET = 0x8C
    TG_SET_GENERAL_BYTES = 0x92
    TG_GET_DATA = 0x86
    TG_SET_DATA = 0x8E
    TG_SET_METADATA = 0x94
    TG_GET_INITIATOR_COMMAND = 0x88
    TG_RESPONSE_TO_INITIATOR = 0x90
    TG_GET_TARGET_STATUS = 0x8A


_BS = ByteStruct.of


class Register(IntEnum):
    """https://www.nxp.com/docs/en/nxp/data-sheets/PN532_C1.pdf
    PN532/C1
    8.6.22 CIU memory map
    8.7.1 Standard registers

    Each member's integer value is the register address. Every member also
    carries a ``structure`` attribute: a ByteStruct type describing the
    register's bit fields (a single 8-bit ``value`` field when no layout
    is given in the member definition).
    """

    structure: ByteStruct

    def __new__(cls, address: int, structure: Optional[ByteStruct] = None):
        # Only the address becomes the enum value; the optional layout is
        # consumed by __init__ once the member name is known.
        obj = int.__new__(cls, address)
        obj._value_ = address
        obj.structure = None
        return obj

    def __init__(self, _, structure: Optional[ByteStruct] = None):
        # When initializing, we already know the name, so we're able to
        # generate a nice name for matching ByteStruct
        name = snake_to_camel(self.name.lower(), lower=False)
        self.structure = ByteStruct.of(
            name, **(structure.fields if structure else {"value": (7, 0)})
        )

    COMMAND = 0x6331
    COMM_I_EN = 0x6332
    DIV_I_EN = 0x6333
    COMM_I_RQ = 0x6334
    DIV_I_RQ = 0x6335
    ERROR = 0x6336
    WATER_LEVEL = 0x633B
    # (8.6.23.14) Control
    CONTROL = 0x633C, _BS(
        t_stop_now=7,
        t_start_now=6,
        wr_nfcip1_id_to_fifo=5,
        initiator=4,
        rfu=3,
        rx_last_bits=(2, 0),
    )
    # (8.6.23.15) BitFraming
    BIT_FRAMING = 0x633D, _BS(
        start_send=7, rx_align=(6, 4), rfu=3, tx_last_bits=(2, 0)
    )
    # (8.6.23.16) Coll
    COLL = 0x633E, _BS(
        values_after_coll=7,
        rfu=6,
        coll_pos_not_valid=5,
        coll_pos=(4, 0),
    )
    # (8.6.23.17) Mode
    MODE = 0x6301, _BS(
        msb_first=7,
        detect_sync=6,
        tx_wait_rf=5,
        rx_wait_rf=4,
        pol_sigin=3,
        mode_det_off=2,
        crc_preset=(1, 0),
    )
    # (8.6.23.18) TxMode
    TX_MODE = 0x6302, _BS(
        crc_en=7, speed=(6, 4), inv_mod=3, mix=2, framing=(1, 0)
    )
    # (8.6.23.19) RxMode
    RX_MODE = 0x6303, _BS(
        crc_en=7, speed=(6, 4), no_err=3, multiple=2, framing=(1, 0)
    )
    # (8.6.23.20) TxControl
    TX_CONTROL = 0x6304, _BS(
        inv_tx2_rf_on=7,
        inv_tx1_rf_on=6,
        inv_tx2_rf_off=5,
        inv_tx1_rf_off=4,
        tx2_cw=3,
        check_rf=2,
        tx2_rf_en=1,
        tx1_rf_en=0,
    )
    # (8.6.23.21) TxAuto
    TX_AUTO = 0x6305, _BS(
        auto_rf_off=7,
        force_100_ask=6,
        auto_wake_up=5,
        rfu=4,
        ca_on=3,
        initial_rf_on=2,
        tx2_rf_auto_en=1,
        tx1_rf_auto_en=0,
    )
    TX_SEL = 0x6306
    RX_SEL = 0x6307
    # (8.6.23.24) RxThreshold
    RX_THRESHOLD = 0x6308, _BS(min_level=(7, 4), rfu=3, col_level=(2, 0))
    # (8.6.23.25) Demod
    DEMOD = 0x6309, _BS(
        add_iq=(7, 6), fix_iq=5, tau_rcv=(3, 2), tau_sync=(1, 0)
    )
    MANUAL_RCV = 0x630D
    TYPE_B = 0x630E
    # (8.6.23.33) GsNOff
    GS_N_OFF = 0x6313, _BS(cw_gs_n_off=(7, 4), mod_gs_n_off=(3, 0))
    # (8.6.23.34) ModWidth
    MOD_WIDTH = 0x6314, _BS(mod_width=(7, 0))
    # (8.6.23.35) TxBitPhase
    TX_BIT_PHASE = 0x6315, _BS(rcv_clk_change=7, tx_bit_phase=(6, 0))
    # (8.6.23.36) RfCfg
    RF_CFG = 0x6316, _BS(rf_level_amp=7, rx_gain=(6, 4), rf_level=(3, 0))
    # (8.6.23.37) GsNOn
    GS_N_ON = 0x6317, _BS(cw_gs_n_on=(7, 4), mod_gs_n_on=(3, 0))
    # (8.6.23.38) CWGsP
    CW_GS_P = 0x6318, _BS(rfu=(7, 6), cw_gs_p=(5, 0))
    # (8.6.23.39) ModGsP
    MOD_GS_P = 0x6319, _BS(rfu=(7, 6), mod_gs_p=(5, 0))


REG = Register


REGISTER_VALUES_FOR_TRANSCEIVE = {
    # The following registers are configured for transmit
    # based on register states after using IN_LIST_PASSIVE_TARGET
    REG.CONTROL: REG.CONTROL.structure(initiator=True),
    REG.TX_CONTROL: REG.TX_CONTROL.structure(
        inv_tx2_rf_on=True,
        tx2_rf_en=True,
        tx1_rf_en=True,
    ),
    REG.RX_THRESHOLD: REG.RX_THRESHOLD.structure(
        min_level=0b1000, col_level=0b101
    ),
    REG.GS_N_OFF: REG.GS_N_OFF.structure(
        cw_gs_n_off=0b0110, mod_gs_n_off=0b1111
    ),
}


class RFConfigItem(IntEnum):
    """https://www.nxp.com/docs/en/user-guide/141520.pdf
    UM0701-02
    7.3.1 RFConfiguration
    """

    RF_FIELD = 0x01  # ConfigurationData
    VARIOUS_TIMINGS = 0x02  # RFU, fATR_RES_Timeout, fRetryTimeout
    # 0x03 RFU
    MAX_RTY_COM = 0x04  # MaxRtyCOM
    MAX_RETRIES = 0x05  # MxRtyATR, MxRtyPSL, MxRtyPassiveActivation


class BrTy(IntEnum):
    """https://www.nxp.com/docs/en/user-guide/141520.pdf
    UM0701-02
    7.3.5 InListPassiveTarget
    """

    # InitiatorData is optional and may contain a UID to initialize
    TYPE_A_106 = 0x00
    # InitiatorData contains "Polling" command payload
    TYPE_F_212 = 0x01
    TYPE_F_424 = 0x02
    # InitiatorData contains AFI and optional polling method byte
    TYPE_B_106 = 0x03
    # InitiatorData field unused
    TYPE_A_JEWEL = 0x04


class Status(IntEnum):
    """https://www.nxp.com/docs/en/user-guide/141520.pdf
    UM0701-02
    7.1 Error Handling
    """

    OK = 0x00
    TIME_OUT = 0x01
    CRC_ERROR = 0x02
    PARITY_ERROR = 0x03
    ERRONEOUS_BIT_COUNT = 0x04
    MIFARE_FRAMING_ERROR = 0x05
    BIT_COLLISION = 0x06
    COMMUNICATION_BUFFER_SIZE_INSUFFICIENT = 0x07
    RF_BUFFER_OVERFLOW = 0x09
    RF_PROTOCOL_ERROR = 0x0B
    TEMPERATURE_ERROR = 0x0D
    INTERNAL_BUFFER_OVERFLOW = 0x0E
    INVALID_PARAMETER = 0x10


class PN532(Reader):
    """Implements an NFC reader with a PN532 chip"""

    def __init__(self, path=""):
        """Initializes device on path,
        or first available serial port if none is provided.

        :param path: serial device path; an empty or None value selects
            the first port reported by ``comports()``
        :raises IndexError: if no serial ports are available
        :raises RuntimeError: if the firmware version check fails
        """
        ports = comports()
        if not ports:
            raise IndexError(
                "Could not find device on serial port"
                + ", make sure reader is plugged in."
            )
        if not path:
            path = ports[0].device

        # Last written register values / RF configuration items, used to
        # skip redundant transfers when callers opt into caching
        self.register_cache = {}
        self.rf_configuration_cache = {}
        # While set, _write() prefixes every frame with _LONG_PREAMBLE
        self.write_long_preamble = True

        self.log = mobly_logger.PrefixLoggerAdapter(
            logging.getLogger(),
            {
                mobly_logger.PrefixLoggerAdapter.EXTRA_KEY_LOG_PREFIX: (
                    f"[PN532|{path}]"
                )
            },
        )
        self.log.debug("Serial port: %s", path)
        self.device = serial.Serial(path, 115200, timeout=0.5)

        try:
            self.device.flush()
            self._send_ack_frame()
            self.device.reset_input_buffer()
            if not self.verify_firmware_version():
                raise RuntimeError(
                    "Could not verify PN532 firmware on serial path " + path
                )
            self.sam_configuration(mode=0x01, timeout_value=0x00)

            self.write_long_preamble = False

            # Disable retries
            self.device.reset_input_buffer()
            self.rf_configuration(
                RFConfigItem.MAX_RETRIES,
                [
                    0x00,  # MxRtyATR
                    0x00,  # MxRtyPSL
                    0x00,  # MxRtyPassiveActivation
                ],
            )
        except Exception:
            # Do not leave the port open when the handshake fails,
            # otherwise a retry on the same path cannot reopen it
            self.device.close()
            raise

    # Custom functions

    def poll_a(self):
        """Attempts to detect target for NFC type A.

        :return: result of in_list_passive_target for TYPE_A_106
        """
        self.log.debug("Polling A")
        tag = self.in_list_passive_target(br_ty=BrTy.TYPE_A_106)
        if tag:
            self.log.debug("Got Type A tag, SEL_RES=%s", tag.sel_res)
        return tag

    def poll_b(self):
        """Attempts to detect target for NFC type B.

        :return: result of in_list_passive_target for TYPE_B_106
        """
        self.log.debug("Polling B")
        afi = 0x00
        tag = self.in_list_passive_target(
            br_ty=BrTy.TYPE_B_106, initiator_data=(afi,)
        )
        if tag:
            self.log.debug("Got Type B tag %s", tag.sensb_res)
        return tag

    def send_broadcast(
        self,
        data: bytes,
        *,
        configuration=CONFIGURATION_A_LONG
    ):
        """Emits a broadcast frame into the polling loop

        :param data: raw frame payload
        :param configuration: object providing type, crc, bits, bitrate,
            timeout and power attributes; a falsy timeout becomes 0.25 s
        :return: result of transceive_raw
        """
        self.log.debug("Sending broadcast %s", hexlify(data).decode())
        return self.transceive_raw(
            data=data,
            type_=configuration.type,
            crc=configuration.crc,
            bits=configuration.bits,
            bitrate=configuration.bitrate,
            timeout=configuration.timeout or 0.25,
            power_level=configuration.power,
        )

    def mute(self):
        """Turns off device's RF antenna."""
        self.log.debug("Muting")
        self.rf_configuration(RFConfigItem.RF_FIELD, [0b10])

    def unmute(self, auto_rf_ca=False):
        """Turns on device's RF antenna.

        :param auto_rf_ca: value placed in bit 1 of the RF_FIELD
            configuration byte (bit 0 is always set)
        """
        self.log.debug("Unmuting")
        self.rf_configuration(RFConfigItem.RF_FIELD, [(auto_rf_ca << 1) + 0b01])

    def reset(self):
        """Clears out input and output buffers to expunge leftover data"""
        self.device.reset_input_buffer()
        self.device.reset_output_buffer()

    # Special commands

    def transceive_raw(
        self,
        data,
        type_="A",
        crc=True,
        bits=8,
        bitrate=106,
        *,
        timeout=1,
        power_level=100,
        cache_configuration=True,
    ):
        """Configures the CIU with specified configuration and sends raw data
        :param data: payload to send through InCommunicateThru
        :param type_: framing key, one of the keys of _FRAMING
        :param crc: value written to crc_en of TX_MODE and RX_MODE
        :param bits: only the low three bits are used, as tx_last_bits
        :param bitrate: one of the keys of _BITRATE
        :param timeout: Timeout in seconds
        :param power_level: looked up in _POWER_LEVELS_TO_P_N_DRIVER_CONFIGS
        :param cache_configuration: if true, prevents redundant writes & reads
        :return: response payload, or None on a non-OK status
        """
        # Choose the least index of timeout duration
        # where result >= given value. Timeout is in μs.
        # If the timeout value is bigger than every table entry,
        # fall back to maximum timeout duration (0x10).
        # A timeout <= 0 matches the first (shortest) entry.
        timeout_us = s_to_us(timeout)
        timeout_index = next(
            (idx for idx, t in _TIMEOUT.items() if t >= timeout_us), 0x10
        )
        self.rf_configuration(
            RFConfigItem.VARIOUS_TIMINGS,
            [
                0x00,  # RFU
                0x0B,  # ATR_RES TimeOut, default value is 0x0B
                timeout_index,
            ],
            cache=cache_configuration,
        )

        # First table entry whose level is >= the requested one; relies on
        # the table being declared in ascending order.
        # NOTE(review): a power_level above 100 falls back to the level-0
        # entry rather than the strongest one - confirm this is intended.
        p_n_config = next(
            (
                config
                for power, config in _POWER_LEVELS_TO_P_N_DRIVER_CONFIGS.items()
                if power >= power_level
            ),
            _POWER_LEVELS_TO_P_N_DRIVER_CONFIGS[0]
        )
        p_driver_conductance_level, n_driver_conductance_level = p_n_config

        (
            tx_mode, rx_mode, tx_auto, bit_frm,
            gs_n_on, cw_gs_p, md_gs_p,
            rf_cfg, tx_bit_phase, demod,
        ) = self.read_registers(
            REG.TX_MODE, REG.RX_MODE, REG.TX_AUTO, REG.BIT_FRAMING,
            REG.GS_N_ON, REG.CW_GS_P, REG.MOD_GS_P,
            REG.RF_CFG, REG.TX_BIT_PHASE, REG.DEMOD,
            cache=cache_configuration,
        )

        # The following register modifications are based on register state
        # modifications when performing IN_LIST_PASSIVE_TARGET and communication
        registers_to_write = {
            REG.TX_MODE: tx_mode.replace(
                crc_en=crc, speed=_BITRATE[bitrate], framing=_FRAMING[type_]
            ),
            REG.RX_MODE: rx_mode.replace(
                crc_en=crc, speed=_BITRATE[bitrate], framing=_FRAMING[type_]
            ),
            REG.TX_AUTO: tx_auto.replace(force_100_ask=type_ == "A"),
            REG.BIT_FRAMING: bit_frm.replace(tx_last_bits=bits & 0b111),
            REG.GS_N_ON: gs_n_on.replace(
                mod_gs_n_on=0b0100 if type_ == "A" else 0b1111,
                cw_gs_n_on=n_driver_conductance_level & 0b1111,
            ),
            REG.CW_GS_P: cw_gs_p.replace(
                cw_gs_p=p_driver_conductance_level & 0b111111
            ),
            REG.MOD_GS_P: md_gs_p.replace(
                mod_gs_p=0b010111 if type_ == "B" else 0b010001
            ),
            REG.RF_CFG: rf_cfg.replace(
                rx_gain=0b110 if type_ == "F" else 0b101,
                rf_level=0b1001,
            ),
            REG.TX_BIT_PHASE: tx_bit_phase.replace(
                rcv_clk_change=1,
                tx_bit_phase=0b1111 if type_ == "F" else 0b0111,
            ),
            REG.DEMOD: demod.replace(
                add_iq=0b01,
                tau_rcv=0b00 if type_ == "F" else 0b11,
                tau_sync=0b01,
            ),
            **REGISTER_VALUES_FOR_TRANSCEIVE,
        }

        self.write_registers(registers_to_write, cache=cache_configuration)

        # Handle a special case for FeliCa, where length byte has to be present
        if type_ == "F":
            data = [len(data) + 1, *data]

        # No data is OK for this use case
        return self.in_communicate_thru(data, raise_on_error_status=False)

    def verify_firmware_version(self):
        """Verifies we are talking to a PN532.

        :return: True if the first byte of the firmware response is 0x32
        """
        self.log.debug("Checking firmware version")
        rsp = self.get_firmware_version()
        return rsp[0] == 0x32

    # PN532 defined commands

    def initialize_target_mode(self):
        """Configures the PN532 as target."""
        self.log.debug("Initializing target mode")
        self._execute_command(
            Command.TG_INIT_AS_TARGET,
            [
                0x05,  # Mode
                0x04,  # SENS_RES (2 bytes)
                0x00,
                0x12,  # nfcid1T (3 BYTES)
                0x34,
                0x56,
                0x20,  # SEL_RES
                *([0x00] * 18),  # FeliCAParams[] (18 bytes)
                *([0x00] * 10),  # NFCID3T[] (10 bytes)
                0x00,  # LEN Gt
                0x00,  # LEN Tk
            ],
        )

    def sam_configuration(self, mode=0x01, timeout_value=0x00):
        """(7.2.10) SAMConfiguration"""
        return self._execute_command(
            Command.SAM_CONFIGURATION,
            [mode, timeout_value],
            timeout=1,
            min_response=0,
        )

    def get_firmware_version(self):
        """(7.2.2) GetFirmwareVersion"""
        return self._execute_command(
            Command.GET_FIRMWARE_VERSION, min_response=4
        )

    def in_data_exchange(
        self, tg, data, *, timeout=3, raise_on_error_status=True
    ):
        """(7.3.8) InDataExchange

        :param tg: target number, sent as the first payload byte
        :param data: bytes to send to the target
        :param timeout: serial read timeout in seconds
        :param raise_on_error_status: raise instead of returning None
            when there is no response or the status byte is not OK
        :return: response bytes after the status byte, or None
        :raises RuntimeError: on failure if raise_on_error_status is set
        """
        rsp = self._execute_command(
            Command.IN_DATA_EXCHANGE,
            [tg, *data],
            timeout=timeout,
        )
        # "not rsp" also covers a response that carries no status byte,
        # which would otherwise fail with IndexError on rsp[0]
        if not rsp or rsp[0] != Status.OK:
            if raise_on_error_status:
                raise RuntimeError(f"No response to {data}; {rsp}")
            self.log.error("Got error exchanging data")
            return None
        return rsp[1:]

    def in_communicate_thru(
        self, data, *, timeout=1, raise_on_error_status=True
    ):
        """(7.3.9) InCommunicateThru

        :param data: bytes to send
        :param timeout: serial read timeout in seconds
        :param raise_on_error_status: raise instead of returning None
            when the status byte is not OK
        :return: response bytes after the status byte, or None
        :raises RuntimeError: if there is no response, or on a non-OK
            status when raise_on_error_status is set
        """
        rsp = self._execute_command(
            Command.IN_COMMUNICATE_THRU, data, min_response=1, timeout=timeout
        )
        if rsp[0] != Status.OK:
            if raise_on_error_status:
                raise RuntimeError(f"No response to {data}; {rsp}")
            return None
        return rsp[1:]

    def in_list_passive_target(
        self, br_ty: BrTy, initiator_data: bytes = b"", max_tg=1
    ):
        """(7.3.5) InListPassiveTarget
        If max_tg=1, returns a tag or None if none was found,
        Otherwise, returns a list

        :raises RuntimeError: if br_ty has no tag parser, or if the
            device does not respond
        """
        # Reset cache values as IN_LIST_PASSIVE_TARGET modifies them
        self.register_cache = {}
        self.rf_configuration_cache = {}

        rsp = self._execute_command(
            Command.IN_LIST_PASSIVE_TARGET,
            [max_tg, br_ty, *initiator_data],
            min_response=1,
        )

        # First response byte is treated as the number of targets found
        if rsp[0] == 0:
            return [] if max_tg > 1 else None

        data = rsp[1:]

        tags = []
        offset = 0

        tag_for_brty = {
            BrTy.TYPE_A_106: TypeATag,
            BrTy.TYPE_B_106: TypeBTag
        }

        if br_ty not in tag_for_brty:
            raise RuntimeError(f"BrTy {br_ty} not supported yet")

        # NOTE(review): the returned offset is used as an absolute index
        # into data although the parser only sees data[offset:] - confirm
        # against from_target_data when more than one target is returned.
        while offset < len(data):
            tag, offset = tag_for_brty[br_ty].from_target_data(
                self, data[offset:]
            )
            tags.append(tag)

        if max_tg == 1:
            return tags[0]
        return tags

    def read_registers(self, *registers: Register, cache=False):
        """(7.2.4) ReadRegister:
        Reads CIU registers
        :param registers: an iterable containing addresses of registers to read
        :param cache: prevents redundant register reads
        :return: list of register structures, in the order requested
        :raises RuntimeError: if the device returns no data
        """
        # Served from the cache only when every requested register is in it
        if cache and all(
            Register(register) in self.register_cache for register in registers
        ):
            return [self.register_cache[register] for register in registers]
        data = b"".join(struct.pack(">H", register) for register in registers)
        rsp = self._execute_command(Command.READ_REGISTER, data)
        if not rsp:
            raise RuntimeError(f"No response for read registers {registers}.")
        return [
            register.structure(byte) for byte, register in zip(rsp, registers)
        ]

    def write_registers(
        self, registers: Dict[Register, int], cache=False
    ) -> None:
        """(7.2.5) WriteRegister:
        Writes CIU registers
        :param registers: dictionary containing key-value pairs
        of register addresses and values to be written
        :param cache: prevents redundant register writes
        """
        # If not caching, assume all are different
        difference = {
            reg: val
            for reg, val in registers.items()
            if not cache or self.register_cache.get(reg) != val
        }
        if not difference:
            return
        data = b"".join(
            struct.pack(">HB", reg, val) for reg, val in difference.items()
        )
        self._execute_command(Command.WRITE_REGISTER, data)
        self.register_cache = {**self.register_cache, **registers}

    def rf_configuration(
        self, cfg_item: RFConfigItem, value: Sequence[int], *, cache=False
    ):
        """(7.3.1) RFConfiguration
        Applies settings to one of the available configuration items
        :param cfg_item: configuration item to write
        :param value: configuration data bytes for that item
        :param cache: prevents redundant config writes
        """
        if cache and self.rf_configuration_cache.get(cfg_item) == value:
            return
        self._execute_command(
            Command.RF_CONFIGURATION, [cfg_item, *value], min_response=0
        )
        self.rf_configuration_cache[cfg_item] = value

    # Internal communication commands

    def _execute_command(
        self, command: Command, data=b"", *, timeout=0.5, min_response=None
    ):
        """Executes the provided command
        Verifies that response code matches the command code if response arrived
        If min_response is set, checks if enough data was returned

        :return: response payload without the response code, or None if
            nothing arrived and min_response is None
        :raises RuntimeError: on a missing response (when min_response is
            set), a mismatched response code, or a too-short payload
        """
        rsp = self._send_frame(
            self._construct_frame([command, *data]), timeout=timeout
        )

        if not rsp:
            if min_response is not None:
                raise RuntimeError(f"No response for {command.name}; {rsp}")
            return None
        # The response code is expected to be the command code plus one
        if rsp[0] != command + 1:
            raise RuntimeError(
                f"Response code {rsp[0]} does not match the command {command}"
            )
        del rsp[0]

        if isinstance(min_response, int) and len(rsp) < min_response:
            raise RuntimeError(
                f"Got unexpected response for {command.name}"
                + f"; Length mismatch {len(rsp)} < {min_response}"
                + f"; {bytes(rsp).hex()}"
            )

        return rsp

    # Protocol communication methods

    def _construct_frame(self, data):
        """Construct a data frame to be sent to the PN532.

        :param data: command code followed by its parameters
        :return: bytearray holding the complete frame
        """
        tfi = 0xD4
        length = len(data) + 1  # length covers TFI and data
        # Both checksums are two's complements, so that
        # (value + checksum) & 0xFF == 0
        frame = bytearray(
            [
                0x00,  # Preamble
                0x00,  # Start code
                0xFF,
                length & 0xFF,
                (-length) & 0xFF,  # Length checksum
                tfi,
                *data,
                (-(tfi + sum(data))) & 0xFF,  # Data checksum
                0x00,  # Postamble
            ]
        )
        self.log.debug("Constructed frame %s", hexlify(frame).decode())
        return frame

    def _write(self, frame):
        """Performs serial writes
        while handling config for sending long preambles"""
        if self.write_long_preamble:
            frame = _LONG_PREAMBLE + frame
        self.device.write(frame)

    def _send_frame(self, frame, timeout=0.5):
        """Writes a frame to the device and returns the response."""
        self._write(frame)
        return self._get_device_response(timeout)

    def _send_ack_frame(self, timeout=0.5):
        """Send ACK frame, there is no response."""
        self.device.timeout = timeout
        self._write(_ACK_FRAME)

    def _get_device_response(self, timeout=0.5):
        """Confirms we get an ACK frame from device.
        Reads response frame, and writes ACK.

        :param timeout: serial read timeout in seconds
        :return: bytearray with the frame data following the TFI byte, or
            None if nothing arrived or the frame is truncated or invalid
        """
        self.device.timeout = timeout
        ack = bytearray(self.device.read(6))

        if not ack:
            self.log.error("Did not get response from PN532")
            return None

        # A wrong ACK is only reported; the response frame is still read
        if bytes(ack) != _ACK_FRAME:
            self.log.error(
                "Did not get ACK frame, got %s", hexlify(ack).decode()
            )

        frame = bytearray(self.device.read(6))

        if not frame:
            return None

        # A partial header cannot be parsed; bail out instead of failing
        # with IndexError on the fields below
        if len(frame) < 6:
            self.log.error(
                "Incomplete frame header, got %s", hexlify(frame).decode()
            )
            return None

        if bytes(frame[0:3]) != _SOF:
            self.log.error(
                "Unexpected start to frame, got %s",
                hexlify(frame[0:3]).decode(),
            )

        data_len = frame[3]
        length_checksum = frame[4]
        if (length_checksum + data_len) & 0xFF != 0:
            self.log.error("Frame failed length checksum")
            return None

        tfi = frame[5]
        if tfi != 0xD5:
            self.log.error(
                "Unexpected TFI byte when performing read, got %02x", tfi
            )
            return None

        data_packet = bytearray(
            self.device.read(data_len - 1)
        )  # subtract one since length includes TFI byte.

        checksum_byte = self.device.read(1)
        if not checksum_byte:
            self.log.error("Frame truncated before data checksum")
            return None
        # A bad checksum is only reported; the data is still returned
        if (tfi + sum(data_packet) + checksum_byte[0]) & 0xFF != 0:
            self.log.error("Frame failed data checksum")

        postamble = self.device.read(1)
        if not postamble:
            self.log.error("Frame truncated before postamble")
        elif postamble[0] != 0x00:
            self.log.error(
                "Unexpected postamble byte when performing read, got %02x",
                postamble[0],
            )

        self._send_ack_frame()

        self.log.debug(
            "Received frame %s%s",
            hexlify(frame).decode(),
            hexlify(data_packet).decode(),
        )

        return data_packet