1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright 2019 The ChromiumOS Authors 5# Use of this source code is governed by a BSD-style license that can be 6# found in the LICENSE file. 7 8"""Script to image a ChromeOS device. 9 10This script images a remote ChromeOS device with a specific image." 11""" 12 13 14__author__ = "[email protected] (Ahmad Sharif)" 15 16import argparse 17import filecmp 18import glob 19import os 20import re 21import shutil 22import sys 23import tempfile 24import time 25 26from cros_utils import command_executer 27from cros_utils import locks 28from cros_utils import logger 29from cros_utils import misc 30from cros_utils.file_utils import FileUtils 31 32 33checksum_file = "/usr/local/osimage_checksum_file" 34lock_file = "/tmp/image_chromeos_lock/image_chromeos_lock" 35 36 37def Usage(parser, message): 38 print("ERROR: %s" % message) 39 parser.print_help() 40 sys.exit(0) 41 42 43def CheckForCrosFlash(chromeos_root, remote, log_level): 44 cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) 45 46 # Check to see if remote machine has cherrypy, ctypes 47 command = "python -c 'import cherrypy, ctypes'" 48 ret = cmd_executer.CrosRunCommand( 49 command, chromeos_root=chromeos_root, machine=remote 50 ) 51 logger.GetLogger().LogFatalIf( 52 ret == 255, f"Failed ssh to {remote} (for checking cherrypy)" 53 ) 54 logger.GetLogger().LogFatalIf( 55 ret != 0, 56 f"Failed to find cherrypy or ctypes on '{remote}', " 57 "cros flash cannot work.", 58 ) 59 60 61def DisableCrosBeeps(chromeos_root, remote, log_level): 62 """Disable annoying chromebooks beeps after reboots.""" 63 cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) 64 65 command = "/usr/bin/futility gbb --set --flash --flags=0x1" 66 logger.GetLogger().LogOutput("Trying to disable beeping.") 67 68 ret, o, _ = cmd_executer.CrosRunCommandWOutput( 69 command, chromeos_root=chromeos_root, machine=remote 70 ) 71 if ret != 0: 72 logger.GetLogger().LogOutput(o) 73 logger.GetLogger().LogOutput("Failed to disable beeps.") 74 75 76def FindChromeOSImage(image_file, chromeos_root): 77 """Find path for ChromeOS image inside chroot. 78 79 This function could be called with image paths that are either inside 80 or outside the chroot. In either case the path needs to be translated 81 to an real/absolute path inside the chroot. 82 Example input paths: 83 /usr/local/google/home/uname/chromeos/out/tmp/my-test-images/image 84 ~/chromiumos/src/build/images/board/latest/image 85 /tmp/peppy-release/R67-1235.0.0/image 86 87 Corresponding example output paths: 88 /tmp/my-test-images/image 89 /mnt/host/source/src/build/images/board/latest/image 90 /tmp/peppy-release/R67-1235.0,0/image 91 """ 92 93 sys.path.insert(0, chromeos_root) 94 95 from chromite.lib import path_util 96 97 return path_util.ToChrootPath(image_file, source_path=chromeos_root) 98 99 100def DoImage(argv): 101 """Image ChromeOS.""" 102 103 parser = argparse.ArgumentParser() 104 parser.add_argument( 105 "-c", 106 "--chromeos_root", 107 dest="chromeos_root", 108 help="Target directory for ChromeOS installation.", 109 ) 110 parser.add_argument("-r", "--remote", dest="remote", help="Target device.") 111 parser.add_argument( 112 "-i", "--image", dest="image", help="Image binary file." 113 ) 114 parser.add_argument( 115 "-b", "--board", dest="board", help="Target board override." 116 ) 117 parser.add_argument( 118 "-f", 119 "--force", 120 dest="force", 121 action="store_true", 122 default=False, 123 help="Force an image even if it is non-test.", 124 ) 125 parser.add_argument( 126 "-n", 127 "--no_lock", 128 dest="no_lock", 129 default=False, 130 action="store_true", 131 help="Do not attempt to lock remote before imaging. " 132 "This option should only be used in cases where the " 133 "exclusive lock has already been acquired (e.g. in " 134 "a script that calls this one).", 135 ) 136 parser.add_argument( 137 "-l", 138 "--logging_level", 139 dest="log_level", 140 default="verbose", 141 help="Amount of logging to be used. Valid levels are " 142 "'quiet', 'average', and 'verbose'.", 143 ) 144 parser.add_argument("-a", "--image_args", dest="image_args") 145 parser.add_argument( 146 "--keep_stateful", 147 dest="keep_stateful", 148 default=False, 149 action="store_true", 150 help="Do not clobber the stateful partition.", 151 ) 152 153 options = parser.parse_args(argv[1:]) 154 155 if not options.log_level in command_executer.LOG_LEVEL: 156 Usage(parser, "--logging_level must be 'quiet', 'average' or 'verbose'") 157 else: 158 log_level = options.log_level 159 160 # Common initializations 161 cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) 162 l = logger.GetLogger() 163 164 if options.chromeos_root is None: 165 Usage(parser, "--chromeos_root must be set") 166 167 if options.remote is None: 168 Usage(parser, "--remote must be set") 169 170 options.chromeos_root = os.path.expanduser(options.chromeos_root) 171 172 if options.board is None: 173 board = cmd_executer.CrosLearnBoard( 174 options.chromeos_root, options.remote 175 ) 176 else: 177 board = options.board 178 179 if options.image is None: 180 images_dir = misc.GetImageDir(options.chromeos_root, board) 181 image = os.path.join(images_dir, "latest", "chromiumos_test_image.bin") 182 if not os.path.exists(image): 183 image = os.path.join(images_dir, "latest", "chromiumos_image.bin") 184 is_xbuddy_image = False 185 else: 186 image = options.image 187 is_xbuddy_image = image.startswith("xbuddy://") 188 if not is_xbuddy_image: 189 image = os.path.expanduser(image) 190 191 if not is_xbuddy_image: 192 image = os.path.realpath(image) 193 194 if not os.path.exists(image) and not is_xbuddy_image: 195 Usage(parser, "Image file: " + image + " does not exist!") 196 197 try: 198 should_unlock = False 199 if not options.no_lock: 200 try: 201 _ = locks.AcquireLock( 202 list(options.remote.split()), options.chromeos_root 203 ) 204 should_unlock = True 205 except Exception as e: 206 raise RuntimeError("Error acquiring machine: %s" % str(e)) 207 208 reimage = False 209 local_image = False 210 if not is_xbuddy_image: 211 local_image = True 212 image_checksum = FileUtils().Md5File(image, log_level=log_level) 213 214 command = "cat " + checksum_file 215 ret, device_checksum, _ = cmd_executer.CrosRunCommandWOutput( 216 command, 217 chromeos_root=options.chromeos_root, 218 machine=options.remote, 219 ) 220 221 device_checksum = device_checksum.strip() 222 image_checksum = str(image_checksum) 223 224 l.LogOutput("Image checksum: " + image_checksum) 225 l.LogOutput("Device checksum: " + device_checksum) 226 227 if image_checksum != device_checksum: 228 [found, located_image] = LocateOrCopyImage( 229 options.chromeos_root, image, board=board 230 ) 231 232 reimage = True 233 l.LogOutput("Checksums do not match. Re-imaging...") 234 235 chroot_image = FindChromeOSImage( 236 located_image, options.chromeos_root 237 ) 238 239 is_test_image = IsImageModdedForTest( 240 options.chromeos_root, chroot_image, log_level 241 ) 242 243 if not is_test_image and not options.force: 244 logger.GetLogger().LogFatal( 245 "Have to pass --force to image a " "non-test image!" 246 ) 247 else: 248 reimage = True 249 found = True 250 l.LogOutput("Using non-local image; Re-imaging...") 251 252 if reimage: 253 # If the device has /tmp mounted as noexec, image_to_live.sh can fail. 254 command = "mount -o remount,rw,exec /tmp" 255 cmd_executer.CrosRunCommand( 256 command, 257 chromeos_root=options.chromeos_root, 258 machine=options.remote, 259 ) 260 261 # Check to see if cros flash will work for the remote machine. 262 CheckForCrosFlash(options.chromeos_root, options.remote, log_level) 263 264 # Disable the annoying chromebook beeps after reboot. 265 DisableCrosBeeps(options.chromeos_root, options.remote, log_level) 266 267 cros_flash_args = [ 268 "cros", 269 "flash", 270 "--board=%s" % board, 271 ] 272 if not options.keep_stateful: 273 cros_flash_args.append("--clobber-stateful") 274 # New arguments should be added here. 275 276 # The last two arguments are positional and have to be at the end. 277 cros_flash_args.append(options.remote) 278 if local_image: 279 cros_flash_args.append(chroot_image) 280 else: 281 cros_flash_args.append(image) 282 283 command = " ".join(cros_flash_args) 284 285 # Workaround for crosbug.com/35684. 286 os.chmod(misc.GetChromeOSKeyFile(options.chromeos_root), 0o600) 287 288 if log_level == "average": 289 cmd_executer.SetLogLevel("verbose") 290 retries = 0 291 while True: 292 if log_level == "quiet": 293 l.LogOutput("CMD : %s" % command) 294 ret = cmd_executer.ChrootRunCommand( 295 options.chromeos_root, command, command_timeout=1800 296 ) 297 if ret == 0 or retries >= 2: 298 break 299 retries += 1 300 if log_level == "quiet": 301 l.LogOutput("Imaging failed. Retry # %d." % retries) 302 303 if log_level == "average": 304 cmd_executer.SetLogLevel(log_level) 305 306 logger.GetLogger().LogFatalIf(ret, "Image command failed") 307 308 # Unfortunately cros_image_to_target.py sometimes returns early when the 309 # machine isn't fully up yet. 310 ret = EnsureMachineUp( 311 options.chromeos_root, options.remote, log_level 312 ) 313 314 # If this is a non-local image, then the ret returned from 315 # EnsureMachineUp is the one that will be returned by this function; 316 # in that case, make sure the value in 'ret' is appropriate. 317 if not local_image and ret: 318 ret = 0 319 else: 320 ret = 1 321 322 if local_image: 323 if log_level == "average": 324 l.LogOutput("Verifying image.") 325 command = "echo %s > %s && chmod -w %s" % ( 326 image_checksum, 327 checksum_file, 328 checksum_file, 329 ) 330 ret = cmd_executer.CrosRunCommand( 331 command, 332 chromeos_root=options.chromeos_root, 333 machine=options.remote, 334 ) 335 logger.GetLogger().LogFatalIf(ret, "Writing checksum failed.") 336 337 successfully_imaged = VerifyChromeChecksum( 338 options.chromeos_root, 339 chroot_image, 340 options.remote, 341 log_level, 342 ) 343 logger.GetLogger().LogFatalIf( 344 not successfully_imaged, "Image verification failed!" 345 ) 346 TryRemountPartitionAsRW( 347 options.chromeos_root, options.remote, log_level 348 ) 349 350 if not found: 351 temp_dir = os.path.dirname(located_image) 352 l.LogOutput("Deleting temp image dir: %s" % temp_dir) 353 shutil.rmtree(temp_dir) 354 l.LogOutput("Image updated.") 355 else: 356 l.LogOutput("Checksums match, skip image update and reboot.") 357 command = "reboot && exit" 358 _ = cmd_executer.CrosRunCommand( 359 command, 360 chromeos_root=options.chromeos_root, 361 machine=options.remote, 362 ) 363 # Wait 30s after reboot. 364 time.sleep(30) 365 366 finally: 367 if should_unlock: 368 locks.ReleaseLock( 369 list(options.remote.split()), options.chromeos_root 370 ) 371 372 return ret 373 374 375def LocateOrCopyImage(chromeos_root, image, board=None): 376 l = logger.GetLogger() 377 if board is None: 378 board_glob = "*" 379 else: 380 board_glob = board 381 382 chromeos_root_realpath = os.path.realpath(chromeos_root) 383 image = os.path.realpath(image) 384 385 if image.startswith("%s/" % chromeos_root_realpath): 386 return [True, image] 387 388 # First search within the existing build dirs for any matching files. 389 images_glob = "%s/src/build/images/%s/*/*.bin" % ( 390 chromeos_root_realpath, 391 board_glob, 392 ) 393 images_list = glob.glob(images_glob) 394 for potential_image in images_list: 395 if filecmp.cmp(potential_image, image): 396 l.LogOutput( 397 "Found matching image %s in chromeos_root." % potential_image 398 ) 399 return [True, potential_image] 400 # We did not find an image. Copy it in the src dir and return the copied 401 # file. 402 if board is None: 403 board = "" 404 base_dir = "%s/src/build/images/%s" % (chromeos_root_realpath, board) 405 if not os.path.isdir(base_dir): 406 os.makedirs(base_dir) 407 temp_dir = tempfile.mkdtemp(prefix="%s/tmp" % base_dir) 408 new_image = "%s/%s" % (temp_dir, os.path.basename(image)) 409 l.LogOutput( 410 "No matching image found. Copying %s to %s" % (image, new_image) 411 ) 412 shutil.copyfile(image, new_image) 413 return [False, new_image] 414 415 416def GetImageMountCommand(image, rootfs_mp, stateful_mp): 417 image_dir = os.path.dirname(image) 418 image_file = os.path.basename(image) 419 mount_command = ( 420 "cd /mnt/host/source/src/scripts &&" 421 "./mount_gpt_image.sh --from=%s --image=%s" 422 " --safe --read_only" 423 " --rootfs_mountpt=%s" 424 " --stateful_mountpt=%s" 425 % (image_dir, image_file, rootfs_mp, stateful_mp) 426 ) 427 return mount_command 428 429 430def MountImage( 431 chromeos_root, 432 image, 433 rootfs_mp, 434 stateful_mp, 435 log_level, 436 unmount=False, 437 extra_commands="", 438): 439 cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) 440 command = GetImageMountCommand(image, rootfs_mp, stateful_mp) 441 if unmount: 442 command = "%s --unmount" % command 443 if extra_commands: 444 command = "%s ; %s" % (command, extra_commands) 445 ret, out, _ = cmd_executer.ChrootRunCommandWOutput(chromeos_root, command) 446 logger.GetLogger().LogFatalIf(ret, "Mount/unmount command failed!") 447 return out 448 449 450def IsImageModdedForTest(chromeos_root, image, log_level): 451 if log_level != "verbose": 452 log_level = "quiet" 453 command = "mktemp -d" 454 cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) 455 _, rootfs_mp, _ = cmd_executer.ChrootRunCommandWOutput( 456 chromeos_root, command 457 ) 458 _, stateful_mp, _ = cmd_executer.ChrootRunCommandWOutput( 459 chromeos_root, command 460 ) 461 rootfs_mp = rootfs_mp.strip() 462 stateful_mp = stateful_mp.strip() 463 lsb_release_file = os.path.join(rootfs_mp, "etc/lsb-release") 464 extra = "grep CHROMEOS_RELEASE_TRACK %s | grep -i test" % lsb_release_file 465 output = MountImage( 466 chromeos_root, 467 image, 468 rootfs_mp, 469 stateful_mp, 470 log_level, 471 extra_commands=extra, 472 ) 473 is_test_image = re.search("test", output, re.IGNORECASE) 474 MountImage( 475 chromeos_root, image, rootfs_mp, stateful_mp, log_level, unmount=True 476 ) 477 return is_test_image 478 479 480def VerifyChromeChecksum(chromeos_root, image, remote, log_level): 481 command = "mktemp -d" 482 cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) 483 _, rootfs_mp, _ = cmd_executer.ChrootRunCommandWOutput( 484 chromeos_root, command 485 ) 486 _, stateful_mp, _ = cmd_executer.ChrootRunCommandWOutput( 487 chromeos_root, command 488 ) 489 rootfs_mp = rootfs_mp.strip() 490 stateful_mp = stateful_mp.strip() 491 chrome_file = "%s/opt/google/chrome/chrome" % rootfs_mp 492 extra = "md5sum %s" % chrome_file 493 out = MountImage( 494 chromeos_root, 495 image, 496 rootfs_mp, 497 stateful_mp, 498 log_level, 499 extra_commands=extra, 500 ) 501 image_chrome_checksum = out.strip().split()[0] 502 MountImage( 503 chromeos_root, image, rootfs_mp, stateful_mp, log_level, unmount=True 504 ) 505 506 command = "md5sum /opt/google/chrome/chrome" 507 [_, o, _] = cmd_executer.CrosRunCommandWOutput( 508 command, chromeos_root=chromeos_root, machine=remote 509 ) 510 device_chrome_checksum = o.split()[0] 511 return image_chrome_checksum.strip() == device_chrome_checksum.strip() 512 513 514# Remount partition as writable. 515# TODO: auto-detect if an image is built using --noenable_rootfs_verification. 516def TryRemountPartitionAsRW(chromeos_root, remote, log_level): 517 l = logger.GetLogger() 518 cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) 519 command = "sudo mount -o remount,rw /" 520 ret = cmd_executer.CrosRunCommand( 521 command, 522 chromeos_root=chromeos_root, 523 machine=remote, 524 terminated_timeout=10, 525 ) 526 if ret: 527 ## Safely ignore. 528 l.LogWarning( 529 "Failed to remount partition as rw, " 530 "probably the image was not built with " 531 '"--noenable_rootfs_verification", ' 532 "you can safely ignore this." 533 ) 534 else: 535 l.LogOutput("Re-mounted partition as writable.") 536 537 538def EnsureMachineUp(chromeos_root, remote, log_level): 539 l = logger.GetLogger() 540 cmd_executer = command_executer.GetCommandExecuter(log_level=log_level) 541 timeout = 600 542 magic = "abcdefghijklmnopqrstuvwxyz" 543 command = "echo %s" % magic 544 start_time = time.time() 545 while True: 546 current_time = time.time() 547 if current_time - start_time > timeout: 548 l.LogError( 549 "Timeout of %ss reached. Machine still not up. Aborting." 550 % timeout 551 ) 552 return False 553 ret = cmd_executer.CrosRunCommand( 554 command, chromeos_root=chromeos_root, machine=remote 555 ) 556 if not ret: 557 return True 558 559 560if __name__ == "__main__": 561 retval = DoImage(sys.argv) 562 sys.exit(retval) 563