# Lint as: python2, python3
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from autotest_lib.client.cros.cellular import cellular_logging
from autotest_lib.client.cros.cellular import cellular_system_error

log = cellular_logging.SetupCellularLogging('scpi_driver')


class _ErrorCheckerContext(object):
    """Reference-count our error-checking state and only check for
    errors when we take the first ref or drop the last ref.

    This way, we can minimize the number of checks; each one takes a
    bit of time.  You will likely want to set always_check to True when
    debugging new SCPI interactions.  (Note that __init__ currently
    initialises always_check to True, so every enter and exit checks.)

    On first entry, we check for errors, but do not stop if we find
    them; these are errors that were accumulated on the device before
    this test ran.
    """

    def __init__(self, scpi):
        """Creates a checker bound to one Scpi object.

        Arguments:
            scpi: object providing _WaitAndFetchErrors(raise_on_error=...);
                  in this file that is the owning Scpi instance.
        """
        self.always_check = True  # True for serious debugging
        self.scpi = scpi
        self.depth = 0  # Number of currently active 'with' entries
        # NOTE(review): not read anywhere in this file; kept in case
        # external code inspects it.
        self.raise_on_error = True

    def __enter__(self):
        """Takes a reference, draining stale device errors if required.

        Returns:
            This context object.
        """
        log.debug('ErrorCheckerContext Depth: %s' % self.depth)
        if self.depth == 0 or self.always_check:
            # Never raise when clearing old errors.  The fetched errors
            # are intentionally discarded.
            self.scpi._WaitAndFetchErrors(raise_on_error=False)
        self.depth += 1
        return self

    def __exit__(self, type, value, traceback):
        """Drops a reference and checks the device for errors if required.

        The check uses the default raise_on_error=True, so any errors the
        device reports are raised by _WaitAndFetchErrors.  Returns None,
        so an exception raised inside the 'with' body is not suppressed.
        """
        self.depth -= 1
        if self.depth <= 0 or self.always_check:
            self.scpi._WaitAndFetchErrors()


class Scpi(object):
    """Wrapper for SCPI.

    SCPI = "standard commands for programmable instruments",
    a relative of GPIB.

    The SCPI driver must export:  Query, Send, Reset and Close
    """

    def __init__(self, driver, opc_on_stanza=False):
        """Wraps a low-level driver.

        Arguments:
            driver: object exporting Query, Send, Reset and Close.
            opc_on_stanza: if True, SendStanza issues a '*OPC?' query after
                           every command it sends.
        """
        self.driver = driver
        self.opc_on_stanza = opc_on_stanza
        self.checker_context = _ErrorCheckerContext(self)

    def Query(self, command):
        """Send the SCPI command and return the response."""
        return self.driver.Query(command)

    def Send(self, command):
        """Send the SCPI command."""
        self.driver.Send(command)

    def Reset(self):
        """Tell the device to reset with *RST."""
        # Some devices (like the prologix) require special handling for
        # reset.
        self.driver.Reset()

    def Close(self):
        """Close the device."""
        self.driver.Close()

    @staticmethod
    def _IsNoErrorResponse(response):
        """Returns True if response signals an empty error queue.

        Accepts the exact '+0,"No error"' text this code has always
        matched, and additionally any response whose leading
        comma-separated field parses as integer 0 (e.g. '0,"No error"'),
        so that devices which omit the plus sign cannot make
        RetrieveErrors loop forever.

        Arguments:
            response: string returned by a 'SYSTem:ERRor?' query.
        """
        if '+0,"No error"' in response:
            return True
        code = response.strip().split(',', 1)[0]
        try:
            return int(code) == 0
        except ValueError:
            # Leading field is not a number; treat it as a real error line.
            return False

    def RetrieveErrors(self):
        """Retrieves all SYSTem:ERRor messages from the device.

        Queries 'SYSTem:ERRor?' repeatedly until the device reports that
        there are no more errors, skipping entries known to be benign,
        then sends '*CLS'.

        Returns:
            List of error strings, in the reverse of the order in which
            they were read from the device.
        """
        errors = []
        while True:
            error = self.Query('SYSTem:ERRor?')
            if self._IsNoErrorResponse(error):
                # We've reached the end of the error stack
                break

            if '-420' in error and 'Query UNTERMINATED' in error:
                # This is benign; the GPIB bridge asked for a response when
                # the device didn't have one to give.

                # TODO(rochberg):  This is a layering violation; we should
                # really only accept -420 if the underlying driver is in a
                # mode that is known to cause this
                continue

            if '+292' in error and 'Data arrived on unknown SAPI' in error:
                # This may be benign; It is known to occur when we do a switch
                # from GPRS to WCDMA
                continue

            errors.append(error)

        self.Send('*CLS')  # Clear status
        errors.reverse()
        return errors

    def _WaitAndFetchErrors(self, raise_on_error=True):
        """Waits for command completion, returns errors.

        Arguments:
            raise_on_error: if True and the device reported errors, raise
                            instead of returning them.

        Returns:
            List of error strings from RetrieveErrors (possibly empty).

        Raises:
            cellular_system_error.BadScpiCommand: errors were found and
                raise_on_error is True; the message is the errors joined
                by newlines.
        """
        self.Query('*OPC?')  # Wait for operation complete
        errors = self.RetrieveErrors()
        if errors and raise_on_error:
            raise cellular_system_error.BadScpiCommand('\n'.join(errors))
        return errors

    def SimpleVerify(self, command, arg):
        """Sends "command arg", then "command?", expecting arg back.

        Arguments:
            command: SCPI command
            arg: Argument.  We currently check for exact equality: you should
                 send strings quoted with " because that's what the 8960
                 returns.  We also fail if you send 1 and receive +1 back.

        Raises:
            cellular_system_error.BadScpiCommand: the value read back
                differs from arg, or the error check performed when the
                checker context exits found device errors.
        """
        # NOTE(review): this sets an attribute on the Scpi object, not on
        # self.checker_context.  _ErrorCheckerContext reads its own
        # always_check, so this assignment does not affect error checking
        # in this file.  Left unchanged because the intent is unclear;
        # confirm whether self.checker_context.always_check was meant.
        self.always_check = False
        with self.checker_context:
            self.Send('%s %s' % (command, arg))
            result = self.Query('%s?' % (command,))
            if result != arg:
                raise cellular_system_error.BadScpiCommand(
                    'Error on %s: sent %s, got %s' % (command, arg, result))

    def SendStanza(self, commands):
        """
        Sends a list of commands and verifies that they complete correctly.

        Arguments:
            commands: iterable of SCPI command strings, sent in order.
                      If opc_on_stanza is set, each command is followed by
                      a '*OPC?' query.

        Raises:
            cellular_system_error.BadScpiCommand: the error check performed
                when the checker context exits found device errors.
        """
        with self.checker_context:
            for c in commands:
                self.Send(c)
                if self.opc_on_stanza:
                    self.Query('*OPC?')