1# Lint as: python2, python3
2# Copyright 2019 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""A test to verify the muxable USB port on servo works with a stick."""
7
8import logging
9import time
10
11from autotest_lib.client.common_lib import error
12from autotest_lib.server import test
13
class servo_USBMuxVerification(test.test):
    """Verify that the servo's muxable USB port works with a USB stick.

    The test reads the (vid, pid, serial) of the stick while the mux points
    it at the servo host. It then flips the mux direction and the port power
    and checks through sysfs that the stick shows up on exactly the host it
    is expected on (servo host or DUT), and on neither once power is off.
    """
    version = 1

    # The file names to find the stick's vid, pid, serial.
    STICK_ID_FILE_NAMES = ('idVendor', 'idProduct', 'serial')

    # Seconds to wait after a mux/power change before inspecting sysfs.
    _SETTLE_TIME_S = 2

    # Defaults so that cleanup() stays safe (and does not mask the real
    # error) when initialize() failed before recording the original state.
    servo = None
    _original_pwr = None
    _original_direction = None

    @staticmethod
    def _sh_quote(value):
        """Return |value| single-quoted so the shell sees it as one word."""
        return "'%s'" % value.replace("'", "'\\''")

    def _validate_state(self):
        """Validate the mux state against the visibility of the stick.

        Reads the current mux direction and power state from the servo and
        checks that the stick is visible on the host it is expected on and
        not visible on the other host. With power off it must be visible on
        neither host.

        Raises:
          error.TestNA: if the servo reports an unknown mux direction.
          error.TestFail: if the USB stick is visible when it should not be
          or if the USB stick is not visible when it should be.
        """
        # A small sleep to ensure everyone is in the right state and the
        # cached values/sysfs files are all updated.
        time.sleep(self._SETTLE_TIME_S)
        direction = self.servo.get('image_usbkey_direction')
        pwr = self.servo.get('image_usbkey_pwr')
        if pwr == 'off':
            # Without power the stick must not enumerate anywhere.
            visible_on = None
        elif direction == 'dut_sees_usbkey':
            visible_on = 'dut'
        elif direction == 'servo_sees_usbkey':
            visible_on = 'servo'
        else:
            raise error.TestNA('Unknown servo usbkey direction %s' %
                               direction)
        hosts = (('dut', self.dut_host), ('servo', self.servo_host))
        for name, host in hosts:
            expected = name == visible_on
            visible = self.is_usb_stick_visible(host)
            if expected and not visible:
                raise error.TestFail('Usb stick not visible on %s side even '
                                     'though it should be.' % name)
            if visible and not expected:
                raise error.TestFail('Usb stick visible on %s side even '
                                     'though it should not be.' % name)

    def is_usb_stick_visible(self, host):
        """Check whether the stick is visible on |host|.

        On |host| check whether a usb device exists that has the
        (vid, pid, serial) that was originally read out from the stick.

        Args:
          host: dut or servo host to inspect

        Returns:
          True if the stick is visible on |host|, False otherwise

        Raises:
          error.TestError: if more than one usb device have the vid, pid,
          serial that was originally identified by the init sequence
        """
        usb_dirs = []
        for value, fname in zip(self.stick_id, self.STICK_ID_FILE_NAMES):
            # -F and the quoting make the id a literal string: a serial may
            # contain spaces or characters special to the shell or to grep.
            cmd = ('grep -lrF -- %s $(find /sys/bus/usb/devices/*/ '
                   '-maxdepth 1 -name %s)' % (self._sh_quote(value), fname))
            matches = host.run(cmd, ignore_status=True).stdout.split()
            # Remove the file name to have the usb sysfs dir
            usb_dirs.append(set(m.rsplit('/', 1)[0] for m in matches))
        # This will only leave the usb device dirs that share the same
        # vid, pid, serial. Ideally, that's 1 - the stick, or 0, if the stick
        # is not visible
        common_usb_dev = set.intersection(*usb_dirs)
        if len(common_usb_dev) > 1:
            raise error.TestError('More than one usb device detected on host '
                                  '%s with vid:0x%s pid:0x%s serial:%s.'
                                  % ((host.hostname,) + self.stick_id))
        return len(common_usb_dev) == 1

    def get_usb_stick_id(self):
        """Helper to retrieve usb stick's vid, pid, and serial.

        Points the mux at the servo host, resolves the stick's block device
        to its usb sysfs device directory and reads the id files from there.

        Returns:
          (vid, pid, serial) tuple of the stick

        Raises:
          error.TestFail: if the usb stick cannot be found or if reading
          any of the idVendor, idProduct, serial files fails or yields
          nothing.
        """
        # Getting the stick id by pointing it to the servo host.
        self.servo.set('image_usbkey_direction', 'servo_sees_usbkey')
        # Get the usb sysfs directory of the stick
        symbolic_dev_name = self.servo.get('image_usbkey_dev')
        if not symbolic_dev_name:
            raise error.TestFail('No usb stick dev file found.')
        # This will get just the sdx portion of /dev/sdx
        path_cmd = ('realpath /sys/block/%s' %
                    symbolic_dev_name.split('/')[-1])
        # This sed command essentially anchors the path on the usb device's
        # interface's pattern i.e. the bus-port.port...:x.x pattern. Then
        # it removes anything beyond it and it itself, leaving the usb
        # device file's sysfs directory that houses the ID files.
        # The port chain must allow the digit 0: port numbers can have
        # more than one digit (e.g. 1-10:1.0).
        sed_cmd = r"sed -nr 's|/[0-9]+\-[0-9.]+\:[0-9.]+/.*$|/|p'"
        cmd = '%s | %s' % (path_cmd, sed_cmd)
        logging.debug("Using cmd: %s over ssh to see the stick's usb sysfs "
                      'device file directory.', cmd)
        dev_dir = self.servo_host.run(cmd).stdout.strip()
        if not dev_dir:
            raise error.TestFail('Failed to find the usb stick usb sysfs '
                                 'device directory on the servo host.')
        # Get vid, pid, serial
        id_elems = []
        for id_file in self.STICK_ID_FILE_NAMES:
            cmd = 'sudo cat %s%s' % (dev_dir, id_file)
            try:
                elem = self.servo_host.run(cmd).stdout.strip()
            except error.AutoservRunError as e:
                # Keep the underlying reason in the logs; the TestFail
                # message itself stays short.
                logging.error('Reading %s%s failed: %s', dev_dir, id_file, e)
                raise error.TestFail('Failed to get %s file for dev %s' %
                                     (id_file, dev_dir))
            if not elem:
                raise error.TestFail('Device id file %s for dev %s is empty.'
                                     % (id_file, dev_dir))
            id_elems.append(elem)
        return tuple(id_elems)

    def initialize(self, host):
        """Initialize test by extracting usb stick information.

        Records the original mux power and direction so cleanup() can
        restore them, powers the port on and reads the stick's id.

        Args:
          host: the DUT host; its servo and servo host are used by the test.
        """
        self.dut_host = host
        self.servo_host = host._servo_host
        self.servo = host.servo
        self._original_pwr = self.servo.get('image_usbkey_pwr')
        self._original_direction = self.servo.get('image_usbkey_direction')
        self.servo.set('image_usbkey_pwr', 'on')
        logging.info('Turning the stick to the servo host.')
        # Stick id is a (vid, pid, serial) tuple for the stick.
        self.stick_id = self.get_usb_stick_id()

    def run_once(self, host):
        """Run through the test cases.

        - Facing the servo host
          - power off
        - Facing the DUT
          - power off

        Args:
          host: unused here; the hosts stored by initialize() are used.
        """
        self.servo.set('image_usbkey_direction', 'servo_sees_usbkey')
        # _validate_state() sleeps first to let the device enumerate.
        self._validate_state()
        logging.info('Turning the power to the stick off.')
        self.servo.set('image_usbkey_pwr', 'off')
        self._validate_state()
        logging.info('Turning the power to the stick on.')
        self.servo.set('image_usbkey_pwr', 'on')
        logging.info('Turning the stick to the dut host.')
        self.servo.set('image_usbkey_direction', 'dut_sees_usbkey')
        # _validate_state() sleeps first to let the device enumerate.
        self._validate_state()
        logging.info('Turning the power to the stick off.')
        self.servo.set('image_usbkey_pwr', 'off')
        self._validate_state()

    def cleanup(self, host):
        """Restore usb mux to its original state.

        Only restores values that initialize() managed to record, and still
        attempts to restore the direction if restoring the power fails.

        Args:
          host: unused here; the servo stored by initialize() is used.
        """
        if self.servo is None:
            # initialize() never got as far as obtaining the servo.
            return
        try:
            if self._original_pwr is not None:
                self.servo.set('image_usbkey_pwr', self._original_pwr)
        finally:
            if self._original_direction is not None:
                self.servo.set('image_usbkey_direction',
                               self._original_direction)
178