xref: /aosp_15_r20/external/toolchain-utils/image_chromeos.py (revision 760c253c1ed00ce9abd48f8546f08516e57485fe)
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright 2019 The ChromiumOS Authors
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8"""Script to image a ChromeOS device.
9
10This script images a remote ChromeOS device with a specific image.
11"""
12
13
14__author__ = "[email protected] (Ahmad Sharif)"
15
16import argparse
17import filecmp
18import glob
19import os
20import re
21import shutil
22import sys
23import tempfile
24import time
25
26from cros_utils import command_executer
27from cros_utils import locks
28from cros_utils import logger
29from cros_utils import misc
30from cros_utils.file_utils import FileUtils
31
32
# Path of the checksum file on the remote device.  DoImage reads it with
# `cat` to decide whether a reimage is needed and rewrites it after flashing.
checksum_file = "/usr/local/osimage_checksum_file"
# Lock file path.  Not referenced by any code visible in this file.
lock_file = "/tmp/image_chromeos_lock/image_chromeos_lock"
35
36
def Usage(parser, message):
    """Print an error message and the parser's help text, then exit.

    Args:
        parser: Argument parser whose help text is printed.
        message: Description of what was wrong with the invocation.

    Note:
        The process exits with status 0 even though an error was reported.
    """
    print(f"ERROR: {message}")
    parser.print_help()
    sys.exit(0)
41
42
def CheckForCrosFlash(chromeos_root, remote, log_level):
    """Check that the remote device can import cherrypy and ctypes.

    Runs a Python one-liner on the device and reports a fatal error through
    the logger when the command returns 255 (treated as an ssh failure) or
    any other non-zero status (modules missing).

    Args:
        chromeos_root: ChromeOS checkout passed to the command executer.
        remote: Device on which the probe is run.
        log_level: Log level used to create the command executer.
    """
    executer = command_executer.GetCommandExecuter(log_level=log_level)

    # Probe the device for the modules this check requires.
    probe = "python -c 'import cherrypy, ctypes'"
    status = executer.CrosRunCommand(
        probe, chromeos_root=chromeos_root, machine=remote
    )

    log = logger.GetLogger()
    log.LogFatalIf(
        status == 255, f"Failed ssh to {remote} (for checking cherrypy)"
    )
    log.LogFatalIf(
        status != 0,
        f"Failed to find cherrypy or ctypes on '{remote}', "
        "cros flash cannot work.",
    )
59
60
def DisableCrosBeeps(chromeos_root, remote, log_level):
    """Best-effort attempt to silence chromebook beeps after reboots.

    Runs `futility gbb` on the device to set the GBB flags to 0x1.  A failure
    is only logged; it never aborts the caller.

    Args:
        chromeos_root: ChromeOS checkout passed to the command executer.
        remote: Device on which the command is run.
        log_level: Log level used to create the command executer.
    """
    executer = command_executer.GetCommandExecuter(log_level=log_level)
    log = logger.GetLogger()

    log.LogOutput("Trying to disable beeping.")
    status, stdout, _ = executer.CrosRunCommandWOutput(
        "/usr/bin/futility gbb --set --flash --flags=0x1",
        chromeos_root=chromeos_root,
        machine=remote,
    )
    if status == 0:
        return

    # Show what the tool printed so the failure can be diagnosed.
    log.LogOutput(stdout)
    log.LogOutput("Failed to disable beeps.")
74
75
def FindChromeOSImage(image_file, chromeos_root):
    """Translate an image path into the corresponding path inside the chroot.

    The input may point inside or outside the chroot; either way the result
    is a real/absolute path as seen from inside the chroot.

    Example input paths:
    /usr/local/google/home/uname/chromeos/out/tmp/my-test-images/image
    ~/chromiumos/src/build/images/board/latest/image
    /tmp/peppy-release/R67-1235.0.0/image

    Corresponding example output paths:
    /tmp/my-test-images/image
    /mnt/host/source/src/build/images/board/latest/image
    /tmp/peppy-release/R67-1235.0.0/image

    Args:
        image_file: Path to the image to translate.
        chromeos_root: ChromeOS checkout; prepended to sys.path and passed to
            chromite as the source path.

    Returns:
        Whatever chromite's path_util.ToChrootPath returns for the image.
    """
    # Put the checkout at the front of sys.path before importing chromite
    # (presumably chromite is only importable from there).
    sys.path.insert(0, chromeos_root)

    from chromite.lib import path_util

    chroot_path = path_util.ToChrootPath(
        image_file, source_path=chromeos_root
    )
    return chroot_path
