xref: /aosp_15_r20/external/autotest/client/cros/cellular/ether_io_rf_switch.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Wrapper for an RF switch built on an Elexol EtherIO24.
7
8The EtherIO is documented at
9        http://www.elexol.com/IO_Modules/Ether_IO_24_Dip_R.php
10
11This file is both a python module and a command line utility to speak
12to the module
13"""
14
15from __future__ import absolute_import
16from __future__ import division
17from __future__ import print_function
18
19import collections
20import socket
21import struct
22import sys
23
24from six.moves import range
25
26from autotest_lib.client.cros.cellular import cellular_logging
27
# Logger shared by everything in this file, obtained from the cellular
# logging helper under the name 'ether_io_rf_switch'.
log = cellular_logging.SetupCellularLogging('ether_io_rf_switch')
29
30
class Error(Exception):
    """Raised by this module when a command to the EtherIO24 fails."""
33
34
class EtherIo24(object):
    """Encapsulates an EtherIO24 UDP-GPIO bridge.

    A command is one UDP datagram: an opcode followed by zero or more raw
    argument bytes.  Everything that goes on the wire is handled as a byte
    string so the class behaves the same on python2 and python3.
    """

    def __init__(self, hostname, port=2424):
        """Opens a UDP socket for talking to the device.

        Args:
            hostname: Host name or IP address of the EtherIO24.
            port: UDP port the device listens on.
        """
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.bind(('', 0))
        self.destination = (hostname, port)
        self.socket.settimeout(3)   # In seconds

    @staticmethod
    def _ToHex(data):
        """Returns the hex digits of a byte string, for log messages."""
        if not isinstance(data, (bytes, bytearray)):
            data = data.encode('latin-1')
        # bytearray yields ints on both python2 and python3.
        return ''.join('%02x' % b for b in bytearray(data))

    def SendPayload(self, payload):
        """Sends an already-assembled datagram (bytes) to the device."""
        self.socket.sendto(payload, self.destination)

    def SendOperation(self, opcode, list_bytes):
        """Sends the specified opcode with [list_bytes] as an argument.

        Args:
            opcode: Command opcode, as text (ASCII) or bytes.
            list_bytes: Sequence of ints in 0..255 appended after the opcode.

        Returns:
            The exact byte string that was sent.

        Raises:
            struct.error: if a value in list_bytes does not fit in a byte.
        """
        if not isinstance(opcode, bytes):
            opcode = opcode.encode('ascii')
        # The packed arguments are raw binary and must NOT be decoded as
        # text: values >= 0x80 are not valid UTF-8, and sendto() needs bytes.
        payload = opcode + struct.pack('=%dB' % len(list_bytes), *list_bytes)
        self.SendPayload(payload)
        return payload

    def SendCommandVerify(self, write_opcode, list_bytes, read_opcode=None):
        """Sends opcode and bytes,
        then reads to make sure command was executed.

        The write is followed by the matching read command; the device's
        reply to the read is expected to equal the datagram that was written.
        Up to three attempts are made.

        Args:
            write_opcode: Opcode of the write command.
            list_bytes: Argument bytes (ints) for the command.
            read_opcode: Opcode used to read the value back.  Defaults to
                the lower-cased write opcode.

        Raises:
            Error: if no attempt produced a matching reply.
        """
        if read_opcode is None:
            read_opcode = write_opcode.lower()
        for _ in range(3):
            write_sent = self.SendOperation(write_opcode, list_bytes)
            self.SendOperation(read_opcode, list_bytes)
            try:
                response = self.AwaitResponse()
            except socket.timeout:
                log.warning('Timed out awaiting reply for %s', write_opcode)
                continue
            if response == write_sent:
                return
            log.warning('Unexpected reply:  sent %s, got %s',
                        self._ToHex(write_sent),
                        self._ToHex(response))
        raise Error('Failed to execute %s' % self._ToHex(write_sent))

    def AwaitResponse(self):
        """Waits for the next datagram and returns its payload (bytes).

        A reply from an address other than the configured destination is
        logged but still returned.

        Raises:
            socket.timeout: if nothing arrives within the socket timeout.
        """
        (response, address) = self.socket.recvfrom(65536)
        if (socket.gethostbyname(address[0]) !=
                socket.gethostbyname(self.destination[0])):
            log.warning('Unexpected reply source: %s (expected %s)',
                        address, self.destination)
        return response
81
82
class RfSwitch(object):
    """An RF switch hooked to an Elexol EtherIO24."""

    def __init__(self, ip):
        """Connects to the EtherIO24 at `ip` and builds the port code table.

        Args:
            ip: Host name or IP address of the EtherIO24.
        """
        self.io = EtherIo24(ip)
        # Must run on pythons without 0bxxx notation.  These are 1110,
        # 1101, 1011, 0111
        decode = [0xe, 0xd, 0xb, 0x7]

        # port_mapping[n] is the byte written to select port n: the upper
        # nibble uses the first three patterns and the lower nibble all
        # four, giving 12 codes.
        self.port_mapping = [decode[upper] << 4 | decode[lower]
                             for upper in range(3)
                             for lower in range(4)]

    def SelectPort(self, n):
        """Connects port n to the RF generator.

        Args:
            n: Index into port_mapping, 0 <= n < len(port_mapping).

        Raises:
            IndexError: if n is outside the valid range.  Negative values
                are rejected rather than silently wrapping around to a
                port counted from the end of the table.
        """
        if not 0 <= n < len(self.port_mapping):
            raise IndexError('port %r out of range (0..%d)' %
                             (n, len(self.port_mapping) - 1))

        # !A0:  all pins output
        self.io.SendCommandVerify('!A', [0])
        self.io.SendCommandVerify('A', [self.port_mapping[n]])

    def Query(self):
        """Returns (binary port status, selected port, port direction).

        The selected port is the index of the status byte in port_mapping,
        or None when the status byte matches no known port code.
        """
        self.io.SendOperation('!a', [])
        # Reply is read as a 2-byte opcode echo followed by the value.
        # bytearray() gives integer elements on both python2 (str) and
        # python3 (bytes); ord() on an indexed python3 bytes would raise.
        direction = bytearray(self.io.AwaitResponse())[2]

        self.io.SendOperation('a', [])
        # Reply is read as a 1-byte opcode echo followed by the value.
        status = bytearray(self.io.AwaitResponse())[1]
        try:
            port = self.port_mapping.index(status)
        except ValueError:
            port = None

        return status, port, direction
119
120
def CommandLineUtility(arguments):
    """Command line utility to control a switch.

    Args:
        arguments: collections.deque of command-line words, consumed from
            the left: hostname, then 'query' or 'select <portnumber>'.

    Exits with status 1 after printing a usage message when the arguments
    are missing, unknown, or the port number is not a valid integer.
    """

    def Select(switch, remaining_args):
        switch.SelectPort(int(remaining_args.popleft()))

    def Query(switch, unused_remaining_args):
        (raw_status, port, direction) = switch.Query()
        if direction != 0x00:
            print('Warning: Direction register is %x, should be 0x00' %
                  direction)
        port_str = 'Invalid' if port is None else str(port)
        print('Port %s  (0x%x)' % (port_str, raw_status))

    def Usage():
        print('usage:  %s hostname {query|select portnumber}' % sys.argv[0])
        # sys.exit rather than the builtin exit(): the latter is installed
        # by the site module and is missing under `python -S`.
        sys.exit(1)

    try:
        hostname = arguments.popleft()
        operation = arguments.popleft()

        switch = RfSwitch(hostname)

        if operation == 'query':
            Query(switch, arguments)
        elif operation == 'select':
            Select(switch, arguments)
        else:
            Usage()
    except (IndexError, ValueError):
        # IndexError: too few arguments or port out of range.
        # ValueError: port number was not an integer.
        Usage()
156
# Script entry point: pass the command-line words (without the program name)
# as a deque, because CommandLineUtility consumes them with popleft().
if __name__ == '__main__':
    CommandLineUtility(collections.deque(sys.argv[1:]))
159