1# -*- coding: utf-8 -*- 2# Copyright 2013 The ChromiumOS Authors 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""SuiteRunner defines the interface from crosperf to test script.""" 7 8 9import contextlib 10import json 11import os 12from pathlib import Path 13import pipes 14import random 15import shlex 16import subprocess 17import time 18 19from cros_utils import command_executer 20from cros_utils import misc 21 22 23# sshwatcher path, relative to ChromiumOS source root. 24SSHWATCHER = "src/platform/dev/contrib/sshwatcher/sshwatcher.go" 25TEST_THAT_PATH = "/usr/bin/test_that" 26TAST_PATH = "/usr/bin/tast" 27CROSFLEET_PATH = "crosfleet" 28GS_UTIL = "src/chromium/depot_tools/gsutil.py" 29AUTOTEST_DIR = "/mnt/host/source/src/third_party/autotest/files" 30CHROME_MOUNT_DIR = "/tmp/chrome_root" 31 32 33def GetProfilerArgs(profiler_args): 34 # Remove "--" from in front of profiler args. 35 args_list = shlex.split(profiler_args) 36 new_list = [] 37 for arg in args_list: 38 if arg[0:2] == "--": 39 arg = arg[2:] 40 new_list.append(arg) 41 args_list = new_list 42 43 # Remove "perf_options=" from middle of profiler args. 44 new_list = [] 45 for arg in args_list: 46 idx = arg.find("perf_options=") 47 if idx != -1: 48 prefix = arg[0:idx] 49 suffix = arg[idx + len("perf_options=") + 1 : -1] 50 new_arg = prefix + "'" + suffix + "'" 51 new_list.append(new_arg) 52 else: 53 new_list.append(arg) 54 args_list = new_list 55 56 return " ".join(args_list) 57 58 59def GetDutConfigArgs(dut_config): 60 return f"dut_config={pipes.quote(json.dumps(dut_config))}" 61 62 63@contextlib.contextmanager 64def ssh_tunnel(sshwatcher: "os.PathLike", machinename: str) -> str: 65 """Context manager that forwards a TCP port over SSH while active. 66 67 This class is used to set up port forwarding before entering the 68 chroot, so that the forwarded port can be used from inside 69 the chroot. 70 71 Args: 72 sshwatcher: Path to sshwatcher.go 73 machinename: Hostname of the machine to connect to. 74 75 Returns: 76 host:port string that can be passed to tast 77 """ 78 # We have to tell sshwatcher which port we want to use. 79 # We pick a port that is likely to be available. 80 port = random.randrange(4096, 32768) 81 cmd = ["go", "run", str(sshwatcher), machinename, str(port)] 82 # Pylint wants us to use subprocess.Popen as a context manager, 83 # but we don't, so that we can ask sshwatcher to terminate and 84 # limit the time we wait for it to do so. 85 # pylint: disable=consider-using-with 86 proc = subprocess.Popen(cmd, stdout=subprocess.PIPE) 87 try: 88 # sshwatcher takes a few seconds before it binds to the port, 89 # presumably due to SSH handshaking taking a while. 90 # Give it 12 seconds before we ask the client to connect. 91 time.sleep(12) 92 yield f"localhost:{port}" 93 finally: 94 proc.terminate() 95 proc.wait(timeout=5) 96 97 98class SuiteRunner(object): 99 """This defines the interface from crosperf to test script.""" 100 101 def __init__( 102 self, 103 dut_config, 104 logger_to_use=None, 105 log_level="verbose", 106 cmd_exec=None, 107 cmd_term=None, 108 ): 109 self.logger = logger_to_use 110 self.log_level = log_level 111 self._ce = cmd_exec or command_executer.GetCommandExecuter( 112 self.logger, log_level=self.log_level 113 ) 114 # DUT command executer. 115 # Will be initialized and used within Run. 116 self._ct = cmd_term or command_executer.CommandTerminator() 117 self.dut_config = dut_config 118 119 def Run(self, cros_machine, label, benchmark, test_args, profiler_args): 120 machine_name = cros_machine.name 121 for i in range(0, benchmark.retries + 1): 122 if label.crosfleet: 123 ret_tup = self.Crosfleet_Run( 124 label, benchmark, test_args, profiler_args 125 ) 126 else: 127 if benchmark.suite == "tast": 128 with ssh_tunnel( 129 Path(label.chromeos_root, SSHWATCHER), machine_name 130 ) as hostport: 131 ret_tup = self.Tast_Run(hostport, label, benchmark) 132 else: 133 ret_tup = self.Test_That_Run( 134 machine_name, label, benchmark, test_args, profiler_args 135 ) 136 if ret_tup[0] != 0: 137 self.logger.LogOutput( 138 "benchmark %s failed. Retries left: %s" 139 % (benchmark.name, benchmark.retries - i) 140 ) 141 elif i > 0: 142 self.logger.LogOutput( 143 "benchmark %s succeded after %s retries" 144 % (benchmark.name, i) 145 ) 146 break 147 else: 148 self.logger.LogOutput( 149 "benchmark %s succeded on first try" % benchmark.name 150 ) 151 break 152 return ret_tup 153 154 def RemoveTelemetryTempFile(self, machine, chromeos_root): 155 filename = "telemetry@%s" % machine 156 fullname = misc.GetOutsideChrootPath( 157 chromeos_root, os.path.join("/tmp", filename) 158 ) 159 if os.path.exists(fullname): 160 os.remove(fullname) 161 162 def GenTestArgs(self, benchmark, test_args, profiler_args): 163 args_list = [] 164 165 if benchmark.suite != "telemetry_Crosperf" and profiler_args: 166 self.logger.LogFatal( 167 "Tests other than telemetry_Crosperf do not " 168 "support profiler." 169 ) 170 171 if test_args: 172 # Strip double quotes off args (so we can wrap them in single 173 # quotes, to pass through to Telemetry). 174 if test_args[0] == '"' and test_args[-1] == '"': 175 test_args = test_args[1:-1] 176 args_list.append("test_args='%s'" % test_args) 177 178 args_list.append(GetDutConfigArgs(self.dut_config)) 179 180 if not ( 181 benchmark.suite == "telemetry_Crosperf" 182 or benchmark.suite == "crosperf_Wrapper" 183 ): 184 self.logger.LogWarning( 185 "Please make sure the server test has stage for " 186 "device setup.\n" 187 ) 188 else: 189 args_list.append("test=%s" % benchmark.test_name) 190 if benchmark.suite == "telemetry_Crosperf": 191 args_list.append("run_local=%s" % benchmark.run_local) 192 args_list.append(GetProfilerArgs(profiler_args)) 193 194 return args_list 195 196 # TODO(zhizhouy): Currently do not support passing arguments or running 197 # customized tast tests, as we do not have such requirements. 198 def Tast_Run(self, machine, label, benchmark): 199 # Remove existing tast results 200 command = "rm -rf /usr/local/autotest/results/*" 201 self._ce.CrosRunCommand( 202 command, machine=machine, chromeos_root=label.chromeos_root 203 ) 204 205 command = " ".join( 206 [TAST_PATH, "run", "-build=False", machine, benchmark.test_name] 207 ) 208 209 if self.log_level != "verbose": 210 self.logger.LogOutput("Running test.") 211 self.logger.LogOutput("CMD: %s" % command) 212 213 return self._ce.ChrootRunCommandWOutput( 214 label.chromeos_root, command, command_terminator=self._ct 215 ) 216 217 def Test_That_Run( 218 self, machine, label, benchmark, test_args, profiler_args 219 ): 220 """Run the test_that test..""" 221 222 # Remove existing test_that results 223 command = "rm -rf /usr/local/autotest/results/*" 224 self._ce.CrosRunCommand( 225 command, machine=machine, chromeos_root=label.chromeos_root 226 ) 227 228 if benchmark.suite == "telemetry_Crosperf": 229 if not os.path.isdir(label.chrome_src): 230 self.logger.LogFatal( 231 "Cannot find chrome src dir to " 232 "run telemetry: %s" % label.chrome_src 233 ) 234 # Check for and remove temporary file that may have been left by 235 # previous telemetry runs (and which might prevent this run from 236 # working). 237 self.RemoveTelemetryTempFile(machine, label.chromeos_root) 238 239 # --autotest_dir specifies which autotest directory to use. 240 autotest_dir_arg = "--autotest_dir=%s" % ( 241 label.autotest_path if label.autotest_path else AUTOTEST_DIR 242 ) 243 244 # --fast avoids unnecessary copies of syslogs. 245 fast_arg = "--fast" 246 board_arg = "--board=%s" % label.board 247 248 args_list = self.GenTestArgs(benchmark, test_args, profiler_args) 249 args_arg = "--args=%s" % pipes.quote(" ".join(args_list)) 250 251 command = " ".join( 252 [ 253 TEST_THAT_PATH, 254 autotest_dir_arg, 255 fast_arg, 256 board_arg, 257 args_arg, 258 machine, 259 benchmark.suite 260 if ( 261 benchmark.suite == "telemetry_Crosperf" 262 or benchmark.suite == "crosperf_Wrapper" 263 ) 264 else benchmark.test_name, 265 ] 266 ) 267 268 # Use --no-ns-pid so that cros_sdk does not create a different 269 # process namespace and we can kill process created easily by their 270 # process group. 271 chrome_root_options = ( 272 f"--no-ns-pid " 273 f"--chrome_root={label.chrome_src} --chrome_root_mount={CHROME_MOUNT_DIR} " 274 f'FEATURES="-usersandbox" ' 275 f"CHROME_ROOT={CHROME_MOUNT_DIR}" 276 ) 277 278 if self.log_level != "verbose": 279 self.logger.LogOutput("Running test.") 280 self.logger.LogOutput("CMD: %s" % command) 281 282 return self._ce.ChrootRunCommandWOutput( 283 label.chromeos_root, 284 command, 285 command_terminator=self._ct, 286 cros_sdk_options=chrome_root_options, 287 ) 288 289 def DownloadResult(self, label, task_id): 290 gsutil_cmd = os.path.join(label.chromeos_root, GS_UTIL) 291 result_dir = "gs://chromeos-autotest-results/swarming-%s" % task_id 292 download_path = misc.GetOutsideChrootPath(label.chromeos_root, "/tmp") 293 ls_command = "%s ls %s" % ( 294 gsutil_cmd, 295 os.path.join(result_dir, "autoserv_test"), 296 ) 297 cp_command = "%s -mq cp -r %s %s" % ( 298 gsutil_cmd, 299 result_dir, 300 download_path, 301 ) 302 303 # Server sometimes will not be able to generate the result directory right 304 # after the test. Will try to access this gs location every 60s for 305 # RETRY_LIMIT mins. 306 t = 0 307 RETRY_LIMIT = 10 308 while t < RETRY_LIMIT: 309 t += 1 310 status = self._ce.RunCommand(ls_command, print_to_console=False) 311 if status == 0: 312 break 313 if t < RETRY_LIMIT: 314 self.logger.LogOutput( 315 "Result directory not generated yet, " 316 "retry (%d) in 60s." % t 317 ) 318 time.sleep(60) 319 else: 320 self.logger.LogOutput( 321 "No result directory for task %s" % task_id 322 ) 323 return status 324 325 # Wait for 60s to make sure server finished writing to gs location. 326 time.sleep(60) 327 328 status = self._ce.RunCommand(cp_command) 329 if status != 0: 330 self.logger.LogOutput( 331 "Cannot download results from task %s" % task_id 332 ) 333 else: 334 self.logger.LogOutput("Result downloaded for task %s" % task_id) 335 return status 336 337 def Crosfleet_Run(self, label, benchmark, test_args, profiler_args): 338 """Run the test via crosfleet..""" 339 options = [] 340 if label.board: 341 options.append("-board=%s" % label.board) 342 if label.build: 343 options.append("-image=%s" % label.build) 344 # TODO: now only put toolchain pool here, user need to be able to specify 345 # which pool to use. Need to request feature to not use this option at all. 346 options.append("-pool=toolchain") 347 348 args_list = self.GenTestArgs(benchmark, test_args, profiler_args) 349 options.append("-test-args=%s" % pipes.quote(" ".join(args_list))) 350 351 dimensions = [] 352 for dut in label.remote: 353 dimensions.append("-dim dut_name:%s" % dut.rstrip(".cros")) 354 355 command = ("%s create-test %s %s %s") % ( 356 CROSFLEET_PATH, 357 " ".join(dimensions), 358 " ".join(options), 359 benchmark.suite 360 if ( 361 benchmark.suite == "telemetry_Crosperf" 362 or benchmark.suite == "crosperf_Wrapper" 363 ) 364 else benchmark.test_name, 365 ) 366 367 if self.log_level != "verbose": 368 self.logger.LogOutput("Starting crosfleet test.") 369 self.logger.LogOutput("CMD: %s" % command) 370 ret_tup = self._ce.RunCommandWOutput( 371 command, command_terminator=self._ct 372 ) 373 374 if ret_tup[0] != 0: 375 self.logger.LogOutput("Crosfleet test not created successfully.") 376 return ret_tup 377 378 # Std output of the command will look like: 379 # Created request at https://ci.chromium.org/../cros_test_platform/b12345 380 # We want to parse it and get the id number of the task, which is the 381 # number in the very end of the link address. 382 task_id = ret_tup[1].strip().split("b")[-1] 383 384 command = "crosfleet wait-task %s" % task_id 385 if self.log_level != "verbose": 386 self.logger.LogOutput("Waiting for crosfleet test to finish.") 387 self.logger.LogOutput("CMD: %s" % command) 388 389 ret_tup = self._ce.RunCommandWOutput( 390 command, command_terminator=self._ct 391 ) 392 393 # The output of `wait-task` command will be a combination of verbose and a 394 # json format result in the end. The json result looks like this: 395 # {"task-result": 396 # {"name":"Test Platform Invocation", 397 # "state":"", "failure":false, "success":true, 398 # "task-run-id":"12345", 399 # "task-run-url":"https://ci.chromium.org/.../cros_test_platform/b12345", 400 # "task-logs-url":"" 401 # }, 402 # "stdout":"", 403 # "child-results": 404 # [{"name":"graphics_WebGLAquarium", 405 # "state":"", "failure":false, "success":true, "task-run-id":"", 406 # "task-run-url":"https://chromeos-swarming.appspot.com/task?id=1234", 407 # "task-logs-url":"https://stainless.corp.google.com/1234/"} 408 # ] 409 # } 410 # We need the task id of the child-results to download result. 411 output = json.loads(ret_tup[1].split("\n")[-1]) 412 output = output["child-results"][0] 413 if output["success"]: 414 task_id = output["task-run-url"].split("=")[-1] 415 if self.DownloadResult(label, task_id) == 0: 416 result_dir = "\nResults placed in tmp/swarming-%s\n" % task_id 417 return (ret_tup[0], result_dir, ret_tup[2]) 418 return ret_tup 419 420 def CommandTerminator(self): 421 return self._ct 422 423 def Terminate(self): 424 self._ct.Terminate() 425 426 427class MockSuiteRunner(object): 428 """Mock suite runner for test.""" 429 430 def __init__(self): 431 self._true = True 432 433 def Run(self, *_args): 434 if self._true: 435 return [0, "", ""] 436 else: 437 return [0, "", ""] 438