xref: /aosp_15_r20/external/autotest/client/cros/cellular/scpi.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1*9c5db199SXin Li# Lint as: python2, python3
2*9c5db199SXin Li# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
3*9c5db199SXin Li# Use of this source code is governed by a BSD-style license that can be
4*9c5db199SXin Li# found in the LICENSE file.
5*9c5db199SXin Li
6*9c5db199SXin Lifrom autotest_lib.client.cros.cellular import cellular_logging
7*9c5db199SXin Lifrom autotest_lib.client.cros.cellular import cellular_system_error
8*9c5db199SXin Li
9*9c5db199SXin Lilog = cellular_logging.SetupCellularLogging('scpi_driver')
10*9c5db199SXin Li
11*9c5db199SXin Li
12*9c5db199SXin Liclass _ErrorCheckerContext(object):
13*9c5db199SXin Li    """Reference-count our error-checking state and only check for
14*9c5db199SXin Li    errors when we take the first ref or drop the last ref.
15*9c5db199SXin Li
16*9c5db199SXin Li    This way, we can minimize the number of checks; each one takes a
17*9c5db199SXin Li    bit of time.  You will likely want to set always_check to True when
18*9c5db199SXin Li    debugging new SCPI interactions.
19*9c5db199SXin Li
20*9c5db199SXin Li    On first entry, we check for errors, but do not stop if we find
21*9c5db199SXin Li    them; these are errors that were accumulated on the device before
22*9c5db199SXin Li    this test ran.
23*9c5db199SXin Li    """
24*9c5db199SXin Li
25*9c5db199SXin Li    def __init__(self, scpi):
26*9c5db199SXin Li        self.always_check = True  # True for serious debugging
27*9c5db199SXin Li        self.scpi = scpi
28*9c5db199SXin Li        self.depth = 0
29*9c5db199SXin Li        self.raise_on_error = True
30*9c5db199SXin Li
31*9c5db199SXin Li    def __enter__(self):
32*9c5db199SXin Li        log.debug('ErrorCheckerContext Depth: %s' % self.depth)
33*9c5db199SXin Li        if self.depth == 0 or self.always_check:
34*9c5db199SXin Li            errors = self.scpi._WaitAndFetchErrors(
35*9c5db199SXin Li                raise_on_error=False)  # Never raise when clearing old errors
36*9c5db199SXin Li        self.depth += 1
37*9c5db199SXin Li        return self
38*9c5db199SXin Li
39*9c5db199SXin Li    def __exit__(self, type, value, traceback):
40*9c5db199SXin Li        self.depth -= 1
41*9c5db199SXin Li        if self.depth <= 0 or self.always_check:
42*9c5db199SXin Li            self.scpi._WaitAndFetchErrors()
43*9c5db199SXin Li        return
44*9c5db199SXin Li
45*9c5db199SXin Li
class Scpi(object):
    """Wrapper for SCPI.

    SCPI = "standard commands for programmable instruments",
    a relative of GPIB.

    The SCPI driver must export:  Query, Send, Reset and Close
    """

    def __init__(self, driver, opc_on_stanza=False):
        """Wraps a driver.

        Arguments:
          driver: Object exporting Query, Send, Reset and Close.
          opc_on_stanza: If true, SendStanza issues '*OPC?' after every
            command it sends.
        """
        self.driver = driver
        self.opc_on_stanza = opc_on_stanza
        self.checker_context = _ErrorCheckerContext(self)

    def Query(self, command):
        """Send the SCPI command and return the response."""
        return self.driver.Query(command)

    def Send(self, command):
        """Send the SCPI command."""
        self.driver.Send(command)

    def Reset(self):
        """Tell the device to reset with *RST."""
        # Some devices (like the prologix) require special handling for
        # reset.
        self.driver.Reset()

    def Close(self):
        """Close the device."""
        self.driver.Close()

    def RetrieveErrors(self):
        """Retrieves all SYSTem:ERRor messages from the device.

        Queries 'SYSTem:ERRor?' until the device answers with
        '+0,"No error"', skipping the two error kinds treated as benign
        below, then sends '*CLS'.

        Returns:
          List of the remaining error strings, in the reverse of the
          order in which the device reported them.
        """
        errors = []
        while True:
            error = self.Query('SYSTem:ERRor?')
            if '+0,"No error"' in error:
                # We've reached the end of the error stack
                break

            if '-420' in error and 'Query UNTERMINATED' in error:
                # This is benign; the GPIB bridge asked for a response when
                # the device didn't have one to give.

                # TODO(rochberg): This is a layering violation; we should
                # really only accept -420 if the underlying driver is in a
                # mode that is known to cause this
                continue

            if '+292' in error and 'Data arrived on unknown SAPI' in error:
                # This may be benign; It is known to occur when we do a switch
                # from GPRS to WCDMA
                continue

            errors.append(error)

        self.Send('*CLS')           # Clear status
        errors.reverse()
        return errors

    def _WaitAndFetchErrors(self, raise_on_error=True):
        """Waits for command completion, returns errors.

        Arguments:
          raise_on_error: If true, raise instead of returning when the
            device reported at least one error.

        Returns:
          The list produced by RetrieveErrors().

        Raises:
          cellular_system_error.BadScpiCommand: errors were found and
            raise_on_error is true; the message is the errors joined
            with newlines.
        """
        self.Query('*OPC?')      # Wait for operation complete
        errors = self.RetrieveErrors()
        if errors and raise_on_error:
            raise cellular_system_error.BadScpiCommand('\n'.join(errors))
        return errors

    def SimpleVerify(self, command, arg):
        """Sends "command arg", then "command?", expecting arg back.

        Arguments:
          command: SCPI command
          arg: Argument.  We currently check for exact equality: you should
            send strings quoted with " because that's what the 8960 returns.
            We also fail if you send 1 and receive +1 back.

        Raises:
          cellular_system_error.BadScpiCommand:  Verification failed
        """
        # always_check lives on the checker context, not on this object.
        # Turn it off only for the duration of this verification and put
        # the previous value back afterwards, so a debugging setting is
        # not lost.
        checker = self.checker_context
        previous_always_check = checker.always_check
        checker.always_check = False
        try:
            with checker:
                self.Send('%s %s' % (command, arg))
                result = self.Query('%s?' % (command,))
                if result != arg:
                    raise cellular_system_error.BadScpiCommand(
                        'Error on %s: sent %s, got %s' %
                        (command, arg, result))
        finally:
            checker.always_check = previous_always_check

    def SendStanza(self, commands):
        """
        Sends a list of commands and verifies that they complete correctly.

        Arguments:
          commands: Iterable of SCPI command strings, sent in order inside
            the error-checking context.
        """
        with self.checker_context:
            for command in commands:
                self.Send(command)
                if self.opc_on_stanza:
                    # Wait for operation complete before the next command
                    self.Query('*OPC?')
147