xref: /aosp_15_r20/external/toolchain-utils/crosperf/suite_runner.py (revision 760c253c1ed00ce9abd48f8546f08516e57485fe)
1# -*- coding: utf-8 -*-
2# Copyright 2013 The ChromiumOS Authors
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""SuiteRunner defines the interface from crosperf to test script."""
7
8
9import contextlib
10import json
11import os
12from pathlib import Path
13import pipes
14import random
15import shlex
16import subprocess
17import time
18
19from cros_utils import command_executer
20from cros_utils import misc
21
22
# sshwatcher path, relative to ChromiumOS source root.
SSHWATCHER = "src/platform/dev/contrib/sshwatcher/sshwatcher.go"
# test_that executable that starts the command built by Test_That_Run.
TEST_THAT_PATH = "/usr/bin/test_that"
# tast executable that starts the command built by Tast_Run.
TAST_PATH = "/usr/bin/tast"
# crosfleet executable name used for the command built by Crosfleet_Run.
CROSFLEET_PATH = "crosfleet"
# gsutil script path; DownloadResult joins it onto label.chromeos_root.
GS_UTIL = "src/chromium/depot_tools/gsutil.py"
# Default value of --autotest_dir when the label has no autotest_path.
AUTOTEST_DIR = "/mnt/host/source/src/third_party/autotest/files"
# Value given to --chrome_root_mount and CHROME_ROOT in Test_That_Run.
CHROME_MOUNT_DIR = "/tmp/chrome_root"
31
32
def GetProfilerArgs(profiler_args):
    """Convert user-supplied profiler options into test-argument form.

    The input is split into shell-style tokens. Each token has a leading
    "--" removed. In a token containing "perf_options=", that marker is
    dropped and the value after it is wrapped in single quotes, so
    --profiler_args='perf_options="record -a"' becomes
    profiler_args='record -a'.

    Args:
        profiler_args: Profiler options as one string; may be empty or None.

    Returns:
        The converted tokens joined by single spaces ("" if there are none).
    """
    marker = "perf_options="
    converted = []
    # shlex.split(None) reads stdin (or raises on newer Pythons), so a missing
    # value is treated as an empty string.
    for arg in shlex.split(profiler_args or ""):
        if arg.startswith("--"):
            arg = arg[2:]
        prefix, found, value = arg.partition(marker)
        if found:
            # Only drop surrounding quotes that are really there; slicing
            # blindly would eat the first and last character of an unquoted
            # value.
            quoted = (
                len(value) >= 2
                and value[0] == value[-1]
                and value[0] in ("'", '"')
            )
            if quoted:
                value = value[1:-1]
            arg = prefix + "'" + value + "'"
        converted.append(arg)
    return " ".join(converted)
57
58
def GetDutConfigArgs(dut_config):
    """Build the "dut_config=" test argument from the DUT configuration.

    Args:
        dut_config: JSON-serializable DUT configuration.

    Returns:
        "dut_config=" followed by the JSON encoding of dut_config, quoted so
        that it forms a single shell word.
    """
    # shlex.quote is the function that pipes.quote aliased; the pipes module
    # itself was removed in Python 3.13.
    return f"dut_config={shlex.quote(json.dumps(dut_config))}"
61
62
@contextlib.contextmanager
def ssh_tunnel(sshwatcher: "os.PathLike", machinename: str) -> str:
    """Context manager that forwards a TCP port over SSH while active.

    This is used to set up port forwarding before entering the
    chroot, so that the forwarded port can be used from inside
    the chroot.

    Args:
        sshwatcher: Path to sshwatcher.go
        machinename: Hostname of the machine to connect to.

    Yields:
        host:port string that can be passed to tast
    """
    # We have to tell sshwatcher which port we want to use.
    # We pick a port that is likely to be available.
    port = random.randrange(4096, 32768)
    cmd = ["go", "run", str(sshwatcher), machinename, str(port)]
    # Pylint wants us to use subprocess.Popen as a context manager,
    # but we don't, so that we can ask sshwatcher to terminate and
    # limit the time we wait for it to do so.
    # pylint: disable=consider-using-with
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    try:
        # sshwatcher takes a few seconds before it binds to the port,
        # presumably due to SSH handshaking taking a while.
        # Give it 12 seconds before we ask the client to connect.
        time.sleep(12)
        yield f"localhost:{port}"
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # The process ignored the polite request; force it down rather
            # than leaking it and masking any exception from the with-body.
            proc.kill()
            proc.wait()
        finally:
            # Release our end of the stdout pipe opened by Popen.
            if proc.stdout is not None:
                proc.stdout.close()
96
97
98class SuiteRunner(object):
99    """This defines the interface from crosperf to test script."""
100
101    def __init__(
102        self,
103        dut_config,
104        logger_to_use=None,
105        log_level="verbose",
106        cmd_exec=None,
107        cmd_term=None,
108    ):
109        self.logger = logger_to_use
110        self.log_level = log_level
111        self._ce = cmd_exec or command_executer.GetCommandExecuter(
112            self.logger, log_level=self.log_level
113        )
114        # DUT command executer.
115        # Will be initialized and used within Run.
116        self._ct = cmd_term or command_executer.CommandTerminator()
117        self.dut_config = dut_config
118
119    def Run(self, cros_machine, label, benchmark, test_args, profiler_args):
120        machine_name = cros_machine.name
121        for i in range(0, benchmark.retries + 1):
122            if label.crosfleet:
123                ret_tup = self.Crosfleet_Run(
124                    label, benchmark, test_args, profiler_args
125                )
126            else:
127                if benchmark.suite == "tast":
128                    with ssh_tunnel(
129                        Path(label.chromeos_root, SSHWATCHER), machine_name
130                    ) as hostport:
131                        ret_tup = self.Tast_Run(hostport, label, benchmark)
132                else:
133                    ret_tup = self.Test_That_Run(
134                        machine_name, label, benchmark, test_args, profiler_args
135                    )
136            if ret_tup[0] != 0:
137                self.logger.LogOutput(
138                    "benchmark %s failed. Retries left: %s"
139                    % (benchmark.name, benchmark.retries - i)
140                )
141            elif i > 0:
142                self.logger.LogOutput(
143                    "benchmark %s succeded after %s retries"
144                    % (benchmark.name, i)
145                )
146                break
147            else:
148                self.logger.LogOutput(
149                    "benchmark %s succeded on first try" % benchmark.name
150                )
151                break
152        return ret_tup
153
154    def RemoveTelemetryTempFile(self, machine, chromeos_root):
155        filename = "telemetry@%s" % machine
156        fullname = misc.GetOutsideChrootPath(
157            chromeos_root, os.path.join("/tmp", filename)
158        )
159        if os.path.exists(fullname):
160            os.remove(fullname)
161
162    def GenTestArgs(self, benchmark, test_args, profiler_args):
163        args_list = []
164
165        if benchmark.suite != "telemetry_Crosperf" and profiler_args:
166            self.logger.LogFatal(
167                "Tests other than telemetry_Crosperf do not "
168                "support profiler."
169            )
170
171        if test_args:
172            # Strip double quotes off args (so we can wrap them in single
173            # quotes, to pass through to Telemetry).
174            if test_args[0] == '"' and test_args[-1] == '"':
175                test_args = test_args[1:-1]
176            args_list.append("test_args='%s'" % test_args)
177
178        args_list.append(GetDutConfigArgs(self.dut_config))
179
180        if not (
181            benchmark.suite == "telemetry_Crosperf"
182            or benchmark.suite == "crosperf_Wrapper"
183        ):
184            self.logger.LogWarning(
185                "Please make sure the server test has stage for "
186                "device setup.\n"
187            )
188        else:
189            args_list.append("test=%s" % benchmark.test_name)
190            if benchmark.suite == "telemetry_Crosperf":
191                args_list.append("run_local=%s" % benchmark.run_local)
192                args_list.append(GetProfilerArgs(profiler_args))
193
194        return args_list
195
196    # TODO(zhizhouy): Currently do not support passing arguments or running
197    # customized tast tests, as we do not have such requirements.
198    def Tast_Run(self, machine, label, benchmark):
199        # Remove existing tast results
200        command = "rm -rf /usr/local/autotest/results/*"
201        self._ce.CrosRunCommand(
202            command, machine=machine, chromeos_root=label.chromeos_root
203        )
204
205        command = " ".join(
206            [TAST_PATH, "run", "-build=False", machine, benchmark.test_name]
207        )
208
209        if self.log_level != "verbose":
210            self.logger.LogOutput("Running test.")
211            self.logger.LogOutput("CMD: %s" % command)
212
213        return self._ce.ChrootRunCommandWOutput(
214            label.chromeos_root, command, command_terminator=self._ct
215        )
216
217    def Test_That_Run(
218        self, machine, label, benchmark, test_args, profiler_args
219    ):
220        """Run the test_that test.."""
221
222        # Remove existing test_that results
223        command = "rm -rf /usr/local/autotest/results/*"
224        self._ce.CrosRunCommand(
225            command, machine=machine, chromeos_root=label.chromeos_root
226        )
227
228        if benchmark.suite == "telemetry_Crosperf":
229            if not os.path.isdir(label.chrome_src):
230                self.logger.LogFatal(
231                    "Cannot find chrome src dir to "
232                    "run telemetry: %s" % label.chrome_src
233                )
234            # Check for and remove temporary file that may have been left by
235            # previous telemetry runs (and which might prevent this run from
236            # working).
237            self.RemoveTelemetryTempFile(machine, label.chromeos_root)
238
239        # --autotest_dir specifies which autotest directory to use.
240        autotest_dir_arg = "--autotest_dir=%s" % (
241            label.autotest_path if label.autotest_path else AUTOTEST_DIR
242        )
243
244        # --fast avoids unnecessary copies of syslogs.
245        fast_arg = "--fast"
246        board_arg = "--board=%s" % label.board
247
248        args_list = self.GenTestArgs(benchmark, test_args, profiler_args)
249        args_arg = "--args=%s" % pipes.quote(" ".join(args_list))
250
251        command = " ".join(
252            [
253                TEST_THAT_PATH,
254                autotest_dir_arg,
255                fast_arg,
256                board_arg,
257                args_arg,
258                machine,
259                benchmark.suite
260                if (
261                    benchmark.suite == "telemetry_Crosperf"
262                    or benchmark.suite == "crosperf_Wrapper"
263                )
264                else benchmark.test_name,
265            ]
266        )
267
268        # Use --no-ns-pid so that cros_sdk does not create a different
269        # process namespace and we can kill process created easily by their
270        # process group.
271        chrome_root_options = (
272            f"--no-ns-pid "
273            f"--chrome_root={label.chrome_src} --chrome_root_mount={CHROME_MOUNT_DIR} "
274            f'FEATURES="-usersandbox" '
275            f"CHROME_ROOT={CHROME_MOUNT_DIR}"
276        )
277
278        if self.log_level != "verbose":
279            self.logger.LogOutput("Running test.")
280            self.logger.LogOutput("CMD: %s" % command)
281
282        return self._ce.ChrootRunCommandWOutput(
283            label.chromeos_root,
284            command,
285            command_terminator=self._ct,
286            cros_sdk_options=chrome_root_options,
287        )
288
289    def DownloadResult(self, label, task_id):
290        gsutil_cmd = os.path.join(label.chromeos_root, GS_UTIL)
291        result_dir = "gs://chromeos-autotest-results/swarming-%s" % task_id
292        download_path = misc.GetOutsideChrootPath(label.chromeos_root, "/tmp")
293        ls_command = "%s ls %s" % (
294            gsutil_cmd,
295            os.path.join(result_dir, "autoserv_test"),
296        )
297        cp_command = "%s -mq cp -r %s %s" % (
298            gsutil_cmd,
299            result_dir,
300            download_path,
301        )
302
303        # Server sometimes will not be able to generate the result directory right
304        # after the test. Will try to access this gs location every 60s for
305        # RETRY_LIMIT mins.
306        t = 0
307        RETRY_LIMIT = 10
308        while t < RETRY_LIMIT:
309            t += 1
310            status = self._ce.RunCommand(ls_command, print_to_console=False)
311            if status == 0:
312                break
313            if t < RETRY_LIMIT:
314                self.logger.LogOutput(
315                    "Result directory not generated yet, "
316                    "retry (%d) in 60s." % t
317                )
318                time.sleep(60)
319            else:
320                self.logger.LogOutput(
321                    "No result directory for task %s" % task_id
322                )
323                return status
324
325        # Wait for 60s to make sure server finished writing to gs location.
326        time.sleep(60)
327
328        status = self._ce.RunCommand(cp_command)
329        if status != 0:
330            self.logger.LogOutput(
331                "Cannot download results from task %s" % task_id
332            )
333        else:
334            self.logger.LogOutput("Result downloaded for task %s" % task_id)
335        return status
336
337    def Crosfleet_Run(self, label, benchmark, test_args, profiler_args):
338        """Run the test via crosfleet.."""
339        options = []
340        if label.board:
341            options.append("-board=%s" % label.board)
342        if label.build:
343            options.append("-image=%s" % label.build)
344        # TODO: now only put toolchain pool here, user need to be able to specify
345        # which pool to use. Need to request feature to not use this option at all.
346        options.append("-pool=toolchain")
347
348        args_list = self.GenTestArgs(benchmark, test_args, profiler_args)
349        options.append("-test-args=%s" % pipes.quote(" ".join(args_list)))
350
351        dimensions = []
352        for dut in label.remote:
353            dimensions.append("-dim dut_name:%s" % dut.rstrip(".cros"))
354
355        command = ("%s create-test %s %s %s") % (
356            CROSFLEET_PATH,
357            " ".join(dimensions),
358            " ".join(options),
359            benchmark.suite
360            if (
361                benchmark.suite == "telemetry_Crosperf"
362                or benchmark.suite == "crosperf_Wrapper"
363            )
364            else benchmark.test_name,
365        )
366
367        if self.log_level != "verbose":
368            self.logger.LogOutput("Starting crosfleet test.")
369            self.logger.LogOutput("CMD: %s" % command)
370        ret_tup = self._ce.RunCommandWOutput(
371            command, command_terminator=self._ct
372        )
373
374        if ret_tup[0] != 0:
375            self.logger.LogOutput("Crosfleet test not created successfully.")
376            return ret_tup
377
378        # Std output of the command will look like:
379        # Created request at https://ci.chromium.org/../cros_test_platform/b12345
380        # We want to parse it and get the id number of the task, which is the
381        # number in the very end of the link address.
382        task_id = ret_tup[1].strip().split("b")[-1]
383
384        command = "crosfleet wait-task %s" % task_id
385        if self.log_level != "verbose":
386            self.logger.LogOutput("Waiting for crosfleet test to finish.")
387            self.logger.LogOutput("CMD: %s" % command)
388
389        ret_tup = self._ce.RunCommandWOutput(
390            command, command_terminator=self._ct
391        )
392
393        # The output of `wait-task` command will be a combination of verbose and a
394        # json format result in the end. The json result looks like this:
395        # {"task-result":
396        #   {"name":"Test Platform Invocation",
397        #    "state":"", "failure":false, "success":true,
398        #    "task-run-id":"12345",
399        #    "task-run-url":"https://ci.chromium.org/.../cros_test_platform/b12345",
400        #    "task-logs-url":""
401        #    },
402        #  "stdout":"",
403        #  "child-results":
404        #    [{"name":"graphics_WebGLAquarium",
405        #      "state":"", "failure":false, "success":true, "task-run-id":"",
406        #      "task-run-url":"https://chromeos-swarming.appspot.com/task?id=1234",
407        #      "task-logs-url":"https://stainless.corp.google.com/1234/"}
408        #    ]
409        # }
410        # We need the task id of the child-results to download result.
411        output = json.loads(ret_tup[1].split("\n")[-1])
412        output = output["child-results"][0]
413        if output["success"]:
414            task_id = output["task-run-url"].split("=")[-1]
415            if self.DownloadResult(label, task_id) == 0:
416                result_dir = "\nResults placed in tmp/swarming-%s\n" % task_id
417                return (ret_tup[0], result_dir, ret_tup[2])
418        return ret_tup
419
420    def CommandTerminator(self):
421        return self._ct
422
423    def Terminate(self):
424        self._ct.Terminate()
425
426
class MockSuiteRunner(object):
    """Mock suite runner for test."""

    def __init__(self):
        # Kept so that code reading this attribute keeps working; Run() does
        # not depend on it.
        self._true = True

    def Run(self, *_args):
        """Pretend to run a benchmark and report success.

        Args:
            *_args: Ignored.

        Returns:
            A new [0, "", ""] list on every call.
        """
        # Both branches of the former if/else returned this same value.
        return [0, "", ""]
438