98
99
def DoImage(argv):
    """Image a remote ChromeOS device, reflashing only when needed.

    For a local image file, the image's MD5 is compared with the checksum
    stored on the device in `checksum_file`.  The device is reflashed with
    `cros flash` only when the two differ; otherwise it is just rebooted.
    An image given as an "xbuddy://" URL is always flashed.  Unless --no_lock
    is passed, the remote is locked while this runs and released afterwards.

    Args:
        argv: Full argument vector; argv[0] is skipped and the rest is parsed.

    Returns:
        The last status stored in `ret`:
        - no reimage needed: status of reading the checksum from the device;
        - local image flashed: status of writing the checksum to the device;
        - xbuddy image flashed: 0 if the device came back up, 1 otherwise.

    Raises:
        RuntimeError: If the lock on the remote cannot be acquired.
    """

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-c",
        "--chromeos_root",
        dest="chromeos_root",
        help="Target directory for ChromeOS installation.",
    )
    parser.add_argument("-r", "--remote", dest="remote", help="Target device.")
    parser.add_argument(
        "-i", "--image", dest="image", help="Image binary file."
    )
    parser.add_argument(
        "-b", "--board", dest="board", help="Target board override."
    )
    parser.add_argument(
        "-f",
        "--force",
        dest="force",
        action="store_true",
        default=False,
        help="Force an image even if it is non-test.",
    )
    parser.add_argument(
        "-n",
        "--no_lock",
        dest="no_lock",
        default=False,
        action="store_true",
        help="Do not attempt to lock remote before imaging.  "
        "This option should only be used in cases where the "
        "exclusive lock has already been acquired (e.g. in "
        "a script that calls this one).",
    )
    parser.add_argument(
        "-l",
        "--logging_level",
        dest="log_level",
        default="verbose",
        help="Amount of logging to be used. Valid levels are "
        "'quiet', 'average', and 'verbose'.",
    )
    parser.add_argument("-a", "--image_args", dest="image_args")
    parser.add_argument(
        "--keep_stateful",
        dest="keep_stateful",
        default=False,
        action="store_true",
        help="Do not clobber the stateful partition.",
    )

    options = parser.parse_args(argv[1:])

    if options.log_level not in command_executer.LOG_LEVEL:
        Usage(parser, "--logging_level must be 'quiet', 'average' or 'verbose'")
    else:
        log_level = options.log_level

    # Common initializations
    cmd_executer = command_executer.GetCommandExecuter(log_level=log_level)
    l = logger.GetLogger()

    if options.chromeos_root is None:
        Usage(parser, "--chromeos_root must be set")

    if options.remote is None:
        Usage(parser, "--remote must be set")

    options.chromeos_root = os.path.expanduser(options.chromeos_root)

    if options.board is None:
        board = cmd_executer.CrosLearnBoard(
            options.chromeos_root, options.remote
        )
    else:
        board = options.board

    if options.image is None:
        # Default to the latest test image for the board, falling back to the
        # non-test image when no test image exists.
        images_dir = misc.GetImageDir(options.chromeos_root, board)
        image = os.path.join(images_dir, "latest", "chromiumos_test_image.bin")
        if not os.path.exists(image):
            image = os.path.join(images_dir, "latest", "chromiumos_image.bin")
        is_xbuddy_image = False
    else:
        image = options.image
        is_xbuddy_image = image.startswith("xbuddy://")
        if not is_xbuddy_image:
            image = os.path.expanduser(image)

    if not is_xbuddy_image:
        image = os.path.realpath(image)

    if not os.path.exists(image) and not is_xbuddy_image:
        Usage(parser, "Image file: " + image + " does not exist!")

    try:
        should_unlock = False
        if not options.no_lock:
            try:
                _ = locks.AcquireLock(
                    options.remote.split(), options.chromeos_root
                )
                should_unlock = True
            except Exception as e:
                # Chain the cause so the original traceback is not lost.
                raise RuntimeError(
                    "Error acquiring machine: %s" % str(e)
                ) from e

        reimage = False
        local_image = False
        if not is_xbuddy_image:
            local_image = True
            image_checksum = FileUtils().Md5File(image, log_level=log_level)

            # Read the checksum recorded on the device by a previous imaging.
            command = "cat " + checksum_file
            ret, device_checksum, _ = cmd_executer.CrosRunCommandWOutput(
                command,
                chromeos_root=options.chromeos_root,
                machine=options.remote,
            )

            device_checksum = device_checksum.strip()
            image_checksum = str(image_checksum)

            l.LogOutput("Image checksum: " + image_checksum)
            l.LogOutput("Device checksum: " + device_checksum)

            if image_checksum != device_checksum:
                # `found` is False when the image had to be copied into the
                # checkout; that copy is removed after a successful flash.
                [found, located_image] = LocateOrCopyImage(
                    options.chromeos_root, image, board=board
                )

                reimage = True
                l.LogOutput("Checksums do not match. Re-imaging...")

                chroot_image = FindChromeOSImage(
                    located_image, options.chromeos_root
                )

                is_test_image = IsImageModdedForTest(
                    options.chromeos_root, chroot_image, log_level
                )

                if not is_test_image and not options.force:
                    logger.GetLogger().LogFatal(
                        "Have to pass --force to image a " "non-test image!"
                    )
        else:
            reimage = True
            found = True
            l.LogOutput("Using non-local image; Re-imaging...")

        if reimage:
            # If the device has /tmp mounted as noexec, image_to_live.sh can fail.
            command = "mount -o remount,rw,exec /tmp"
            cmd_executer.CrosRunCommand(
                command,
                chromeos_root=options.chromeos_root,
                machine=options.remote,
            )

            # Check to see if cros flash will work for the remote machine.
            CheckForCrosFlash(options.chromeos_root, options.remote, log_level)

            # Disable the annoying chromebook beeps after reboot.
            DisableCrosBeeps(options.chromeos_root, options.remote, log_level)

            cros_flash_args = [
                "cros",
                "flash",
                "--board=%s" % board,
            ]
            if not options.keep_stateful:
                cros_flash_args.append("--clobber-stateful")
            # New arguments should be added here.

            # The last two arguments are positional and have to be at the end.
            cros_flash_args.append(options.remote)
            if local_image:
                cros_flash_args.append(chroot_image)
            else:
                cros_flash_args.append(image)

            command = " ".join(cros_flash_args)

            # Workaround for crosbug.com/35684.
            os.chmod(misc.GetChromeOSKeyFile(options.chromeos_root), 0o600)

            # At "average" level the flash itself is run verbosely; the level
            # is restored once the retry loop is done.
            if log_level == "average":
                cmd_executer.SetLogLevel("verbose")
            retries = 0
            # Up to three attempts in total (the first try plus two retries).
            while True:
                if log_level == "quiet":
                    l.LogOutput("CMD : %s" % command)
                ret = cmd_executer.ChrootRunCommand(
                    options.chromeos_root, command, command_timeout=1800
                )
                if ret == 0 or retries >= 2:
                    break
                retries += 1
                if log_level == "quiet":
                    l.LogOutput("Imaging failed. Retry # %d." % retries)

            if log_level == "average":
                cmd_executer.SetLogLevel(log_level)

            logger.GetLogger().LogFatalIf(ret, "Image command failed")

            # Unfortunately cros_image_to_target.py sometimes returns early when the
            # machine isn't fully up yet.
            ret = EnsureMachineUp(
                options.chromeos_root, options.remote, log_level
            )

            # If this is a non-local image, then the ret returned from
            # EnsureMachineUp is the one that will be returned by this function;
            # in that case, make sure the value in 'ret' is appropriate.
            # EnsureMachineUp returns True on success, so it is mapped to an
            # exit status here.  For a local image the value is overwritten
            # below by the status of the checksum write.
            if not local_image and ret:
                ret = 0
            else:
                ret = 1

            if local_image:
                if log_level == "average":
                    l.LogOutput("Verifying image.")
                # Record the new checksum on the device and make the file
                # read-only.
                command = "echo %s > %s && chmod -w %s" % (
                    image_checksum,
                    checksum_file,
                    checksum_file,
                )
                ret = cmd_executer.CrosRunCommand(
                    command,
                    chromeos_root=options.chromeos_root,
                    machine=options.remote,
                )
                logger.GetLogger().LogFatalIf(ret, "Writing checksum failed.")

                successfully_imaged = VerifyChromeChecksum(
                    options.chromeos_root,
                    chroot_image,
                    options.remote,
                    log_level,
                )
                logger.GetLogger().LogFatalIf(
                    not successfully_imaged, "Image verification failed!"
                )
                TryRemountPartitionAsRW(
                    options.chromeos_root, options.remote, log_level
                )

            if not found:
                # The image was copied into the checkout only for this run.
                temp_dir = os.path.dirname(located_image)
                l.LogOutput("Deleting temp image dir: %s" % temp_dir)
                shutil.rmtree(temp_dir)
            l.LogOutput("Image updated.")
        else:
            l.LogOutput("Checksums match, skip image update and reboot.")
            command = "reboot && exit"
            _ = cmd_executer.CrosRunCommand(
                command,
                chromeos_root=options.chromeos_root,
                machine=options.remote,
            )
            # Wait 30s after reboot.
            time.sleep(30)

    finally:
        # Release the lock only if this call acquired it.
        if should_unlock:
            locks.ReleaseLock(options.remote.split(), options.chromeos_root)

    return ret
