#!/usr/bin/env python3 # # Copyright (c) 2018, The OpenThread Authors. # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # 3. Neither the name of the copyright holder nor the # names of its contributors may be used to endorse or promote products # derived from this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. # import binascii import bisect import os import socket import struct import traceback import time import io import config import mesh_cop import message import pcap import wpan def dbg_print(*args): if False: print(args) class BaseSimulator(object): def __init__(self): self._nodes = {} self.commissioning_messages = {} self._payload_parse_factory = mesh_cop.MeshCopCommandFactory(mesh_cop.create_default_mesh_cop_tlv_factories()) self._mesh_cop_msg_set = mesh_cop.create_mesh_cop_message_type_set() def __del__(self): self._nodes = None def add_node(self, node): self._nodes[node.nodeid] = node self.commissioning_messages[node.nodeid] = [] def set_lowpan_context(self, cid, prefix): raise NotImplementedError def get_messages_sent_by(self, nodeid): raise NotImplementedError def go(self, duration, nodeid=None): raise NotImplementedError def stop(self): raise NotImplementedError def read_cert_messages_in_commissioning_log(self, nodeids): for nodeid in nodeids: node = self._nodes[nodeid] if not node: continue for ( direction, type, payload, ) in node.read_cert_messages_in_commissioning_log(): msg = self._payload_parse_factory.parse(type.decode("utf-8"), io.BytesIO(payload)) self.commissioning_messages[nodeid].append(msg) class RealTime(BaseSimulator): def __init__(self, use_message_factory=True): super(RealTime, self).__init__() self._sniffer = config.create_default_thread_sniffer(use_message_factory=use_message_factory) self._sniffer.start() def set_lowpan_context(self, cid, prefix): self._sniffer.set_lowpan_context(cid, prefix) def get_messages_sent_by(self, nodeid): messages = self._sniffer.get_messages_sent_by(nodeid).messages ret = message.MessagesSet(messages, self.commissioning_messages[nodeid]) self.commissioning_messages[nodeid] = [] return ret def now(self): return time.time() def go(self, duration, nodeid=None): time.sleep(duration) def stop(self): if self.is_running: # self._sniffer.stop() # FIXME: seems it blocks forever self._sniffer = None @property def is_running(self): return self._sniffer is not None class VirtualTime(BaseSimulator): OT_SIM_EVENT_ALARM_FIRED = 0 OT_SIM_EVENT_RADIO_RECEIVED = 1 OT_SIM_EVENT_UART_WRITE = 2 OT_SIM_EVENT_RADIO_SPINEL_WRITE = 3 OT_SIM_EVENT_POSTCMD = 4 EVENT_TIME = 0 EVENT_SEQUENCE = 1 EVENT_ADDR = 2 EVENT_TYPE = 3 EVENT_DATA_LENGTH = 4 EVENT_DATA = 5 BASE_PORT = 9000 MAX_NODES = 33 MAX_MESSAGE = 1024 END_OF_TIME = float('inf') PORT_OFFSET = int(os.getenv('PORT_OFFSET', '0')) BLOCK_TIMEOUT = 10 NCP_SIM = os.getenv('NODE_TYPE', 'sim') == 'ncp-sim' _message_factory = None def __init__(self, use_message_factory=True): super(VirtualTime, self).__init__() self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 2 * 1024 * 1024) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2 * 1024 * 1024) ip = '127.0.0.1' self.port = self.BASE_PORT + (self.PORT_OFFSET * (self.MAX_NODES + 1)) self.sock.bind((ip, self.port)) self.devices = {} self.event_queue = [] # there could be events scheduled at exactly the same time self.event_sequence = 0 self.current_time = 0 self.current_event = None self.awake_devices = set() self._nodes_by_ack_seq = {} self._node_ack_seq = {} self._pcap = pcap.PcapCodec(os.getenv('TEST_NAME', 'current')) # the addr for spinel-cli sending OT_SIM_EVENT_POSTCMD self._spinel_cli_addr = (ip, self.BASE_PORT + self.port) self.current_nodeid = None self._pause_time = 0 if use_message_factory: self._message_factory = config.create_default_thread_message_factory() else: self._message_factory = None def __del__(self): if self.sock: self.stop() def stop(self): if self.sock: self.sock.close() self.sock = None @property def is_running(self): return self.sock is not None def _add_message(self, nodeid, message_obj): addr = ('127.0.0.1', self.port + nodeid) # Ignore any exceptions try: if self._message_factory is not None: messages = self._message_factory.create(io.BytesIO(message_obj)) self.devices[addr]['msgs'] += messages except message.DropPacketException: print('Drop current packet because it cannot be handled in test scripts') except Exception as e: # Just print the exception to the console print("EXCEPTION: %s" % e) traceback.print_exc() def set_lowpan_context(self, cid, prefix): if self._message_factory is not None: self._message_factory.set_lowpan_context(cid, prefix) def get_messages_sent_by(self, nodeid): """ Get sniffed messages. Note! This method flushes the message queue so calling this method again will return only the newly logged messages. Args: nodeid (int): node id Returns: MessagesSet: a set with received messages. """ addr = ('127.0.0.1', self.port + nodeid) messages = self.devices[addr]['msgs'] self.devices[addr]['msgs'] = [] ret = message.MessagesSet(messages, self.commissioning_messages[nodeid]) self.commissioning_messages[nodeid] = [] return ret def _is_radio(self, addr): return addr[1] < self.BASE_PORT * 2 def _to_core_addr(self, addr): assert self._is_radio(addr) return (addr[0], addr[1] + self.BASE_PORT) def _to_radio_addr(self, addr): assert not self._is_radio(addr) return (addr[0], addr[1] - self.BASE_PORT) def _core_addr_from(self, nodeid): if self._nodes[nodeid].is_posix: return ('127.0.0.1', self.BASE_PORT + self.port + nodeid) else: return ('127.0.0.1', self.port + nodeid) def _next_event_time(self): if len(self.event_queue) == 0: return self.END_OF_TIME else: return self.event_queue[0][0] def receive_events(self): """ Receive events until all devices are asleep. """ while True: if (self.current_event or len(self.awake_devices) or (self._next_event_time() > self._pause_time and self.current_nodeid)): self.sock.settimeout(self.BLOCK_TIMEOUT) try: msg, addr = self.sock.recvfrom(self.MAX_MESSAGE) except socket.error: # print debug information on failure print('Current nodeid:') print(self.current_nodeid) print('Current awake:') print(self.awake_devices) print('Current time:') print(self.current_time) print('Current event:') print(self.current_event) print('Events:') for event in self.event_queue: print(event) raise else: self.sock.settimeout(0) try: msg, addr = self.sock.recvfrom(self.MAX_MESSAGE) except socket.error: break if addr != self._spinel_cli_addr and addr not in self.devices: self.devices[addr] = {} self.devices[addr]['alarm'] = None self.devices[addr]['msgs'] = [] self.devices[addr]['time'] = self.current_time self.awake_devices.discard(addr) # print "New device:", addr, self.devices delay, type, datalen = struct.unpack('=QBH', msg[:11]) data = msg[11:] event_time = self.current_time + delay if data: dbg_print( "New event: ", event_time, addr, type, datalen, binascii.hexlify(data), ) else: dbg_print("New event: ", event_time, addr, type, datalen) if type == self.OT_SIM_EVENT_ALARM_FIRED: # remove any existing alarm event for device if self.devices[addr]['alarm']: self.event_queue.remove(self.devices[addr]['alarm']) # print "-- Remove\t", self.devices[addr]['alarm'] # add alarm event to event queue event = (event_time, self.event_sequence, addr, type, datalen) self.event_sequence += 1 # print "-- Enqueue\t", event, delay, self.current_time bisect.insort(self.event_queue, event) self.devices[addr]['alarm'] = event self.awake_devices.discard(addr) if (self.current_event and self.current_event[self.EVENT_ADDR] == addr): # print "Done\t", self.current_event self.current_event = None elif type == self.OT_SIM_EVENT_RADIO_RECEIVED: assert self._is_radio(addr) # add radio receive events event queue frame_info = wpan.dissect(data) recv_devices = None if frame_info.frame_type == wpan.FrameType.ACK: recv_devices = self._nodes_by_ack_seq.get(frame_info.seq_no) recv_devices = recv_devices or self.devices.keys() for device in recv_devices: if device != addr and self._is_radio(device): event = ( event_time, self.event_sequence, device, type, datalen, data, ) self.event_sequence += 1 # print "-- Enqueue\t", event bisect.insort(self.event_queue, event) self._pcap.append(data, (event_time // 1000000, event_time % 1000000)) self._add_message(addr[1] - self.port, data) # add radio transmit done events to event queue event = ( event_time, self.event_sequence, addr, type, datalen, data, ) self.event_sequence += 1 bisect.insort(self.event_queue, event) if frame_info.frame_type != wpan.FrameType.ACK and not frame_info.is_broadcast: self._on_ack_seq_change(addr, frame_info.seq_no) self.awake_devices.add(addr) elif type == self.OT_SIM_EVENT_RADIO_SPINEL_WRITE: assert not self._is_radio(addr) radio_addr = self._to_radio_addr(addr) if radio_addr not in self.devices: self.awake_devices.add(radio_addr) event = ( event_time, self.event_sequence, radio_addr, self.OT_SIM_EVENT_UART_WRITE, datalen, data, ) self.event_sequence += 1 bisect.insort(self.event_queue, event) self.awake_devices.add(addr) elif type == self.OT_SIM_EVENT_UART_WRITE: assert self._is_radio(addr) core_addr = self._to_core_addr(addr) if core_addr not in self.devices: self.awake_devices.add(core_addr) event = ( event_time, self.event_sequence, core_addr, self.OT_SIM_EVENT_RADIO_SPINEL_WRITE, datalen, data, ) self.event_sequence += 1 bisect.insort(self.event_queue, event) self.awake_devices.add(addr) elif type == self.OT_SIM_EVENT_POSTCMD: assert self.current_time == self._pause_time nodeid = struct.unpack('=B', data)[0] if self.current_nodeid == nodeid: self.current_nodeid = None def _on_ack_seq_change(self, device: tuple, seq_no: int): old_seq = self._node_ack_seq.pop(device, None) if old_seq is not None: self._nodes_by_ack_seq[old_seq].remove(device) self._node_ack_seq[device] = seq_no self._nodes_by_ack_seq.setdefault(seq_no, set()).add(device) def _send_message(self, message, addr): while True: try: sent = self.sock.sendto(message, addr) except socket.error: traceback.print_exc() time.sleep(0) else: break assert sent == len(message) def process_next_event(self): assert self.current_event is None assert self._next_event_time() < self.END_OF_TIME # process next event event = self.event_queue.pop(0) if len(event) == 5: event_time, sequence, addr, type, datalen = event dbg_print("Pop event: ", event_time, addr, type, datalen) else: event_time, sequence, addr, type, datalen, data = event dbg_print( "Pop event: ", event_time, addr, type, datalen, binascii.hexlify(data), ) self.current_event = event assert event_time >= self.current_time self.current_time = event_time elapsed = event_time - self.devices[addr]['time'] self.devices[addr]['time'] = event_time message = struct.pack('=QBH', elapsed, type, datalen) if type == self.OT_SIM_EVENT_ALARM_FIRED: self.devices[addr]['alarm'] = None self._send_message(message, addr) elif type == self.OT_SIM_EVENT_RADIO_RECEIVED: message += data self._send_message(message, addr) elif type == self.OT_SIM_EVENT_RADIO_SPINEL_WRITE: message += data self._send_message(message, addr) elif type == self.OT_SIM_EVENT_UART_WRITE: message += data self._send_message(message, addr) def sync_devices(self): self.current_time = self._pause_time for addr in self.devices: elapsed = self.current_time - self.devices[addr]['time'] if elapsed == 0: continue dbg_print('syncing', addr, elapsed) self.devices[addr]['time'] = self.current_time message = struct.pack('=QBH', elapsed, self.OT_SIM_EVENT_ALARM_FIRED, 0) self._send_message(message, addr) self.awake_devices.add(addr) self.receive_events() self.awake_devices.clear() def now(self): return self.current_time / 1000000 def go(self, duration, nodeid=None): assert self.current_time == self._pause_time duration = int(duration * 1000000) dbg_print('running for %d us' % duration) self._pause_time += duration if nodeid: if self.NCP_SIM: self.current_nodeid = nodeid self.awake_devices.add(self._core_addr_from(nodeid)) self.receive_events() while self._next_event_time() <= self._pause_time: self.process_next_event() self.receive_events() if duration > 0: self.sync_devices() dbg_print('current time %d us' % self.current_time) if __name__ == '__main__': simulator = VirtualTime() while True: simulator.go(0)