1#!/usr/bin/python3 2 3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7"""This is a module to scan /sys/block/ virtual FS, query udev 8 9It provides a list of all removable or USB devices connected to the machine on 10which the module is running. 11It can be used from command line or from a python script. 12 13To use it as python module it's enough to call the get_all() function. 14@see |get_all| documentation for the output format 15|get_all()| output is human readable (as oppposite to python's data structures) 16""" 17 18from __future__ import absolute_import 19from __future__ import division 20from __future__ import print_function 21 22import logging, os, re 23import six 24 25# this script can be run at command line on DUT (ie /usr/local/autotest 26# contains only the client/ subtree), on a normal autotest 27# installation/repository or as a python module used on a client-side test. 28import common 29 30from autotest_lib.client.common_lib import error 31from autotest_lib.client.common_lib import utils 32from autotest_lib.client.common_lib import seven 33 34 35INFO_PATH = "/sys/block" 36UDEV_CMD_FOR_SERIAL_NUMBER = "udevadm info -a -n %s | grep -iE 'ATTRS{" \ 37 "serial}' | head -n 1" 38LSUSB_CMD = "lsusb -v | grep -iE '^Device Desc|bcdUSB|iSerial'" 39DESC_PATTERN = r'Device Descriptor:' 40BCDUSB_PATTERN = r'bcdUSB\s+(\d+\.\d+)' 41ISERIAL_PATTERN = r'iSerial\s+\d\s(\S*)' 42UDEV_SERIAL_PATTERN = r'=="(.*)"' 43 44 45def read_file(path_to_file, host=None): 46 """Reads the file and returns the file content 47 @param path_to_file: Full path to the file 48 @param host: DUT object 49 @return: Returns the content of file 50 """ 51 if host: 52 if not host.path_exists(path_to_file): 53 raise error.TestError("No such file or directory %s" % path_to_file) 54 return host.run('cat %s' % path_to_file).stdout.strip() 55 56 if not os.path.isfile(path_to_file): 57 raise error.TestError("No such file or directory %s" % path_to_file) 58 return utils.read_file(path_to_file).strip() 59 60 61def system_output(command, host=None, ignore_status=False): 62 """Executes command on client 63 64 @param host: DUT object 65 @param command: command to execute 66 @return: output of command 67 """ 68 if host: 69 return host.run(command, ignore_status=ignore_status).stdout.strip() 70 71 return utils.system_output(command, ignore_status=ignore_status) 72 73 74def get_udev_info(blockdev, method='udev', host=None): 75 """Get information about |blockdev| 76 77 @param blockdev: a block device, e.g., /dev/sda1 or /dev/sda 78 @param method: either 'udev' (default) or 'blkid' 79 @param host: DUT object 80 81 @return a dictionary with two or more of the followig keys: 82 "ID_BUS", "ID_MODEL": always present 83 "ID_FS_UUID", "ID_FS_TYPE", "ID_FS_LABEL": present only if those info 84 are meaningul and present for the queried device 85 """ 86 ret = {} 87 cmd = None 88 ignore_status = False 89 90 if method == "udev": 91 cmd = "udevadm info --name %s --query=property" % blockdev 92 elif method == "blkid": 93 # this script is run as root in a normal autotest run, 94 # so this works: It doesn't have access to the necessary info 95 # when run as a non-privileged user 96 cmd = "blkid -c /dev/null -o udev %s" % blockdev 97 ignore_status = True 98 99 if cmd: 100 output = system_output(cmd, host, ignore_status=ignore_status) 101 102 udev_keys = ("ID_BUS", "ID_MODEL", "ID_FS_UUID", "ID_FS_TYPE", 103 "ID_FS_LABEL") 104 for line in output.splitlines(): 105 udev_key, udev_val = line.split('=') 106 107 if udev_key in udev_keys: 108 ret[udev_key] = udev_val 109 110 return ret 111 112 113def get_lsusb_info(host=None): 114 """Get lsusb info in list format 115 116 @param host: DUT object 117 @return: Returns lsusb output in list format 118 """ 119 120 usb_info_list = [] 121 # Getting the USB type and Serial number info using 'lsusb -v'. Sample 122 # output is shown in below 123 # Device Descriptor: 124 # bcdUSB 2.00 125 # iSerial 3 131BC7 126 # bcdUSB 2.00 127 # Device Descriptor: 128 # bcdUSB 2.10 129 # iSerial 3 001A4D5E8634B03169273995 130 131 lsusb_output = system_output(LSUSB_CMD, host) 132 # we are parsing each line and getting the usb info 133 for line in lsusb_output.splitlines(): 134 desc_matched = re.search(DESC_PATTERN, line) 135 bcdusb_matched = re.search(BCDUSB_PATTERN, line) 136 iserial_matched = re.search(ISERIAL_PATTERN, line) 137 if desc_matched: 138 usb_info = {} 139 elif bcdusb_matched: 140 # bcdUSB may appear multiple time. Drop the remaining. 141 usb_info['bcdUSB'] = bcdusb_matched.group(1) 142 elif iserial_matched: 143 usb_info['iSerial'] = iserial_matched.group(1) 144 usb_info_list.append(usb_info) 145 logging.debug('lsusb output is %s', usb_info_list) 146 return usb_info_list 147 148 149def get_usbdevice_type_and_serial(device, lsusb_info, host=None): 150 """Get USB device type and Serial number 151 152 @param device: USB device mount point Example: /dev/sda or /dev/sdb 153 @param lsusb_info: lsusb info 154 @param host: DUT object 155 @return: Returns the information about USB type and the serial number 156 of the device 157 """ 158 159 # Comparing the lsusb serial number with udev output serial number 160 # Both serial numbers should be same. Sample udev command output is 161 # shown in below. 162 # ATTRS{serial}=="001A4D5E8634B03169273995" 163 udev_serial_output = system_output(UDEV_CMD_FOR_SERIAL_NUMBER % device, 164 host) 165 udev_serial_matched = re.search(UDEV_SERIAL_PATTERN, udev_serial_output) 166 if udev_serial_matched: 167 udev_serial = udev_serial_matched.group(1) 168 logging.debug("udev serial number is %s", udev_serial) 169 for usb_details in lsusb_info: 170 if usb_details['iSerial'] == udev_serial: 171 return usb_details.get('bcdUSB'), udev_serial 172 return None, None 173 174def get_partition_info(part_path, bus, model, partid=None, fstype=None, 175 label=None, block_size=0, is_removable=False, 176 lsusb_info=[], host=None): 177 """Return information about a device as a list of dictionaries 178 179 Normally a single device described by the passed parameters will match a 180 single device on the system, and thus a single element list as return 181 value; although it's possible that a single block device is associated with 182 several mountpoints, this scenario will lead to a dictionary for each 183 mountpoint. 184 185 @param part_path: full partition path under |INFO_PATH| 186 e.g., /sys/block/sda or /sys/block/sda/sda1 187 @param bus: bus, e.g., 'usb' or 'ata', according to udev 188 @param model: device moduel, e.g., according to udev 189 @param partid: partition id, if present 190 @param fstype: filesystem type, if present 191 @param label: filesystem label, if present 192 @param block_size: filesystem block size 193 @param is_removable: whether it is a removable device 194 @param host: DUT object 195 @param lsusb_info: lsusb info 196 197 @return a list of dictionaries contaning each a partition info. 198 An empty list can be returned if no matching device is found 199 """ 200 ret = [] 201 # take the partitioned device name from the /sys/block/ path name 202 part = part_path.split('/')[-1] 203 device = "/dev/%s" % part 204 205 if not partid: 206 info = get_udev_info(device, "blkid", host=host) 207 partid = info.get('ID_FS_UUID', None) 208 if not fstype: 209 fstype = info.get('ID_FS_TYPE', None) 210 if not label: 211 label = partid 212 213 readonly = read_file("%s/ro" % part_path, host) 214 if not int(readonly): 215 partition_blocks = read_file("%s/size" % part_path, host) 216 size = block_size * int(partition_blocks) 217 218 stub = {} 219 stub['device'] = device 220 stub['bus'] = bus 221 stub['model'] = model 222 stub['size'] = size 223 224 # look for it among the mounted devices first 225 mounts = read_file("/proc/mounts", host).splitlines() 226 seen = False 227 for line in mounts: 228 dev, mount, proc_fstype, flags = line.split(' ', 3) 229 230 if device == dev: 231 if 'rw' in flags.split(','): 232 seen = True # at least one match occurred 233 234 # Sorround mountpoint with quotes, to make it parsable in 235 # case of spaces. Also information retrieved from 236 # /proc/mount override the udev passed ones (e.g., 237 # proc_fstype instead of fstype) 238 dev = stub.copy() 239 dev['fs_uuid'] = partid 240 dev['fstype'] = proc_fstype 241 dev['is_mounted'] = True 242 # When USB device is mounted automatically after login a 243 # non-labelled drive is mounted to: 244 # '/media/removable/USB Drive' 245 # Here an octal unicode '\040' is added to the path 246 # replacing ' ' (space). 247 # Following '.decode('unicode-escape')' handles the same 248 dev['mountpoint'] = seven.ensure_text(mount, 'unicode-escape') 249 dev['is_removable'] = is_removable 250 dev['usb_type'], dev['serial'] = \ 251 get_usbdevice_type_and_serial(dev['device'], 252 lsusb_info=lsusb_info, 253 host=host) 254 ret.append(dev) 255 256 # If not among mounted devices, it's just attached, print about the 257 # same information but suggest a place where the user can mount the 258 # device instead 259 if not seen: 260 # we consider it if it's removable and and a partition id 261 # OR it's on the USB bus or ATA bus. 262 # Some USB HD do not get announced as removable, but they should be 263 # showed. 264 # There are good changes that if it's on a USB bus it's removable 265 # and thus interesting for us, independently whether it's declared 266 # removable 267 if (is_removable and partid) or bus in ['usb', 'ata']: 268 if not label: 269 info = get_udev_info(device, 'blkid', host=host) 270 label = info.get('ID_FS_LABEL', partid) 271 272 dev = stub.copy() 273 dev['fs_uuid'] = partid 274 dev['fstype'] = fstype 275 dev['is_mounted'] = False 276 dev['mountpoint'] = "/media/removable/%s" % label 277 dev['is_removable'] = is_removable 278 dev['usb_type'], dev['serial'] = \ 279 get_usbdevice_type_and_serial(dev['device'], 280 lsusb_info=lsusb_info, 281 host=host) 282 ret.append(dev) 283 return ret 284 285 286def get_device_info(blockdev, lsusb_info, host=None): 287 """Retrieve information about |blockdev| 288 289 @see |get_partition_info()| doc for the dictionary format 290 291 @param blockdev: a block device name, e.g., "sda". 292 @param host: DUT object 293 @param lsusb_info: lsusb info 294 @return a list of dictionary, with each item representing a found device 295 """ 296 ret = [] 297 298 spath = "%s/%s" % (INFO_PATH, blockdev) 299 block_size = int(read_file("%s/queue/physical_block_size" % spath, 300 host)) 301 is_removable = bool(int(read_file("%s/removable" % spath, host))) 302 303 info = get_udev_info(blockdev, "udev", host=host) 304 dev_bus = info['ID_BUS'] 305 dev_model = info['ID_MODEL'] 306 dev_fs = info.get('ID_FS_TYPE', None) 307 dev_uuid = info.get('ID_FS_UUID', None) 308 dev_label = info.get('ID_FS_LABEL', dev_uuid) 309 310 has_partitions = False 311 for basename in system_output('ls %s' % spath, host).splitlines(): 312 partition_path = "%s/%s" % (spath, basename) 313 # we want to check if within |spath| there are subdevices with 314 # partitions 315 # e.g., if within /sys/block/sda sda1 and other partition are present 316 if not re.match("%s[0-9]+" % blockdev, basename): 317 continue # ignore what is not a subdevice 318 319 # |blockdev| has subdevices: get info for them 320 has_partitions = True 321 devs = get_partition_info(partition_path, dev_bus, dev_model, 322 block_size=block_size, 323 is_removable=is_removable, 324 lsusb_info=lsusb_info, host=host) 325 ret.extend(devs) 326 327 if not has_partitions: 328 devs = get_partition_info(spath, dev_bus, dev_model, dev_uuid, dev_fs, 329 dev_label, block_size=block_size, 330 is_removable=is_removable, 331 lsusb_info=lsusb_info, host=host) 332 ret.extend(devs) 333 334 return ret 335 336 337def get_all(host=None): 338 """Return all removable or USB storage devices attached 339 340 @param host: DUT object 341 @return a list of dictionaries, each list element describing a device 342 """ 343 ret = [] 344 lsusb_info = get_lsusb_info(host) 345 for dev in system_output('ls %s' % INFO_PATH, host).splitlines(): 346 # Among block devices we need to filter out what are virtual 347 if re.match("s[a-z]+", dev): 348 # for each of them try to obtain some info 349 ret.extend(get_device_info(dev, lsusb_info, host=host)) 350 return ret 351 352 353def main(): 354 for device in get_all(): 355 print("%(device)s %(bus)s %(model)s %(size)d %(fs_uuid)s %(fstype)s " 356 "%(is_mounted)d %(mountpoint)s %(usb_type)s %(serial)s" % 357 device) 358 359 360if __name__ == "__main__": 361 main() 362