373
374
def LocateOrCopyImage(chromeos_root, image, board=None):
    """Find `image` inside the checkout, copying it in when it is absent.

    Args:
        chromeos_root: ChromeOS checkout to search.
        image: Path to the image file.
        board: Board whose image directory is searched; all boards when None.

    Returns:
        A two-element list [found, path].  `found` is True when `path` is the
        image itself (already under the checkout) or an existing file under
        src/build/images that compares equal to it.  `found` is False when the
        image was copied into a new temporary directory there.
    """
    log = logger.GetLogger()

    root = os.path.realpath(chromeos_root)
    image = os.path.realpath(image)

    # Nothing to do when the image already lives inside the checkout.
    if image.startswith(root + "/"):
        return [True, image]

    # First search within the existing build dirs for any matching files.
    board_glob = "*" if board is None else board
    pattern = "%s/src/build/images/%s/*/*.bin" % (root, board_glob)
    for candidate in glob.glob(pattern):
        if filecmp.cmp(candidate, image):
            log.LogOutput(
                "Found matching image %s in chromeos_root." % candidate
            )
            return [True, candidate]

    # No match: copy the image into a fresh temp dir under the images dir.
    board_dir = "" if board is None else board
    base_dir = "%s/src/build/images/%s" % (root, board_dir)
    if not os.path.isdir(base_dir):
        os.makedirs(base_dir)
    scratch_dir = tempfile.mkdtemp(prefix="%s/tmp" % base_dir)
    copied_image = "%s/%s" % (scratch_dir, os.path.basename(image))
    log.LogOutput(
        "No matching image found. Copying %s to %s" % (image, copied_image)
    )
    shutil.copyfile(image, copied_image)
    return [False, copied_image]
