xref: /aosp_15_r20/system/nfc/tools/casimir/scripts/t4at.py (revision 7eba2f3b06c51ae21384f6a4f14577b668a869b3)
1#!/usr/bin/env python3
2
3# Copyright 2023 Google LLC
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     https://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import argparse
18import inspect
19import json
20import random
21import readline
22import socket
23import sys
24import time
25import requests
26import struct
27import asyncio
28from concurrent.futures import ThreadPoolExecutor
29
30import rf_packets as rf
31
32
33class T4AT:
34    def __init__(self, reader, writer):
35        self.nfcid1 = bytes([0x08]) + int.to_bytes(random.randint(0, 0xffffff), length=3)
36        self.rats_response = bytes([0x2, 0x0])
37        self.reader = reader
38        self.writer = writer
39
40    async def _read(self) -> rf.RfPacket:
41        header_bytes = await self.reader.read(2)
42        packet_length = int.from_bytes(header_bytes, byteorder='little')
43        packet_bytes = await self.reader.read(packet_length)
44
45        packet = rf.RfPacket.parse_all(packet_bytes)
46        packet.show()
47        return packet
48
49    def _write(self, packet: rf.RfPacket):
50        packet_bytes = packet.serialize()
51        header_bytes = int.to_bytes(len(packet_bytes), length=2, byteorder='little')
52        self.writer.write(header_bytes + packet_bytes)
53
54    async def listen(self):
55        """Emulate device in passive listen mode. Respond to poll requests until
56        the device is activated by a select command."""
57        while True:
58            packet = await self._read()
59            match packet:
60                case rf.PollCommand(technology=rf.Technology.NFC_A):
61                    self._write(rf.NfcAPollResponse(
62                        nfcid1=self.nfcid1, int_protocol=0b01))
63                case rf.T4ATSelectCommand(_):
64                    self._write(rf.T4ATSelectResponse(
65                        rats_response=self.rats_response,
66                        receiver=packet.sender))
67                    print(f"t4at device selected by #{packet.sender}")
68                    await self.active(packet.sender)
69                case _:
70                    pass
71
72    async def poll(self):
73        """Emulate device in passive poll mode. Automatically selects the
74        first discovered device."""
75        while True:
76            try:
77                self._write(rf.PollCommand(technology=rf.Technology.NFC_A))
78                packet = await asyncio.wait_for(self._read(), timeout=1.0)
79                match packet:
80                    # [DIGITAL] Table 20: SEL_RES Response Format
81                    # 01b: Configured for Type 4A Tag Platform
82                    case rf.NfcAPollResponse(int_protocol=0b01):
83                        nfcid1 = bytes(packet.nfcid1)
84                        print(f"discovered t4at device with nfcid1 #{nfcid1.hex()}")
85                        self._write(rf.T4ATSelectCommand(receiver=packet.sender, param=0))
86                        response = await asyncio.wait_for(
87                            self.wait_for_select_response(packet.sender), timeout=1.0)
88                        print(f"t4at device activation complete")
89                        await self.active(response.sender)
90                    case _:
91                        pass
92                time.sleep(0.050);
93            except TimeoutError:
94                pass
95            time.sleep(0.050);
96            try:
97                signature = bytes([0x1, 0x2, 0x3, 0x4]);
98                self._write(rf.PollCommand(technology=rf.Technology.NFC_RAW, data=signature))
99                await asyncio.wait_for(self._read(), timeout=1.0)
100            except TimeoutError:
101                pass
102
103    async def wait_for_select_response(self, sender_id: int):
104        while True:
105            packet = await self._read()
106            if isinstance(packet, rf.T4ATSelectResponse) and packet.sender == sender_id:
107                return packet
108
109    async def active(self, peer: int):
110        """Active mode. Respond to data requests until the device
111        is deselected."""
112        while True:
113            packet = await self._read()
114            match packet:
115                case rf.DeactivateNotification(_):
116                    return
117                case rf.Data(_):
118                    pass
119                case _:
120                    pass
121
122
123async def run(address: str, rf_port: int, mode: str):
124    """Emulate a T4AT compatible device in Listen mode."""
125    try:
126        reader, writer = await asyncio.open_connection(address, rf_port)
127        device = T4AT(reader, writer)
128        if mode == 'poll':
129            await device.poll()
130        elif mode == 'listen':
131            await device.listen()
132        else:
133            print(f"unsupported device mode {mode}")
134    except Exception as exn:
135        print(
136            f'Failed to connect to Casimir server at address {address}:{rf_port}:\n' +
137            f'    {exn}\n' +
138            'Make sure the server is running')
139        exit(1)
140
141
142def main():
143    """Start a Casimir interactive console."""
144    parser = argparse.ArgumentParser(description=__doc__)
145    parser.add_argument('--address',
146                        type=str,
147                        default='127.0.0.1',
148                        help='Select the casimir server address')
149    parser.add_argument('--rf-port',
150                        type=int,
151                        default=7001,
152                        help='Select the casimir TCP RF port')
153    parser.add_argument('--mode',
154                        type=str,
155                        choices=['poll', 'listen'],
156                        default='poll',
157                        help='Select the tag mode')
158    asyncio.run(run(**vars(parser.parse_args())))
159
160
161if __name__ == '__main__':
162    main()
163