1# Lint as: python2, python3 2# Copyright (c) 2011 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""Wrapper for an RF switch built on an Elexol EtherIO24. 7 8The EtherIO is documented at 9 http://www.elexol.com/IO_Modules/Ether_IO_24_Dip_R.php 10 11This file is both a python module and a command line utility to speak 12to the module 13""" 14 15from __future__ import absolute_import 16from __future__ import division 17from __future__ import print_function 18 19import collections 20import socket 21import struct 22import sys 23 24from six.moves import range 25 26from autotest_lib.client.cros.cellular import cellular_logging 27 28log = cellular_logging.SetupCellularLogging('ether_io_rf_switch') 29 30 31class Error(Exception): 32 pass 33 34 35class EtherIo24(object): 36 """Encapsulates an EtherIO24 UDP-GPIO bridge.""" 37 38 def __init__(self, hostname, port=2424): 39 self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 40 self.socket.bind(('', 0)) 41 self.destination = (hostname, port) 42 self.socket.settimeout(3) # In seconds 43 44 def SendPayload(self, payload): 45 self.socket.sendto(payload, self.destination) 46 47 def SendOperation(self, opcode, list_bytes): 48 """Sends the specified opcode with [list_bytes] as an argument.""" 49 payload = opcode + struct.pack(('=%dB' % len(list_bytes)), *list_bytes).decode('utf-8') 50 self.SendPayload(payload) 51 return payload 52 53 def SendCommandVerify(self, write_opcode, list_bytes, read_opcode=None): 54 """Sends opcode and bytes, 55 then reads to make sure command was executed.""" 56 if read_opcode is None: 57 read_opcode = write_opcode.lower() 58 for _ in range(3): 59 write_sent = self.SendOperation(write_opcode, list_bytes) 60 self.SendOperation(read_opcode, list_bytes) 61 try: 62 response = self.AwaitResponse() 63 if response == write_sent: 64 return 65 else: 66 log.warning('Unexpected reply: sent %s, got %s', 67 write_sent.encode('hex_codec'), 68 response.encode('hex_codec')) 69 except socket.timeout: 70 log.warning('Timed out awaiting reply for %s', write_opcode) 71 continue 72 raise Error('Failed to execute %s' % write_sent.encode('hex_codec')) 73 74 def AwaitResponse(self): 75 (response, address) = self.socket.recvfrom(65536) 76 if (socket.gethostbyname(address[0]) != 77 socket.gethostbyname(self.destination[0])): 78 log.warning('Unexpected reply source: %s (expected %s)', 79 address, self.destination) 80 return response 81 82 83class RfSwitch(object): 84 """An RF switch hooked to an Elexol EtherIO24.""" 85 86 def __init__(self, ip): 87 self.io = EtherIo24(ip) 88 # Must run on pythons without 0bxxx notation. These are 1110, 89 # 1101, 1011, 0111 90 decode = [0xe, 0xd, 0xb, 0x7] 91 92 self.port_mapping = [] 93 for upper in range(3): 94 for lower in range(4): 95 self.port_mapping.append(decode[upper] << 4 | decode[lower]) 96 97 def SelectPort(self, n): 98 """Connects port n to the RF generator.""" 99 # Set all pins to output 100 101 # !A0: all pins output 102 self.io.SendCommandVerify('!A', [0]) 103 self.io.SendCommandVerify('A', [self.port_mapping[n]]) 104 105 def Query(self): 106 """Returns (binary port status, selected port, port direction).""" 107 self.io.SendOperation('!a', []) 108 raw_direction = self.io.AwaitResponse() 109 direction = ord(raw_direction[2]) 110 111 self.io.SendOperation('a', []) 112 status = ord(self.io.AwaitResponse()[1]) 113 try: 114 port = self.port_mapping.index(status) 115 except ValueError: 116 port = None 117 118 return status, port, direction 119 120 121def CommandLineUtility(arguments): 122 """Command line utility to control a switch.""" 123 124 def Select(switch, remaining_args): 125 switch.SelectPort(int(remaining_args.popleft())) 126 127 def Query(switch, unused_remaining_args): 128 (raw_status, port, direction) = switch.Query() 129 if direction != 0x00: 130 print('Warning: Direction register is %x, should be 0x00' % \ 131 direction) 132 if port is None: 133 port_str = 'Invalid' 134 else: 135 port_str = str(port) 136 print('Port %s (0x%x)' % (port_str, raw_status)) 137 138 def Usage(): 139 print('usage: %s hostname {query|select portnumber}' % sys.argv[0]) 140 exit(1) 141 142 try: 143 hostname = arguments.popleft() 144 operation = arguments.popleft() 145 146 switch = RfSwitch(hostname) 147 148 if operation == 'query': 149 Query(switch, arguments) 150 elif operation == 'select': 151 Select(switch, arguments) 152 else: 153 Usage() 154 except IndexError: 155 Usage() 156 157if __name__ == '__main__': 158 CommandLineUtility(collections.deque(sys.argv[1:])) 159