414
415
def GetImageMountCommand(image, rootfs_mp, stateful_mp):
    """Build the shell command that mounts an image read-only.

    Args:
        image: Path to the image; split into directory and file name.
        rootfs_mp: Mount point for the rootfs partition.
        stateful_mp: Mount point for the stateful partition.

    Returns:
        A command string that changes to /mnt/host/source/src/scripts and
        runs mount_gpt_image.sh with the given image and mount points.
    """
    image_dir, image_file = os.path.split(image)
    pieces = [
        # No space after "&&": the command text must stay exactly as before.
        "cd /mnt/host/source/src/scripts &&./mount_gpt_image.sh",
        f"--from={image_dir}",
        f"--image={image_file}",
        "--safe",
        "--read_only",
        f"--rootfs_mountpt={rootfs_mp}",
        f"--stateful_mountpt={stateful_mp}",
    ]
    return " ".join(pieces)
428
429
def MountImage(
    chromeos_root,
    image,
    rootfs_mp,
    stateful_mp,
    log_level,
    unmount=False,
    extra_commands="",
):
    """Mount or unmount an image inside the chroot and return the output.

    Args:
        chromeos_root: ChromeOS checkout whose chroot runs the command.
        image: Path to the image, as seen from inside the chroot.
        rootfs_mp: Mount point for the rootfs partition.
        stateful_mp: Mount point for the stateful partition.
        log_level: Log level used to create the command executer.
        unmount: When true, "--unmount" is appended to the mount command.
        extra_commands: Shell text run after the mount command, joined to it
            with " ; ".

    Returns:
        The standard output of the combined command.  A non-zero status of
        that command is reported through the logger's LogFatalIf.
    """
    executer = command_executer.GetCommandExecuter(log_level=log_level)

    shell_command = GetImageMountCommand(image, rootfs_mp, stateful_mp)
    if unmount:
        shell_command += " --unmount"
    if extra_commands:
        shell_command = " ; ".join((shell_command, extra_commands))

    status, stdout, _ = executer.ChrootRunCommandWOutput(
        chromeos_root, shell_command
    )
    logger.GetLogger().LogFatalIf(status, "Mount/unmount command failed!")
    return stdout
