1#!/usr/bin/env python3 2 3# Copyright 2023 Google LLC 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# https://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import argparse 18import inspect 19import json 20import random 21import readline 22import socket 23import sys 24import time 25import requests 26import struct 27import asyncio 28from concurrent.futures import ThreadPoolExecutor 29 30import rf_packets as rf 31 32 33class T4AT: 34 def __init__(self, reader, writer): 35 self.nfcid1 = bytes([0x08]) + int.to_bytes(random.randint(0, 0xffffff), length=3) 36 self.rats_response = bytes([0x2, 0x0]) 37 self.reader = reader 38 self.writer = writer 39 40 async def _read(self) -> rf.RfPacket: 41 header_bytes = await self.reader.read(2) 42 packet_length = int.from_bytes(header_bytes, byteorder='little') 43 packet_bytes = await self.reader.read(packet_length) 44 45 packet = rf.RfPacket.parse_all(packet_bytes) 46 packet.show() 47 return packet 48 49 def _write(self, packet: rf.RfPacket): 50 packet_bytes = packet.serialize() 51 header_bytes = int.to_bytes(len(packet_bytes), length=2, byteorder='little') 52 self.writer.write(header_bytes + packet_bytes) 53 54 async def listen(self): 55 """Emulate device in passive listen mode. Respond to poll requests until 56 the device is activated by a select command.""" 57 while True: 58 packet = await self._read() 59 match packet: 60 case rf.PollCommand(technology=rf.Technology.NFC_A): 61 self._write(rf.NfcAPollResponse( 62 nfcid1=self.nfcid1, int_protocol=0b01)) 63 case rf.T4ATSelectCommand(_): 64 self._write(rf.T4ATSelectResponse( 65 rats_response=self.rats_response, 66 receiver=packet.sender)) 67 print(f"t4at device selected by #{packet.sender}") 68 await self.active(packet.sender) 69 case _: 70 pass 71 72 async def poll(self): 73 """Emulate device in passive poll mode. Automatically selects the 74 first discovered device.""" 75 while True: 76 try: 77 self._write(rf.PollCommand(technology=rf.Technology.NFC_A)) 78 packet = await asyncio.wait_for(self._read(), timeout=1.0) 79 match packet: 80 # [DIGITAL] Table 20: SEL_RES Response Format 81 # 01b: Configured for Type 4A Tag Platform 82 case rf.NfcAPollResponse(int_protocol=0b01): 83 nfcid1 = bytes(packet.nfcid1) 84 print(f"discovered t4at device with nfcid1 #{nfcid1.hex()}") 85 self._write(rf.T4ATSelectCommand(receiver=packet.sender, param=0)) 86 response = await asyncio.wait_for( 87 self.wait_for_select_response(packet.sender), timeout=1.0) 88 print(f"t4at device activation complete") 89 await self.active(response.sender) 90 case _: 91 pass 92 time.sleep(0.050); 93 except TimeoutError: 94 pass 95 time.sleep(0.050); 96 try: 97 signature = bytes([0x1, 0x2, 0x3, 0x4]); 98 self._write(rf.PollCommand(technology=rf.Technology.NFC_RAW, data=signature)) 99 await asyncio.wait_for(self._read(), timeout=1.0) 100 except TimeoutError: 101 pass 102 103 async def wait_for_select_response(self, sender_id: int): 104 while True: 105 packet = await self._read() 106 if isinstance(packet, rf.T4ATSelectResponse) and packet.sender == sender_id: 107 return packet 108 109 async def active(self, peer: int): 110 """Active mode. Respond to data requests until the device 111 is deselected.""" 112 while True: 113 packet = await self._read() 114 match packet: 115 case rf.DeactivateNotification(_): 116 return 117 case rf.Data(_): 118 pass 119 case _: 120 pass 121 122 123async def run(address: str, rf_port: int, mode: str): 124 """Emulate a T4AT compatible device in Listen mode.""" 125 try: 126 reader, writer = await asyncio.open_connection(address, rf_port) 127 device = T4AT(reader, writer) 128 if mode == 'poll': 129 await device.poll() 130 elif mode == 'listen': 131 await device.listen() 132 else: 133 print(f"unsupported device mode {mode}") 134 except Exception as exn: 135 print( 136 f'Failed to connect to Casimir server at address {address}:{rf_port}:\n' + 137 f' {exn}\n' + 138 'Make sure the server is running') 139 exit(1) 140 141 142def main(): 143 """Start a Casimir interactive console.""" 144 parser = argparse.ArgumentParser(description=__doc__) 145 parser.add_argument('--address', 146 type=str, 147 default='127.0.0.1', 148 help='Select the casimir server address') 149 parser.add_argument('--rf-port', 150 type=int, 151 default=7001, 152 help='Select the casimir TCP RF port') 153 parser.add_argument('--mode', 154 type=str, 155 choices=['poll', 'listen'], 156 default='poll', 157 help='Select the tag mode') 158 asyncio.run(run(**vars(parser.parse_args()))) 159 160 161if __name__ == '__main__': 162 main() 163