xref: /aosp_15_r20/external/autotest/client/cros/liststorage.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1#!/usr/bin/python3
2
3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""This is a module to scan /sys/block/ virtual FS, query udev
8
9It provides a list of all removable or USB devices connected to the machine on
10which the module is running.
11It can be used from command line or from a python script.
12
13To use it as python module it's enough to call the get_all() function.
14@see |get_all| documentation for the output format
15|get_all()| output is human readable (as oppposite to python's data structures)
16"""
17
18from __future__ import absolute_import
19from __future__ import division
20from __future__ import print_function
21
22import logging, os, re
23import six
24
25# this script can be run at command line on DUT (ie /usr/local/autotest
26# contains only the client/ subtree), on a normal autotest
27# installation/repository or as a python module used on a client-side test.
28import common
29
30from autotest_lib.client.common_lib import error
31from autotest_lib.client.common_lib import utils
32from autotest_lib.client.common_lib import seven
33
34
35INFO_PATH = "/sys/block"
36UDEV_CMD_FOR_SERIAL_NUMBER = "udevadm info -a -n %s | grep -iE 'ATTRS{" \
37                             "serial}' | head -n 1"
38LSUSB_CMD = "lsusb -v | grep -iE '^Device Desc|bcdUSB|iSerial'"
39DESC_PATTERN = r'Device Descriptor:'
40BCDUSB_PATTERN = r'bcdUSB\s+(\d+\.\d+)'
41ISERIAL_PATTERN = r'iSerial\s+\d\s(\S*)'
42UDEV_SERIAL_PATTERN = r'=="(.*)"'
43
44
45def read_file(path_to_file, host=None):
46    """Reads the file and returns the file content
47    @param path_to_file: Full path to the file
48    @param host: DUT object
49    @return: Returns the content of file
50    """
51    if host:
52        if not host.path_exists(path_to_file):
53            raise error.TestError("No such file or directory %s" % path_to_file)
54        return host.run('cat %s' % path_to_file).stdout.strip()
55
56    if not os.path.isfile(path_to_file):
57        raise error.TestError("No such file or directory %s" % path_to_file)
58    return utils.read_file(path_to_file).strip()
59
60
61def system_output(command, host=None, ignore_status=False):
62    """Executes command on client
63
64    @param host: DUT object
65    @param command: command to execute
66    @return: output of command
67    """
68    if host:
69        return host.run(command, ignore_status=ignore_status).stdout.strip()
70
71    return utils.system_output(command, ignore_status=ignore_status)
72
73
74def get_udev_info(blockdev, method='udev', host=None):
75    """Get information about |blockdev|
76
77    @param blockdev: a block device, e.g., /dev/sda1 or /dev/sda
78    @param method: either 'udev' (default) or 'blkid'
79    @param host: DUT object
80
81    @return a dictionary with two or more of the followig keys:
82        "ID_BUS", "ID_MODEL": always present
83        "ID_FS_UUID", "ID_FS_TYPE", "ID_FS_LABEL": present only if those info
84         are meaningul and present for the queried device
85    """
86    ret = {}
87    cmd = None
88    ignore_status = False
89
90    if method == "udev":
91        cmd = "udevadm info --name %s --query=property" % blockdev
92    elif method == "blkid":
93        # this script is run as root in a normal autotest run,
94        # so this works: It doesn't have access to the necessary info
95        # when run as a non-privileged user
96        cmd = "blkid -c /dev/null -o udev %s" % blockdev
97        ignore_status = True
98
99    if cmd:
100        output = system_output(cmd, host, ignore_status=ignore_status)
101
102        udev_keys = ("ID_BUS", "ID_MODEL", "ID_FS_UUID", "ID_FS_TYPE",
103                     "ID_FS_LABEL")
104        for line in output.splitlines():
105            udev_key, udev_val = line.split('=')
106
107            if udev_key in udev_keys:
108                ret[udev_key] = udev_val
109
110    return ret
111
112
113def get_lsusb_info(host=None):
114    """Get lsusb info in list format
115
116    @param host: DUT object
117    @return: Returns lsusb output in list format
118    """
119
120    usb_info_list = []
121    # Getting the USB type and Serial number info using 'lsusb -v'. Sample
122    # output is shown in below
123    # Device Descriptor:
124    #      bcdUSB               2.00
125    #      iSerial                 3 131BC7
126    #      bcdUSB               2.00
127    # Device Descriptor:
128    #      bcdUSB               2.10
129    #      iSerial                 3 001A4D5E8634B03169273995
130
131    lsusb_output = system_output(LSUSB_CMD, host)
132    # we are parsing each line and getting the usb info
133    for line in lsusb_output.splitlines():
134        desc_matched = re.search(DESC_PATTERN, line)
135        bcdusb_matched = re.search(BCDUSB_PATTERN, line)
136        iserial_matched = re.search(ISERIAL_PATTERN, line)
137        if desc_matched:
138            usb_info = {}
139        elif bcdusb_matched:
140            # bcdUSB may appear multiple time. Drop the remaining.
141            usb_info['bcdUSB'] = bcdusb_matched.group(1)
142        elif iserial_matched:
143            usb_info['iSerial'] = iserial_matched.group(1)
144            usb_info_list.append(usb_info)
145    logging.debug('lsusb output is %s', usb_info_list)
146    return usb_info_list
147
148
149def get_usbdevice_type_and_serial(device, lsusb_info, host=None):
150    """Get USB device type and Serial number
151
152    @param device: USB device mount point Example: /dev/sda or /dev/sdb
153    @param lsusb_info: lsusb info
154    @param host: DUT object
155    @return: Returns the information about USB type and the serial number
156            of the device
157    """
158
159    # Comparing the lsusb serial number with udev output serial number
160    # Both serial numbers should be same. Sample udev command output is
161    # shown in below.
162    # ATTRS{serial}=="001A4D5E8634B03169273995"
163    udev_serial_output = system_output(UDEV_CMD_FOR_SERIAL_NUMBER % device,
164                                       host)
165    udev_serial_matched = re.search(UDEV_SERIAL_PATTERN, udev_serial_output)
166    if udev_serial_matched:
167        udev_serial = udev_serial_matched.group(1)
168        logging.debug("udev serial number is %s", udev_serial)
169        for usb_details in lsusb_info:
170            if usb_details['iSerial'] == udev_serial:
171                return usb_details.get('bcdUSB'), udev_serial
172    return None, None
173
174def get_partition_info(part_path, bus, model, partid=None, fstype=None,
175                       label=None, block_size=0, is_removable=False,
176                       lsusb_info=[], host=None):
177    """Return information about a device as a list of dictionaries
178
179    Normally a single device described by the passed parameters will match a
180    single device on the system, and thus a single element list as return
181    value; although it's possible that a single block device is associated with
182    several mountpoints, this scenario will lead to a dictionary for each
183    mountpoint.
184
185    @param part_path: full partition path under |INFO_PATH|
186                      e.g., /sys/block/sda or /sys/block/sda/sda1
187    @param bus: bus, e.g., 'usb' or 'ata', according to udev
188    @param model: device moduel, e.g., according to udev
189    @param partid: partition id, if present
190    @param fstype: filesystem type, if present
191    @param label: filesystem label, if present
192    @param block_size: filesystem block size
193    @param is_removable: whether it is a removable device
194    @param host: DUT object
195    @param lsusb_info: lsusb info
196
197    @return a list of dictionaries contaning each a partition info.
198            An empty list can be returned if no matching device is found
199    """
200    ret = []
201    # take the partitioned device name from the /sys/block/ path name
202    part = part_path.split('/')[-1]
203    device = "/dev/%s" % part
204
205    if not partid:
206        info = get_udev_info(device, "blkid", host=host)
207        partid = info.get('ID_FS_UUID', None)
208        if not fstype:
209            fstype = info.get('ID_FS_TYPE', None)
210        if not label:
211            label = partid
212
213    readonly = read_file("%s/ro" % part_path, host)
214    if not int(readonly):
215        partition_blocks = read_file("%s/size" % part_path, host)
216        size = block_size * int(partition_blocks)
217
218        stub = {}
219        stub['device'] = device
220        stub['bus'] = bus
221        stub['model'] = model
222        stub['size'] = size
223
224        # look for it among the mounted devices first
225        mounts = read_file("/proc/mounts", host).splitlines()
226        seen = False
227        for line in mounts:
228            dev, mount, proc_fstype, flags = line.split(' ', 3)
229
230            if device == dev:
231                if 'rw' in flags.split(','):
232                    seen = True # at least one match occurred
233
234                    # Sorround mountpoint with quotes, to make it parsable in
235                    # case of spaces. Also information retrieved from
236                    # /proc/mount override the udev passed ones (e.g.,
237                    # proc_fstype instead of fstype)
238                    dev = stub.copy()
239                    dev['fs_uuid'] = partid
240                    dev['fstype'] = proc_fstype
241                    dev['is_mounted'] = True
242                    # When USB device is mounted automatically after login a
243                    # non-labelled drive is mounted to:
244                    # '/media/removable/USB Drive'
245                    # Here an octal unicode '\040' is added to the path
246                    # replacing ' ' (space).
247                    # Following '.decode('unicode-escape')' handles the same
248                    dev['mountpoint'] = seven.ensure_text(mount, 'unicode-escape')
249                    dev['is_removable'] = is_removable
250                    dev['usb_type'], dev['serial'] = \
251                            get_usbdevice_type_and_serial(dev['device'],
252                                                          lsusb_info=lsusb_info,
253                                                          host=host)
254                    ret.append(dev)
255
256        # If not among mounted devices, it's just attached, print about the
257        # same information but suggest a place where the user can mount the
258        # device instead
259        if not seen:
260            # we consider it if it's removable and and a partition id
261            # OR it's on the USB bus or ATA bus.
262            # Some USB HD do not get announced as removable, but they should be
263            # showed.
264            # There are good changes that if it's on a USB bus it's removable
265            # and thus interesting for us, independently whether it's declared
266            # removable
267            if (is_removable and partid) or bus in ['usb', 'ata']:
268                if not label:
269                    info = get_udev_info(device, 'blkid', host=host)
270                    label = info.get('ID_FS_LABEL', partid)
271
272                dev = stub.copy()
273                dev['fs_uuid'] = partid
274                dev['fstype'] = fstype
275                dev['is_mounted'] = False
276                dev['mountpoint'] = "/media/removable/%s" % label
277                dev['is_removable'] = is_removable
278                dev['usb_type'], dev['serial'] = \
279                        get_usbdevice_type_and_serial(dev['device'],
280                                                      lsusb_info=lsusb_info,
281                                                      host=host)
282                ret.append(dev)
283        return ret
284
285
286def get_device_info(blockdev, lsusb_info, host=None):
287    """Retrieve information about |blockdev|
288
289    @see |get_partition_info()| doc for the dictionary format
290
291    @param blockdev: a block device name, e.g., "sda".
292    @param host: DUT object
293    @param lsusb_info: lsusb info
294    @return a list of dictionary, with each item representing a found device
295    """
296    ret = []
297
298    spath = "%s/%s" % (INFO_PATH, blockdev)
299    block_size = int(read_file("%s/queue/physical_block_size" % spath,
300                                   host))
301    is_removable = bool(int(read_file("%s/removable" % spath, host)))
302
303    info = get_udev_info(blockdev, "udev", host=host)
304    dev_bus = info['ID_BUS']
305    dev_model = info['ID_MODEL']
306    dev_fs = info.get('ID_FS_TYPE', None)
307    dev_uuid = info.get('ID_FS_UUID', None)
308    dev_label = info.get('ID_FS_LABEL', dev_uuid)
309
310    has_partitions = False
311    for basename in system_output('ls %s' % spath, host).splitlines():
312        partition_path = "%s/%s" % (spath, basename)
313        # we want to check if within |spath| there are subdevices with
314        # partitions
315        # e.g., if within /sys/block/sda sda1 and other partition are present
316        if not re.match("%s[0-9]+" % blockdev, basename):
317            continue # ignore what is not a subdevice
318
319        # |blockdev| has subdevices: get info for them
320        has_partitions = True
321        devs = get_partition_info(partition_path, dev_bus, dev_model,
322                                  block_size=block_size,
323                                  is_removable=is_removable,
324                                  lsusb_info=lsusb_info, host=host)
325        ret.extend(devs)
326
327    if not has_partitions:
328        devs = get_partition_info(spath, dev_bus, dev_model, dev_uuid, dev_fs,
329                                  dev_label, block_size=block_size,
330                                  is_removable=is_removable,
331                                  lsusb_info=lsusb_info, host=host)
332        ret.extend(devs)
333
334    return ret
335
336
337def get_all(host=None):
338    """Return all removable or USB storage devices attached
339
340    @param host: DUT object
341    @return a list of dictionaries, each list element describing a device
342    """
343    ret = []
344    lsusb_info = get_lsusb_info(host)
345    for dev in system_output('ls %s' % INFO_PATH, host).splitlines():
346        # Among block devices we need to filter out what are virtual
347        if re.match("s[a-z]+", dev):
348            # for each of them try to obtain some info
349            ret.extend(get_device_info(dev, lsusb_info, host=host))
350    return ret
351
352
353def main():
354    for device in get_all():
355        print("%(device)s %(bus)s %(model)s %(size)d %(fs_uuid)s %(fstype)s "
356              "%(is_mounted)d %(mountpoint)s %(usb_type)s %(serial)s" %
357              device)
358
359
360if __name__ == "__main__":
361    main()
362