448
449
def IsImageModdedForTest(chromeos_root, image, log_level):
    """Report whether an image looks like a test image.

    Mounts the image in the chroot, greps CHROMEOS_RELEASE_TRACK in its
    etc/lsb-release for "test", then unmounts the image again.

    Args:
        chromeos_root: ChromeOS checkout whose chroot is used.
        image: Path to the image, as seen from inside the chroot.
        log_level: Requested log level; anything but "verbose" is treated as
            "quiet" for the commands run here.

    Returns:
        The regex match object (truthy) when "test" occurs in the command
        output, None otherwise.
    """
    if log_level != "verbose":
        log_level = "quiet"
    command = "mktemp -d"
    cmd_executer = command_executer.GetCommandExecuter(log_level=log_level)
    # Two scratch directories in the chroot serve as the mount points.
    _, rootfs_mp, _ = cmd_executer.ChrootRunCommandWOutput(
        chromeos_root, command
    )
    _, stateful_mp, _ = cmd_executer.ChrootRunCommandWOutput(
        chromeos_root, command
    )
    rootfs_mp = rootfs_mp.strip()
    stateful_mp = stateful_mp.strip()
    lsb_release_file = os.path.join(rootfs_mp, "etc/lsb-release")
    # grep exits non-zero when nothing matches, and MountImage hands the
    # status of the combined command to LogFatalIf.  Without `|| true` a
    # non-test image would be reported as a mount failure instead of simply
    # yielding no match, and the unmount below would be skipped.
    extra = (
        "grep CHROMEOS_RELEASE_TRACK %s | grep -i test || true"
        % lsb_release_file
    )
    output = MountImage(
        chromeos_root,
        image,
        rootfs_mp,
        stateful_mp,
        log_level,
        extra_commands=extra,
    )
    is_test_image = re.search("test", output, re.IGNORECASE)
    MountImage(
        chromeos_root, image, rootfs_mp, stateful_mp, log_level, unmount=True
    )
    return is_test_image
