1#  Copyright (C) 2024 The Android Open Source Project
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14
15# Lint as: python3
16
17import logging
18import struct
19from typing import Optional, Dict
20from enum import IntEnum
21from binascii import hexlify
22
23import serial
24
25from serial.tools.list_ports import comports
26from mobly import logger as mobly_logger
27
28from .tag import TypeATag, TypeBTag
29from .nfcutils import ByteStruct, snake_to_camel, s_to_us
30from .nfcutils.reader import Reader, CONFIGURATION_A_LONG
31
32
33_LONG_PREAMBLE = bytes(20)
34_ACK_FRAME = bytes.fromhex("0000ff00ff00")
35_SOF = bytes.fromhex("0000ff")
36
37
38_BITRATE = {106: 0b000, 212: 0b001, 424: 0b010, 848: 0b011}
39# Framing values defined in PN532_C1, 8.6.23
40_FRAMING = {"A": 0b00, "DEP": 0b01, "F": 0b10, "B": 0b11}
41# Timeout values defined in UM0701-02, Table 17,
42# from 100 µs (n=1) up to 3.28 sec (n=16)
43_TIMEOUT = {n: 100 * 2 ** (n - 1) for n in range(0x01, 0x10)}
44
45
46# Picked manually, might not be the best combinations
47_POWER_LEVELS_TO_P_N_DRIVER_CONFIGS = {
48    # No frames should be detected
49    0: (0b000000, 0b0000),
50    # A, F detected with gain 1-3
51    20: (0b000001, 0b0001),
52    # A, F detected with gain 4-5
53    40: (0b000010, 0b0010),
54    # A, F detected with gain 5-6
55    60: (0b000011, 0b0100),
56    # A, F, detected with gain 7-8
57    80: (0b001000, 0b1000),
58    # A, B, F detected with gain 9
59    100: (0b111111, 0b1111)
60}
61
62
63class Command(IntEnum):
64    """https://www.nxp.com/docs/en/user-guide/141520.pdf
65    UM0701-02
66    """
67
68    DIAGNOSE = 0x00
69    GET_FIRMWARE_VERSION = 0x02
70    GET_GENERAL_STATUS = 0x04
71
72    READ_REGISTER = 0x06
73    WRITE_REGISTER = 0x08
74
75    SAM_CONFIGURATION = 0x14
76    POWER_DOWN = 0x16
77
78    RF_CONFIGURATION = 0x32
79
80    IN_JUMP_FOR_DEP = 0x56
81    IN_JUMP_FOR_PSL = 0x46
82    IN_LIST_PASSIVE_TARGET = 0x4A
83
84    IN_DATA_EXCHANGE = 0x40
85    IN_COMMUNICATE_THRU = 0x42
86
87    IN_DESELECT = 0x44
88    IN_RELEASE = 0x52
89    IN_SELECT = 0x54
90
91    IN_AUTO_POLL = 0x60
92
93    TG_INIT_AS_TARGET = 0x8C
94    TG_SET_GENERAL_BYTES = 0x92
95    TG_GET_DATA = 0x86
96    TG_SET_DATA = 0x8E
97    TG_SET_METADATA = 0x94
98    TG_GET_INITIATOR_COMMAND = 0x88
99    TG_RESPONSE_TO_INITIATOR = 0x90
100    TG_GET_TARGET_STATUS = 0x8A
101
102
103_BS = ByteStruct.of
104
105
106class Register(IntEnum):
107    """https://www.nxp.com/docs/en/nxp/data-sheets/PN532_C1.pdf
108    PN532/C1
109    8.6.22 CIU memory map
110    8.7.1  Standard registers
111    """
112
113    structure: ByteStruct
114
115    def __new__(cls, address: int, structure: Optional[ByteStruct] = None):
116        obj = int.__new__(cls, address)
117        obj._value_ = address
118        obj.structure = None
119        return obj
120
121    def __init__(self, _, structure: Optional[ByteStruct] = None):
122        # When initializing, we already know the name, so we're able to
123        # generate a nice name for matching ByteStruct
124        name = snake_to_camel(self.name.lower(), lower=False)
125        self.structure = ByteStruct.of(
126            name, **(structure.fields if structure else {"value": (7, 0)})
127        )
128
129    COMMAND = 0x6331
130    COMM_I_EN = 0x6332
131    DIV_I_EN = 0x6333
132    COMM_I_RQ = 0x6334
133    DIV_I_RQ = 0x6335
134    ERROR = 0x6336
135    WATER_LEVEL = 0x633B
136    # (8.6.23.14) Control
137    CONTROL = 0x633C, _BS(
138        t_stop_now=7,
139        t_start_now=6,
140        wr_nfcip1_id_to_fifo=5,
141        initiator=4,
142        rfu=3,
143        rx_last_bits=(2, 0),
144    )
145    # (8.6.23.15) BitFraming
146    BIT_FRAMING = 0x633D, _BS(
147        start_send=7, rx_align=(6, 4), rfu=3, tx_last_bits=(2, 0)
148    )
149    # (8.6.23.16) Coll
150    COLL = 0x633E, _BS(
151        values_after_coll=7,
152        rfu=6,
153        coll_pos_not_valid=5,
154        coll_pos=(4, 0),
155    )
156    # (8.6.23.17) Mode
157    MODE = 0x6301, _BS(
158        msb_first=7,
159        detect_sync=6,
160        tx_wait_rf=5,
161        rx_wait_rf=4,
162        pol_sigin=3,
163        mode_det_off=2,
164        crc_preset=(1, 0),
165    )
166    # (8.6.23.18) TxMode
167    TX_MODE = 0x6302, _BS(
168        crc_en=7, speed=(6, 4), inv_mod=3, mix=2, framing=(1, 0)
169    )
170    # (8.6.23.19) RxMode
171    RX_MODE = 0x6303, _BS(
172        crc_en=7, speed=(6, 4), no_err=3, multiple=2, framing=(1, 0)
173    )
174    # (8.6.23.20) TxControl
175    TX_CONTROL = 0x6304, _BS(
176        inv_tx2_rf_on=7,
177        inv_tx1_rf_on=6,
178        inv_tx2_rf_off=5,
179        inv_tx1_rf_off=4,
180        tx2_cw=3,
181        check_rf=2,
182        tx2_rf_en=1,
183        tx1_rf_en=0,
184    )
185    # (8.6.23.21) TxAuto
186    TX_AUTO = 0x6305, _BS(
187        auto_rf_off=7,
188        force_100_ask=6,
189        auto_wake_up=5,
190        rfu=4,
191        ca_on=3,
192        initial_rf_on=2,
193        tx2_rf_auto_en=1,
194        tx1_rf_auto_en=0,
195    )
196    TX_SEL = 0x6306
197    RX_SEL = 0x6307
198    # (8.6.23.24) RxThreshold
199    RX_THRESHOLD = 0x6308, _BS(min_level=(7, 4), rfu=3, col_level=(2, 0))
200    # (8.6.23.25) Demod
201    DEMOD = 0x6309, _BS(
202        add_iq=(7, 6), fix_iq=5, tau_rcv=(3, 2), tau_sync=(1, 0)
203    )
204    MANUAL_RCV = 0x630D
205    TYPE_B = 0x630E
206    # (8.6.23.33) GsNOff
207    GS_N_OFF = 0x6313, _BS(cw_gs_n_off=(7, 4), mod_gs_n_off=(3, 0))
208    # (8.6.23.34) ModWidth
209    MOD_WIDTH = 0x6314, _BS(mod_width=(7, 0))
210    # (8.6.23.35) TxBitPhase
211    TX_BIT_PHASE = 0x6315, _BS(rcv_clk_change=7, tx_bit_phase=(6, 0))
212    # (8.6.23.36) RfCfg
213    RF_CFG = 0x6316, _BS(rf_level_amp=7, rx_gain=(6, 4), rf_level=(3, 0))
214    # (8.6.23.37) GsNOn
215    GS_N_ON = 0x6317, _BS(cw_gs_n_on=(7, 4), mod_gs_n_on=(3, 0))
216    # (8.6.23.38) CWGsP
217    CW_GS_P = 0x6318, _BS(rfu=(7, 6), cw_gs_p=(5, 0))
218    # (8.6.23.39) ModGsP
219    MOD_GS_P = 0x6319, _BS(rfu=(7, 6), mod_gs_p=(5, 0))
220
221
222REG = Register
223
224
225REGISTER_VALUES_FOR_TRANSCEIVE = {
226    # The following registers are configured for transmit
227    # based on register states after using IN_LIST_PASSIVE_TARGET
228    REG.CONTROL: REG.CONTROL.structure(initiator=True),
229    REG.TX_CONTROL: REG.TX_CONTROL.structure(
230        inv_tx2_rf_on=True,
231        tx2_rf_en=True,
232        tx1_rf_en=True,
233    ),
234    REG.RX_THRESHOLD: REG.RX_THRESHOLD.structure(
235        min_level=0b1000, col_level=0b101
236    ),
237    REG.GS_N_OFF: REG.GS_N_OFF.structure(
238        cw_gs_n_off=0b0110, mod_gs_n_off=0b1111
239    ),
240}
241
242
243class RFConfigItem(IntEnum):
244    """https://www.nxp.com/docs/en/user-guide/141520.pdf
245    UM0701-02
246    7.3.1 RFConfiguration
247    """
248
249    RF_FIELD = 0x01  # ConfigurationData
250    VARIOUS_TIMINGS = 0x02  # RFU, fATR_RES_Timeout, fRetryTimeout
251    # 0x03 RFU
252    MAX_RTY_COM = 0x04  # MaxRtyCOM
253    MAX_RETRIES = 0x05  # MxRtyATR, MxRtyPSL, MxRtyPassiveActivation
254
255
256class BrTy(IntEnum):
257    """https://www.nxp.com/docs/en/user-guide/141520.pdf
258    UM0701-02
259    7.3.5 InListPassiveTarget
260    """
261
262    # InitiatorData is optional and may contain a UID to initialize
263    TYPE_A_106 = 0x00
264    # InitiatorData contains "Polling" command payload
265    TYPE_F_212 = 0x01
266    TYPE_F_424 = 0x02
267    # InitiatorData contains AFI and optional polling method byte
268    TYPE_B_106 = 0x03
269    # InitiatorData field unused
270    TYPE_A_JEWEL = 0x04
271
272
273class Status(IntEnum):
274    """https://www.nxp.com/docs/en/user-guide/141520.pdf
275    UM0701-02
276    7.1 Error Handling
277    """
278
279    OK = 0x00
280    TIME_OUT = 0x01
281    CRC_ERROR = 0x02
282    PARITY_ERROR = 0x03
283    ERRONEOUS_BIT_COUNT = 0x04
284    MIFARE_FRAMING_ERROR = 0x05
285    BIT_COLLISION = 0x06
286    COMMUNICATION_BUFFER_SIZE_INSUFFICIENT = 0x07
287    RF_BUFFER_OVERFLOW = 0x09
288    RF_PROTOCOL_ERROR = 0x0B
289    TEMPERATURE_ERROR = 0x0D
290    INTERNAL_BUFFER_OVERFLOW = 0x0E
291    INVALID_PARAMETER = 0x10
292
293
294class PN532(Reader):
295    """Implements an NFC reader with a PN532 chip"""
296
297    def __init__(self, path):
298        """Initializes device on path,
299        or first available serial port if none is provided."""
300        if len(comports()) == 0:
301            raise IndexError(
302                "Could not find device on serial port"
303                + ", make sure reader is plugged in."
304            )
305        if len(path) == 0:
306            path = comports()[0].device
307
308        self.register_cache = {}
309        self.rf_configuration_cache = {}
310        self.write_long_preamble = True
311
312        self.log = mobly_logger.PrefixLoggerAdapter(
313            logging.getLogger(),
314            {
315                mobly_logger.PrefixLoggerAdapter.EXTRA_KEY_LOG_PREFIX: (
316                    f"[PN532|{path}]"
317                )
318            },
319        )
320        self.log.debug("Serial port: %s", path)
321        self.device = serial.Serial(path, 115200, timeout=0.5)
322
323        self.device.flush()
324        self._send_ack_frame()
325        self.device.flushInput()
326        if not self.verify_firmware_version():
327            raise RuntimeError(
328                "Could not verify PN532 firmware on serial path " + path
329            )
330        self.sam_configuration(mode=0x01, timeout_value=0x00)
331
332        self.write_long_preamble = False
333
334        # Disable retries
335        self.device.flushInput()
336        self.rf_configuration(
337            RFConfigItem.MAX_RETRIES,
338            [
339                0x00,  # MxRtyATR
340                0x00,  # MxRtyPSL
341                0x00,  # MxRtyPassiveActivation
342            ],
343        )
344
345    # Custom functions
346
347    def poll_a(self):
348        """Attempts to detect target for NFC type A."""
349        self.log.debug("Polling A")
350        tag = self.in_list_passive_target(br_ty=BrTy.TYPE_A_106)
351        if tag:
352            self.log.debug(f"Got Type A tag, SEL_RES={tag.sel_res}")
353        return tag
354
355    def poll_b(self):
356        """Attempts to detect target for NFC type B."""
357        self.log.debug("Polling B")
358        afi = 0x00
359        tag = self.in_list_passive_target(
360            br_ty=BrTy.TYPE_B_106, initiator_data=(afi,)
361        )
362        if tag:
363            self.log.debug(f"Got Type B tag {tag.sensb_res}")
364        return tag
365
366    def send_broadcast(
367        self,
368        data: bytes,
369        *,
370        configuration=CONFIGURATION_A_LONG
371    ):
372        """Emits a broadcast frame into the polling loop"""
373        self.log.debug("Sending broadcast %s", hexlify(data).decode())
374        return self.transceive_raw(
375            data=data,
376            type_=configuration.type,
377            crc=configuration.crc,
378            bits=configuration.bits,
379            bitrate=configuration.bitrate,
380            timeout=configuration.timeout or 0.25,
381            power_level=configuration.power,
382        )
383
384    def mute(self):
385        """Turns off device's RF antenna."""
386        self.log.debug("Muting")
387        self.rf_configuration(RFConfigItem.RF_FIELD, [0b10])
388
389    def unmute(self, auto_rf_ca=False):
390        """Turns on device's RF antenna."""
391        self.log.debug("Unmuting")
392        self.rf_configuration(RFConfigItem.RF_FIELD, [(auto_rf_ca << 1) + 0b01])
393
394    def reset(self):
395        """Clears out input and output buffers to expunge leftover data"""
396        self.device.reset_input_buffer()
397        self.device.reset_output_buffer()
398
399    # Special commands
400
401    def transceive_raw(
402        self,
403        data,
404        type_="A",
405        crc=True,
406        bits=8,
407        bitrate=106,
408        *,
409        timeout=1,
410        power_level=100,
411        cache_configuration=True,
412    ):
413        """Configures the CIU with specified configuration and sends raw data
414        :param timeout: Timeout in seconds
415        :param cache_configuration: if true, prevents redundant writes & reads
416        """
417        # Choose the least index of timeout duration
418        # where result >= given value. Timeout is in μs.
419        # If timeout value is too big, or <= 0,
420        # fall back to maximum timeout duration
421        timeout_index = next(
422            (idx for idx, t in _TIMEOUT.items() if t >= s_to_us(timeout)), 0x10
423        )
424        self.rf_configuration(
425            RFConfigItem.VARIOUS_TIMINGS,
426            [
427                0x00,  # RFU
428                0x0B,  # ATR_RES TimeOut, default value is 0x0B
429                timeout_index,
430            ],
431            cache=cache_configuration,
432        )
433
434
435        p_n_config = next(
436            (
437                config
438                for power, config in _POWER_LEVELS_TO_P_N_DRIVER_CONFIGS.items()
439                if power >= power_level
440            ),
441            _POWER_LEVELS_TO_P_N_DRIVER_CONFIGS[0]
442        )
443        p_driver_conductance_level, n_driver_conductance_level = p_n_config
444
445
446        (
447            tx_mode, rx_mode, tx_auto, bit_frm,
448            gs_n_on, cw_gs_p, md_gs_p,
449            rf_cfg, tx_bit_phase, demod,
450        ) = self.read_registers(
451            REG.TX_MODE, REG.RX_MODE, REG.TX_AUTO, REG.BIT_FRAMING,
452            REG.GS_N_ON, REG.CW_GS_P, REG.MOD_GS_P,
453            REG.RF_CFG, REG.TX_BIT_PHASE, REG.DEMOD,
454            cache=cache_configuration,
455        )
456
457        # The following register modifications are based on register state
458        # modifications when performing IN_LIST_PASSIVE_TARGET and communication
459        registers_to_write = {
460            REG.TX_MODE: tx_mode.replace(
461                crc_en=crc, speed=_BITRATE[bitrate], framing=_FRAMING[type_]
462            ),
463            REG.RX_MODE: rx_mode.replace(
464                crc_en=crc, speed=_BITRATE[bitrate], framing=_FRAMING[type_]
465            ),
466            REG.TX_AUTO: tx_auto.replace(force_100_ask=type_ == "A"),
467            REG.BIT_FRAMING: bit_frm.replace(tx_last_bits=bits & 0b111),
468            REG.GS_N_ON: gs_n_on.replace(
469                mod_gs_n_on=0b0100 if type_ == "A" else 0b1111,
470                cw_gs_n_on=n_driver_conductance_level & 0b1111,
471            ),
472            REG.CW_GS_P: cw_gs_p.replace(
473                cw_gs_p=p_driver_conductance_level & 0b111111
474            ),
475            REG.MOD_GS_P: md_gs_p.replace(
476                mod_gs_p=0b010111 if type_ == "B" else 0b010001
477            ),
478            REG.RF_CFG: rf_cfg.replace(
479                rx_gain=0b110 if type_ == "F" else 0b101,
480                rf_level=0b1001,
481            ),
482            REG.TX_BIT_PHASE: tx_bit_phase.replace(
483                rcv_clk_change=1,
484                tx_bit_phase=0b1111 if type_ == "F" else 0b0111,
485            ),
486            REG.DEMOD: demod.replace(
487                add_iq=0b01,
488                tau_rcv=0b00 if type_ == "F" else 0b11,
489                tau_sync=0b01,
490            ),
491            **REGISTER_VALUES_FOR_TRANSCEIVE,
492        }
493
494        self.write_registers(registers_to_write, cache=cache_configuration)
495
496        # Handle a special case for FeliCa, where length byte has to be present
497        if type_ == "F":
498            data = [len(data) + 1, *data]
499
500        # No data is OK for this use case
501        return self.in_communicate_thru(data, raise_on_error_status=False)
502
503    def verify_firmware_version(self):
504        """Verifies we are talking to a PN532."""
505        self.log.debug("Checking firmware version")
506        rsp = self.get_firmware_version()
507        return rsp[0] == 0x32
508
509    # PN532 defined commands
510
511    def initialize_target_mode(self):
512        """Configures the PN532 as target."""
513        self.log.debug("Initializing target mode")
514        self._execute_command(
515            Command.TG_INIT_AS_TARGET,
516            [
517                0x05,  # Mode
518                0x04,  # SENS_RES (2 bytes)
519                0x00,
520                0x12,  # nfcid1T (3 BYTES)
521                0x34,
522                0x56,
523                0x20,  # SEL_RES
524                0x00,  # FeliCAParams[] (18 bytes)
525                0x00,
526                0x00,
527                0x00,
528                0x00,
529                0x00,
530                0x00,
531                0x00,
532                0x00,
533                0x00,
534                0x00,
535                0x00,
536                0x00,
537                0x00,
538                0x00,
539                0x00,
540                0x00,
541                0x00,
542                0x00,  # NFCID3T[] (10 bytes)
543                0x00,
544                0x00,
545                0x00,
546                0x00,
547                0x00,
548                0x00,
549                0x00,
550                0x00,
551                0x00,
552                0x00,  # LEN Gt
553                0x00,  # LEN Tk
554            ],
555        )
556
557    def sam_configuration(self, mode=0x01, timeout_value=0x00):
558        """(7.2.10) SAMConfiguration"""
559        return self._execute_command(
560            Command.SAM_CONFIGURATION,
561            [mode, timeout_value],
562            timeout=1,
563            min_response=0,
564        )
565
566    def get_firmware_version(self):
567        """(7.2.2) GetFirmwareVersion"""
568        return self._execute_command(
569            Command.GET_FIRMWARE_VERSION, min_response=4
570        )
571
572    def in_data_exchange(
573        self, tg, data, *, timeout=3, raise_on_error_status=True
574    ):
575        """(7.3.8) InDataExchange"""
576        rsp = self._execute_command(
577            Command.IN_DATA_EXCHANGE,
578            [tg, *data],
579            timeout=timeout,
580        )
581        if rsp is None or rsp[0] != Status.OK:
582            if raise_on_error_status:
583                raise RuntimeError(f"No response to {data}; {rsp}")
584            self.log.error("Got error exchanging data")
585            return None
586        return rsp[1:]
587
588    def in_communicate_thru(
589        self, data, *, timeout=1, raise_on_error_status=True
590    ):
591        """(7.3.9) InCommunicateThru"""
592        rsp = self._execute_command(
593            Command.IN_COMMUNICATE_THRU, data, min_response=1, timeout=timeout
594        )
595        if rsp[0] != Status.OK:
596            if raise_on_error_status:
597                raise RuntimeError(f"No response to {data}; {rsp}")
598            return None
599        return rsp[1:]
600
601    def in_list_passive_target(
602        self, br_ty: BrTy, initiator_data: bytes = b"", max_tg=1
603    ):
604        """(7.3.5) InListPassiveTarget
605        If max_tg=1, returns a tag or None if none was found,
606        Otherwise, returns a list
607        """
608        # Reset cache values as IN_LIST_PASSIVE_TARGET modifies them
609        self.register_cache = {}
610        self.rf_configuration_cache = {}
611
612        rsp = self._execute_command(
613            Command.IN_LIST_PASSIVE_TARGET,
614            [max_tg, br_ty, *initiator_data],
615            min_response=1,
616        )
617
618        if rsp[0] == 0:
619            return [] if max_tg > 1 else None
620
621        data = rsp[1:]
622
623        tags = []
624        offset = 0
625
626        tag_for_brty = {
627            BrTy.TYPE_A_106: TypeATag,
628            BrTy.TYPE_B_106: TypeBTag
629        }
630
631        if br_ty not in tag_for_brty:
632            raise RuntimeError(f"BrTy {br_ty} not supported yet")
633
634        while offset <= len(data) - 1:
635            tag, offset = tag_for_brty[br_ty].from_target_data(
636                self, data[offset:]
637            )
638            tags.append(tag)
639
640        if max_tg == 1:
641            return tags[0]
642        return tags
643
644    def read_registers(self, *registers: Register, cache=False):
645        """(7.2.4) ReadRegister:
646        Reads CIU registers
647         :param registers: an iterable containing addresses of registers to read
648         :param cache: prevents redundant register reads
649        """
650        if cache and all(
651            Register(register) in self.register_cache for register in registers
652        ):
653            return [self.register_cache[register] for register in registers]
654        data = b"".join(struct.pack(">H", register) for register in registers)
655        rsp = self._execute_command(Command.READ_REGISTER, data)
656        if not rsp:
657            raise RuntimeError(f"No response for read registers {registers}.")
658        return list(
659            register.structure(byte) for byte, register in zip(rsp, registers)
660        )
661
662    def write_registers(
663        self, registers: Dict[Register, int], cache=False
664    ) -> None:
665        """(7.2.5) WriteRegister:
666        Writes CIU registers
667         :param registers: dictionary containing key-value pairs
668         of register addresses and values to be written
669         :param cache: prevents redundant register writes
670        """
671        # If not caching, assume all are different
672        difference = {
673            reg: val
674            for reg, val in registers.items()
675            if not cache or self.register_cache.get(reg) != val
676        }
677        if not difference:
678            return
679        data = b"".join(
680            struct.pack(">HB", reg, val) for reg, val in difference.items()
681        )
682        self._execute_command(Command.WRITE_REGISTER, data)
683        self.register_cache = {**self.register_cache, **registers}
684
685    def rf_configuration(
686        self, cfg_item: RFConfigItem, value: int, *, cache=False
687    ):
688        """(7.3.1) RFConfiguration
689        Applies settings to one of the available configuration items
690        :param cache: prevents redundant config writes
691        """
692        if cache and self.rf_configuration_cache.get(cfg_item) == value:
693            return
694        self._execute_command(
695            Command.RF_CONFIGURATION, [cfg_item, *value], min_response=0
696        )
697        self.rf_configuration_cache[cfg_item] = value
698
699    # Internal communication commands
700
701    def _execute_command(
702        self, command: Command, data=b"", *, timeout=0.5, min_response=None
703    ):
704        """Executes the provided command
705        Verifies that response code matches the command code if response arrived
706        If min_response is set, checks if enough data was returned
707        """
708        rsp = self._send_frame(
709            self._construct_frame([command, *data]), timeout=timeout
710        )
711
712        if not rsp:
713            if min_response is not None:
714                raise RuntimeError(f"No response for {command.name}; {rsp}")
715            return None
716        if rsp[0] != command + 1:
717            raise RuntimeError(
718                f"Response code {rsp[0]} does not match the command {command}"
719            )
720        del rsp[0]
721
722        if isinstance(min_response, int) and len(rsp) < min_response:
723            raise RuntimeError(
724                f"Got unexpected response for {command.name}"
725                + f"; Length mismatch {len(rsp)} < {min_response}"
726                + f"; {bytes(rsp).hex()}"
727            )
728
729        return rsp
730
731    # Protocol communication methods
732
733    def _construct_frame(self, data):
734        """Construct a data fram to be sent to the PN532."""
735        # Preamble, start code, length, length checksum, TFI
736        frame = [
737            0x00,
738            0x00,
739            0xFF,
740            (len(data) + 1) & 0xFF,
741            ((~(len(data) + 1) & 0xFF) + 0x01) & 0xFF,
742            0xD4,
743        ]
744        data_sum = 0xD4
745
746        # Add data to frame
747        for b in data:
748            data_sum += b
749            frame.append(b)
750        frame.append(((~data_sum & 0xFF) + 0x01) & 0xFF)  # Data checksum
751
752        frame.append(0x00)  # Postamble
753        self.log.debug(
754            "Constructed frame " + hexlify(bytearray(frame)).decode()
755        )
756        return bytearray(frame)
757
758    def _write(self, frame):
759        """Performs serial writes
760        while handling config for sending long preambles"""
761        if self.write_long_preamble:
762            frame = _LONG_PREAMBLE + frame
763        self.device.write(frame)
764
765    def _send_frame(self, frame, timeout=0.5):
766        """Writes a frame to the device and returns the response."""
767        self._write(frame)
768        return self._get_device_response(timeout)
769
770    def _send_ack_frame(self, timeout=0.5):
771        """Send ACK frame, there is no response."""
772        self.device.timeout = timeout
773        self._write(_ACK_FRAME)
774
775    def _get_device_response(self, timeout=0.5):
776        """onfirms we get an ACK frame from device.
777        Reads response frame, and writes ACK.
778        """
779        self.device.timeout = timeout
780        frame = bytearray(self.device.read(6))
781
782        if (len(frame)) == 0:
783            self.log.error("Did not get response from PN532")
784            return None
785
786        if bytes(frame) != _ACK_FRAME:
787            self.log.error(
788                "Did not get ACK frame, got %s", hexlify(frame).decode()
789            )
790
791        frame = bytearray(self.device.read(6))
792
793        if (len(frame)) == 0:
794            return None
795
796        if bytes(frame[0:3]) != _SOF:
797            self.log.error(
798                "Unexpected start to frame, got %s",
799                hexlify(frame[0:3]).decode(),
800            )
801
802        data_len = frame[3]
803        length_checksum = frame[4]
804        if (length_checksum + data_len) & 0xFF != 0:
805            self.log.error("Frame failed length checksum")
806            return None
807
808        tfi = frame[5]
809        if tfi != 0xD5:
810            self.log.error(
811                "Unexpected TFI byte when performing read, got %02x", frame[5]
812            )
813            return None
814
815        data_packet = bytearray(
816            self.device.read(data_len - 1)
817        )  # subtract one since length includes TFI byte.
818        data_checksum = bytearray(self.device.read(1))[0]
819        if (tfi + sum(data_packet) + data_checksum) & 0xFF != 0:
820            self.log.error("Frame failed data checksum")
821
822        postamble = bytearray(self.device.read(1))[0]
823        if postamble != 0x00:
824            if tfi != 0xD5:
825                self.log.error(
826                    "Unexpected postamble byte when performing read, got %02x",
827                    frame[4],
828                )
829
830        self._send_ack_frame()
831
832        self.log.debug(
833            "Received frame %s%s",
834            hexlify(frame).decode(),
835            hexlify(data_packet).decode(),
836        )
837
838        return data_packet
839