478
479
def VerifyChromeChecksum(chromeos_root, image, remote, log_level):
    """Compare the Chrome binary's MD5 in the image with the one on the device.

    Args:
        chromeos_root: ChromeOS checkout whose chroot is used.
        image: Path to the image, as seen from inside the chroot.
        remote: Device whose /opt/google/chrome/chrome is checksummed.
        log_level: Log level used for the commands run here.

    Returns:
        True when both checksums were obtained and are equal.  False when
        they differ or when either command produced no output.
    """
    command = "mktemp -d"
    cmd_executer = command_executer.GetCommandExecuter(log_level=log_level)
    # Two scratch directories in the chroot serve as the mount points.
    _, rootfs_mp, _ = cmd_executer.ChrootRunCommandWOutput(
        chromeos_root, command
    )
    _, stateful_mp, _ = cmd_executer.ChrootRunCommandWOutput(
        chromeos_root, command
    )
    rootfs_mp = rootfs_mp.strip()
    stateful_mp = stateful_mp.strip()
    chrome_file = "%s/opt/google/chrome/chrome" % rootfs_mp
    extra = "md5sum %s" % chrome_file
    out = MountImage(
        chromeos_root,
        image,
        rootfs_mp,
        stateful_mp,
        log_level,
        extra_commands=extra,
    )
    # Unmount before parsing so a parsing problem cannot leave the image
    # mounted.
    MountImage(
        chromeos_root, image, rootfs_mp, stateful_mp, log_level, unmount=True
    )

    command = "md5sum /opt/google/chrome/chrome"
    [_, o, _] = cmd_executer.CrosRunCommandWOutput(
        command, chromeos_root=chromeos_root, machine=remote
    )

    # md5sum prints "<checksum>  <file>"; the first field is the checksum.
    image_fields = out.split()
    device_fields = o.split()
    if not image_fields or not device_fields:
        # The device command's status is not checked, so its output may be
        # empty; treat that as a failed verification instead of raising
        # IndexError.
        logger.GetLogger().LogError(
            "Could not read the Chrome checksum from the image or the device."
        )
        return False
    return image_fields[0] == device_fields[0]
512
513
514# Remount partition as writable.
515# TODO: auto-detect if an image is built using --noenable_rootfs_verification.
def TryRemountPartitionAsRW(chromeos_root, remote, log_level):
    """Try to remount the device's root partition read-write.

    A failure only produces a warning; the caller is never interrupted.

    Args:
        chromeos_root: ChromeOS checkout passed to the command executer.
        remote: Device on which the remount is attempted.
        log_level: Log level used to create the command executer.
    """
    log = logger.GetLogger()
    executer = command_executer.GetCommandExecuter(log_level=log_level)

    status = executer.CrosRunCommand(
        "sudo mount -o remount,rw /",
        chromeos_root=chromeos_root,
        machine=remote,
        terminated_timeout=10,
    )
    if not status:
        log.LogOutput("Re-mounted partition as writable.")
        return

    # Deliberately not fatal: see the warning text.
    log.LogWarning(
        "Failed to remount partition as rw, "
        "probably the image was not built with "
        '"--noenable_rootfs_verification", '
        "you can safely ignore this."
    )
536
537
def EnsureMachineUp(chromeos_root, remote, log_level):
    """Keep running a trivial command on the device until it succeeds.

    Args:
        chromeos_root: ChromeOS checkout passed to the command executer.
        remote: Device to probe.
        log_level: Log level used to create the command executer.

    Returns:
        True as soon as the probe command returns 0.  False once more than
        600 seconds have elapsed without a success.
    """
    log = logger.GetLogger()
    executer = command_executer.GetCommandExecuter(log_level=log_level)

    timeout = 600
    magic = "abcdefghijklmnopqrstuvwxyz"
    probe = "echo %s" % magic

    began = time.time()
    # The deadline is checked before every attempt, never during one.
    while time.time() - began <= timeout:
        status = executer.CrosRunCommand(
            probe, chromeos_root=chromeos_root, machine=remote
        )
        if not status:
            return True

    log.LogError(
        "Timeout of %ss reached. Machine still not up. Aborting." % timeout
    )
    return False
557            return True
558
559
if __name__ == "__main__":
    # Use DoImage's return value as the process exit status.
    sys.exit(DoImage(sys.argv))
562    sys.exit(retval)
563