xref: /aosp_15_r20/external/autotest/server/autotest.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# Copyright 2007 Google Inc. Released under the GPL v2
3#pylint: disable-msg=C0111
4
5from __future__ import absolute_import
6from __future__ import division
7from __future__ import print_function
8
9import glob
10import logging
11import os
12import re
13import sys
14import tempfile
15import time
16import traceback
17
18import common
19from autotest_lib.client.bin.result_tools import runner as result_tools_runner
20from autotest_lib.client.common_lib import autotemp
21from autotest_lib.client.common_lib import base_job
22from autotest_lib.client.common_lib import error
23from autotest_lib.client.common_lib import global_config
24from autotest_lib.client.common_lib import packages
25from autotest_lib.client.common_lib import utils as client_utils
26from autotest_lib.server import installable_object
27from autotest_lib.server import utils
28from autotest_lib.server import utils as server_utils
29from autotest_lib.server.cros.dynamic_suite.constants import JOB_REPO_URL
30import six
31from six.moves import map
32
33
34try:
35    from autotest_lib.utils.frozen_chromite.lib import metrics
36except ImportError:
37    metrics = client_utils.metrics_mock
38
39
# Tests assume this exact value; do not change it.
OFFLOAD_ENVVAR = "SYNCHRONOUS_OFFLOAD_DIR"

# Locations of the client sources used by the svn fallback in
# Autotest._install: the svn:// URL is tried first, then the http:// one.
AUTOTEST_SVN = 'svn://test.kernel.org/autotest/trunk/client'
AUTOTEST_HTTP = 'http://test.kernel.org/svn/autotest/trunk/client'

# Shorthand for the global configuration object.
_CONFIG = global_config.global_config
# Boolean read from [AUTOSERV] enable_server_prebuild; False when unset.
AUTOSERV_PREBUILD = _CONFIG.get_config_value(
        'AUTOSERV', 'enable_server_prebuild', type=bool, default=False)

# Match on a line like this:
# FAIL test_name  test_name timestamp=1 localtime=Nov 15 12:43:10 <fail_msg>
# Everything after the HH:MM:SS time is captured as the 'fail_msg' group.
_FAIL_STATUS_RE = re.compile(
    r'\s*FAIL.*localtime=.*\s*.*\s*[0-9]+:[0-9]+:[0-9]+\s*(?P<fail_msg>.*)')

# NOTE(review): the consumer of this value is not in this part of the file;
# confirm how it is used before changing it.
LOG_BUFFER_SIZE_BYTES = 64
56
57
58def _set_py_version():
59    """As of ~R102 (aka when this merges), DUTs only have Python 3."""
60    return '--py_version=3'
61
62
class AutodirNotFoundError(Exception):
    """Raised when no Autotest installation could be found on a host."""
65
66
class AutotestFailure(Exception):
    """Generic exception class for failures during a test run."""
69
70
class AutotestAbort(AutotestFailure):
    """
    AutotestAborts are thrown when the DUT seems fine
    and the test doesn't give us an explicit reason for
    failure. In this case we have no choice but to abort.
    """
77
78
class AutotestDeviceError(AutotestFailure):
    """
    Exceptions that inherit from AutotestDeviceError
    are thrown when we can determine the current
    state of the DUT and conclude that it probably
    led to the test failing; these exceptions lead
    to failures instead of aborts.
    """
87
88
class AutotestDeviceNotPingable(AutotestDeviceError):
    """Error raised when a DUT becomes unpingable."""
91
92
class AutotestDeviceNotSSHable(AutotestDeviceError):
    """Error raised when a DUT is pingable but cannot be reached over SSH."""
95
96
class AutotestDeviceRebooted(AutotestDeviceError):
    """Error raised when a DUT rebooted unexpectedly."""
99
100
101class Autotest(installable_object.InstallableObject):
102    """
103    This class represents the Autotest program.
104
105    Autotest is used to run tests automatically and collect the results.
106    It also supports profilers.
107
108    Implementation details:
109    This is a leaf class in an abstract class hierarchy, it must
110    implement the unimplemented methods in parent classes.
111    """
112
113    def __init__(self, host=None):
114        self.host = host
115        self.got = False
116        self.installed = False
117        self.serverdir = utils.get_server_dir()
118        super(Autotest, self).__init__()
119
120
121    install_in_tmpdir = False
122    @classmethod
123    def set_install_in_tmpdir(cls, flag):
124        """ Sets a flag that controls whether or not Autotest should by
125        default be installed in a "standard" directory (e.g.
126        /home/autotest, /usr/local/autotest) or a temporary directory. """
127        cls.install_in_tmpdir = flag
128
129
130    @classmethod
131    def get_client_autodir_paths(cls, host):
132        return global_config.global_config.get_config_value(
133                'AUTOSERV', 'client_autodir_paths', type=list)
134
135
136    @classmethod
137    def get_installed_autodir(cls, host):
138        """
139        Find where the Autotest client is installed on the host.
140        @returns an absolute path to an installed Autotest client root.
141        @raises AutodirNotFoundError if no Autotest installation can be found.
142        """
143        autodir = host.get_autodir()
144        if autodir:
145            logging.debug('Using existing host autodir: %s', autodir)
146            return autodir
147
148        for path in Autotest.get_client_autodir_paths(host):
149            try:
150                autotest_binary = os.path.join(path, 'bin', 'autotest')
151                host.run('test -x %s' % utils.sh_escape(autotest_binary))
152                host.run('test -w %s' % utils.sh_escape(path))
153                logging.debug('Found existing autodir at %s', path)
154                return path
155            except error.GenericHostRunError:
156                logging.debug('%s does not exist on %s', autotest_binary,
157                              host.hostname)
158        raise AutodirNotFoundError
159
160
161    @classmethod
162    def get_install_dir(cls, host):
163        """
164        Determines the location where autotest should be installed on
165        host. If self.install_in_tmpdir is set, it will return a unique
166        temporary directory that autotest can be installed in. Otherwise, looks
167        for an existing installation to use; if none is found, looks for a
168        usable directory in the global config client_autodir_paths.
169        """
170        try:
171            install_dir = cls.get_installed_autodir(host)
172        except AutodirNotFoundError:
173            install_dir = cls._find_installable_dir(host)
174
175        if cls.install_in_tmpdir:
176            return host.get_tmp_dir(parent=install_dir)
177        return install_dir
178
179
180    @classmethod
181    def _find_installable_dir(cls, host):
182        client_autodir_paths = cls.get_client_autodir_paths(host)
183        for path in client_autodir_paths:
184            try:
185                host.run('mkdir -p %s' % utils.sh_escape(path))
186                host.run('test -w %s' % utils.sh_escape(path))
187                return path
188            except error.AutoservRunError:
189                logging.debug('Failed to create %s', path)
190        metrics.Counter(
191            'chromeos/autotest/errors/no_autotest_install_path').increment(
192                fields={'dut_host_name': host.hostname})
193        raise error.AutoservInstallError(
194                'Unable to find a place to install Autotest; tried %s' %
195                ', '.join(client_autodir_paths))
196
197
198    def get_fetch_location(self):
199        """Generate list of locations where autotest can look for packages.
200
201        Hosts are tagged with an attribute containing the URL from which
202        to source packages when running a test on that host.
203
204        @returns the list of candidate locations to check for packages.
205        """
206        c = global_config.global_config
207        repos = c.get_config_value("PACKAGES", 'fetch_location', type=list,
208                                   default=[])
209        repos.reverse()
210
211        if not server_utils.is_inside_chroot():
212            # Only try to get fetch location from host attribute if the
213            # test is not running inside chroot.
214            #
215            # Look for the repo url via the host attribute. If we are
216            # not running with a full AFE autoserv will fall back to
217            # serving packages itself from whatever source version it is
218            # sync'd to rather than using the proper artifacts for the
219            # build on the host.
220            found_repo = self._get_fetch_location_from_host_attribute()
221            if found_repo is not None:
222                # Add our new repo to the end, the package manager will
223                # later reverse the list of repositories resulting in ours
224                # being first
225                repos.append(found_repo)
226
227        return repos
228
229
230    def _get_fetch_location_from_host_attribute(self):
231        """Get repo to use for packages from host attribute, if possible.
232
233        Hosts are tagged with an attribute containing the URL
234        from which to source packages when running a test on that host.
235        If self.host is set, attempt to look this attribute in the host info.
236
237        @returns value of the 'job_repo_url' host attribute, if present.
238        """
239        if not self.host:
240            return None
241
242        try:
243            info = self.host.host_info_store.get()
244        except Exception as e:
245            # TODO(pprabhu): We really want to catch host_info.StoreError here,
246            # but we can't import host_info from this module.
247            #   - autotest_lib.hosts.host_info pulls in (naturally)
248            #   autotest_lib.hosts.__init__
249            #   - This pulls in all the host classes ever defined
250            #   - That includes abstract_ssh, which depends on autotest
251            logging.warning('Failed to obtain host info: %r', e)
252            logging.warning('Skipping autotest fetch location based on %s',
253                            JOB_REPO_URL)
254            return None
255
256        job_repo_url = info.attributes.get(JOB_REPO_URL, '')
257        if not job_repo_url:
258            logging.warning("No %s for %s", JOB_REPO_URL, self.host)
259            return None
260
261        logging.info('Got job repo url from host attributes: %s',
262                        job_repo_url)
263        return job_repo_url
264
265
266    def install(self, host=None, autodir=None, use_packaging=True):
267        """Install autotest.  If |host| is not None, stores it in |self.host|.
268
269        @param host A Host instance on which autotest will be installed
270        @param autodir Location on the remote host to install to
271        @param use_packaging Enable install modes that use the packaging system.
272
273        """
274        if host:
275            self.host = host
276        self._install(host=host, autodir=autodir, use_packaging=use_packaging)
277
278
279    def install_full_client(self, host=None, autodir=None):
280        self._install(host=host, autodir=autodir, use_autoserv=False,
281                      use_packaging=False)
282
283
284    def install_no_autoserv(self, host=None, autodir=None):
285        self._install(host=host, autodir=autodir, use_autoserv=False)
286
287
288    def _install_using_packaging(self, host, autodir):
289        repos = self.get_fetch_location()
290        if not repos:
291            raise error.PackageInstallError("No repos to install an "
292                                            "autotest client from")
293        # Make sure devserver has the autotest package staged
294        host.verify_job_repo_url()
295        pkgmgr = packages.PackageManager(autodir, hostname=host.hostname,
296                                         repo_urls=repos,
297                                         do_locking=False,
298                                         run_function=host.run,
299                                         run_function_dargs=dict(timeout=600))
300        # The packages dir is used to store all the packages that
301        # are fetched on that client. (for the tests,deps etc.
302        # too apart from the client)
303        pkg_dir = os.path.join(autodir, 'packages')
304        # clean up the autodir except for the packages and result_tools
305        # directory.
306        host.run('cd %s && ls | grep -v "^packages$" | grep -v "^result_tools$"'
307                 ' | xargs rm -rf && rm -rf .[!.]*' % autodir)
308        pkgmgr.install_pkg('autotest', 'client', pkg_dir, autodir,
309                           preserve_install_dir=True)
310        self.installed = True
311
312
313    def _install_using_send_file(self, host, autodir):
314        dirs_to_exclude = set(["tests", "site_tests", "deps", "profilers",
315                               "packages"])
316        light_files = [os.path.join(self.source_material, f)
317                       for f in os.listdir(self.source_material)
318                       if f not in dirs_to_exclude]
319        host.send_file(light_files, autodir, delete_dest=True)
320
321        # create empty dirs for all the stuff we excluded
322        commands = []
323        for path in dirs_to_exclude:
324            abs_path = os.path.join(autodir, path)
325            abs_path = utils.sh_escape(abs_path)
326            commands.append("mkdir -p '%s'" % abs_path)
327            commands.append("touch '%s'/__init__.py" % abs_path)
328        host.run(';'.join(commands))
329
330
331    def _install(self, host=None, autodir=None, use_autoserv=True,
332                 use_packaging=True):
333        """
334        Install autotest.  If get() was not called previously, an
335        attempt will be made to install from the autotest svn
336        repository.
337
338        @param host A Host instance on which autotest will be installed
339        @param autodir Location on the remote host to install to
340        @param use_autoserv Enable install modes that depend on the client
341            running with the autoserv harness
342        @param use_packaging Enable install modes that use the packaging system
343
344        @exception AutoservError if a tarball was not specified and
345            the target host does not have svn installed in its path
346        """
347        if not host:
348            host = self.host
349        if not self.got:
350            self.get()
351        host.wait_up(timeout=30)
352        host.setup()
353        # B/203609358 someting is removing telemetry. Adding this to check the
354        # status of the folder as early as possible.
355        logging.info("Installing autotest on %s", host.hostname)
356
357        # set up the autotest directory on the remote machine
358        if not autodir:
359            autodir = self.get_install_dir(host)
360        logging.info('Using installation dir %s', autodir)
361        host.set_autodir(autodir)
362        host.run('mkdir -p %s' % utils.sh_escape(autodir))
363
364        # make sure there are no files in $AUTODIR/results
365        results_path = os.path.join(autodir, 'results')
366        host.run('rm -rf %s/*' % utils.sh_escape(results_path),
367                 ignore_status=True)
368
369        # Fetch the autotest client from the nearest repository
370        if use_packaging:
371            try:
372                self._install_using_packaging(host, autodir)
373                logging.info("Installation of autotest completed using the "
374                             "packaging system.")
375                return
376            except (error.PackageInstallError, error.AutoservRunError,
377                    global_config.ConfigError) as e:
378                logging.info("Could not install autotest using the packaging "
379                             "system: %s. Trying other methods", e)
380        else:
381            # Delete the package checksum file to force dut updating local
382            # packages.
383            command = ('rm -f "%s"' %
384                       (os.path.join(autodir, packages.CHECKSUM_FILE)))
385            host.run(command)
386
387        # try to install from file or directory
388        if self.source_material:
389            c = global_config.global_config
390            supports_autoserv_packaging = c.get_config_value(
391                "PACKAGES", "serve_packages_from_autoserv", type=bool)
392            # Copy autotest recursively
393            if supports_autoserv_packaging and use_autoserv:
394                self._install_using_send_file(host, autodir)
395            else:
396                host.send_file(self.source_material, autodir, delete_dest=True)
397            logging.info("Installation of autotest completed from %s",
398                         self.source_material)
399            self.installed = True
400        else:
401            # if that fails try to install using svn
402            if utils.run('which svn').exit_status:
403                raise error.AutoservError(
404                        'svn not found on target machine: %s' %
405                        host.hostname)
406            try:
407                host.run('svn checkout %s %s' % (AUTOTEST_SVN, autodir))
408            except error.AutoservRunError as e:
409                host.run('svn checkout %s %s' % (AUTOTEST_HTTP, autodir))
410            logging.info("Installation of autotest completed using SVN.")
411            self.installed = True
412
413        # TODO(milleral): http://crbug.com/258161
414        # Send over the most recent global_config.ini after installation if one
415        # is available.
416        # This code is a bit duplicated from
417        # _Run._create_client_config_file, but oh well.
418        if self.installed and self.source_material:
419            self._send_shadow_config()
420
421        # sync the disk, to avoid getting 0-byte files if a test resets the DUT
422        host.run(os.path.join(autodir, 'bin', 'fs_sync.py'),
423                 ignore_status=True)
424
425    def _send_shadow_config(self):
426        logging.info('Installing updated global_config.ini.')
427        destination = os.path.join(self.host.get_autodir(),
428                                   'global_config.ini')
429        with tempfile.NamedTemporaryFile(mode='w') as client_config:
430            config = global_config.global_config
431            client_section = config.get_section_values('CLIENT')
432            client_section.write(client_config)
433            client_config.flush()
434            self.host.send_file(client_config.name, destination)
435
436
437    def uninstall(self, host=None):
438        """
439        Uninstall (i.e. delete) autotest. Removes the autotest client install
440        from the specified host.
441
442        @params host a Host instance from which the client will be removed
443        """
444        if not self.installed:
445            return
446        if not host:
447            host = self.host
448        autodir = host.get_autodir()
449        if not autodir:
450            return
451
452        # perform the actual uninstall
453        host.run("rm -rf %s" % utils.sh_escape(autodir), ignore_status=True)
454        host.set_autodir(None)
455        self.installed = False
456
457
458    def get(self, location=None):
459        if not location:
460            location = os.path.join(self.serverdir, '../client')
461            location = os.path.abspath(location)
462        installable_object.InstallableObject.get(self, location)
463        self.got = True
464
465
466    def run(self, control_file, results_dir='.', host=None, timeout=None,
467            tag=None, parallel_flag=False, background=False,
468            client_disconnect_timeout=None, use_packaging=True):
469        """
470        Run an autotest job on the remote machine.
471
472        @param control_file: An open file-like-obj of the control file.
473        @param results_dir: A str path where the results should be stored
474                on the local filesystem.
475        @param host: A Host instance on which the control file should
476                be run.
477        @param timeout: Maximum number of seconds to wait for the run or None.
478        @param tag: Tag name for the client side instance of autotest.
479        @param parallel_flag: Flag set when multiple jobs are run at the
480                same time.
481        @param background: Indicates that the client should be launched as
482                a background job; the code calling run will be responsible
483                for monitoring the client and collecting the results.
484        @param client_disconnect_timeout: Seconds to wait for the remote host
485                to come back after a reboot. Defaults to the host setting for
486                DEFAULT_REBOOT_TIMEOUT.
487
488        @raises AutotestRunError: If there is a problem executing
489                the control file.
490        """
491        host = self._get_host_and_setup(host, use_packaging=use_packaging)
492        logging.debug('Autotest job starts on remote host: %s',
493                      host.hostname)
494        results_dir = os.path.abspath(results_dir)
495
496        if client_disconnect_timeout is None:
497            client_disconnect_timeout = host.DEFAULT_REBOOT_TIMEOUT
498
499        if tag:
500            results_dir = os.path.join(results_dir, tag)
501
502        atrun = _Run(host, results_dir, tag, parallel_flag, background)
503        self._do_run(control_file, results_dir, host, atrun, timeout,
504                     client_disconnect_timeout, use_packaging=use_packaging)
505
506
507    def _get_host_and_setup(self, host, use_packaging=True):
508        if not host:
509            host = self.host
510        if not self.installed:
511            self.install(host, use_packaging=use_packaging)
512
513        host.wait_up(timeout=30)
514        return host
515
516
517    def _do_run(self, control_file, results_dir, host, atrun, timeout,
518                client_disconnect_timeout, use_packaging=True):
519        try:
520            atrun.verify_machine()
521        except:
522            logging.error("Verify failed on %s. Reinstalling autotest",
523                          host.hostname)
524            self.install(host)
525            atrun.verify_machine()
526        debug = os.path.join(results_dir, 'debug')
527        try:
528            os.makedirs(debug)
529        except Exception:
530            pass
531
532        delete_file_list = [atrun.remote_control_file,
533                            atrun.remote_control_file + '.state',
534                            atrun.manual_control_file,
535                            atrun.manual_control_file + '.state']
536        cmd = ';'.join('rm -f ' + control for control in delete_file_list)
537        host.run(cmd, ignore_status=True)
538
539        tmppath = utils.get(control_file, local_copy=True)
540
541        # build up the initialization prologue for the control file
542        prologue_lines = []
543
544        # Add the additional user arguments
545        prologue_lines.append("args = %r\n" % self.job.args)
546
547        # If the packaging system is being used, add the repository list.
548        repos = None
549        try:
550            if use_packaging:
551                repos = self.get_fetch_location()
552                prologue_lines.append('job.add_repository(%s)\n' % repos)
553            else:
554                logging.debug('use_packaging is set to False, do not add any '
555                              'repository.')
556        except global_config.ConfigError as e:
557            # If repos is defined packaging is enabled so log the error
558            if repos:
559                logging.error(e)
560
561        # on full-size installs, turn on any profilers the server is using
562        if not atrun.background:
563            running_profilers = six.iteritems(host.job.profilers.add_log)
564            for profiler, (args, dargs) in running_profilers:
565                call_args = [repr(profiler)]
566                call_args += [repr(arg) for arg in args]
567                call_args += ["%s=%r" % item for item in six.iteritems(dargs)]
568                prologue_lines.append("job.profilers.add(%s)\n"
569                                      % ", ".join(call_args))
570        cfile = "".join(prologue_lines)
571
572        cfile += open(tmppath).read()
573        open(tmppath, "w").write(cfile)
574
575        # Create and copy state file to remote_control_file + '.state'
576        state_file = host.job.preprocess_client_state()
577        host.send_file(state_file, atrun.remote_control_file + '.init.state')
578        os.remove(state_file)
579
580        # Copy control_file to remote_control_file on the host
581        host.send_file(tmppath, atrun.remote_control_file)
582        if os.path.abspath(tmppath) != os.path.abspath(control_file):
583            os.remove(tmppath)
584
585        atrun.execute_control(
586                timeout=timeout,
587                client_disconnect_timeout=client_disconnect_timeout)
588
589
590    @staticmethod
591    def extract_test_failure_msg(failure_status_line):
592        """Extract the test failure message from the status line.
593
594        @param failure_status_line:  String of test failure status line, it will
595            look like:
596          FAIL <test name>  <test name> timestamp=<ts> localtime=<lt> <reason>
597
598        @returns String of the reason, return empty string if we can't regex out
599            reason.
600        """
601        fail_msg = ''
602        match = _FAIL_STATUS_RE.match(failure_status_line)
603        if match:
604            fail_msg = match.group('fail_msg')
605        return fail_msg
606
607
608    @classmethod
609    def _check_client_test_result(cls, host, test_name):
610        """
611        Check result of client test.
612        Autotest will store results in the file name status.
613        We check that second to last line in that file begins with 'END GOOD'
614
615        @raises TestFail: If client test does not pass.
616        """
617        client_result_dir = '%s/results/default' % host.autodir
618        command = 'tail -2 %s/status | head -1' % client_result_dir
619        status = host.run(command).stdout.strip()
620        logging.info(status)
621        if status[:8] != 'END GOOD':
622            test_fail_status_line_cmd = (
623                    'grep "^\s*FAIL\s*%s" %s/status | tail -n 1' %
624                    (test_name, client_result_dir))
625            test_fail_msg = cls.extract_test_failure_msg(
626                    host.run(test_fail_status_line_cmd).stdout.strip())
627            test_fail_msg_reason = ('' if not test_fail_msg
628                                    else ' (reason: %s)' % test_fail_msg)
629            test_fail_status = '%s client test did not pass%s.' % (
630                    test_name, test_fail_msg_reason)
631            raise error.TestFail(test_fail_status)
632
633
634    def run_timed_test(self, test_name, results_dir='.', host=None,
635                       timeout=None, parallel_flag=False, background=False,
636                       client_disconnect_timeout=None, *args, **dargs):
637        """
638        Assemble a tiny little control file to just run one test,
639        and run it as an autotest client-side test
640        """
641        if not host:
642            host = self.host
643        if not self.installed:
644            self.install(host)
645
646        opts = ["%s=%s" % (o[0], repr(o[1])) for o in dargs.items()]
647        cmd = ", ".join([repr(test_name)] + list(map(repr, args)) + opts)
648        control = "job.run_test(%s)\n" % cmd
649        self.run(control, results_dir, host, timeout=timeout,
650                 parallel_flag=parallel_flag, background=background,
651                 client_disconnect_timeout=client_disconnect_timeout)
652
653        if dargs.get('check_client_result', False):
654            self._check_client_test_result(host, test_name)
655
656
657    def run_test(self,
658                 test_name,
659                 results_dir='.',
660                 host=None,
661                 parallel_flag=False,
662                 background=False,
663                 client_disconnect_timeout=None,
664                 timeout=None,
665                 *args,
666                 **dargs):
667        self.run_timed_test(
668                test_name,
669                results_dir,
670                host,
671                timeout=timeout,
672                parallel_flag=parallel_flag,
673                background=background,
674                client_disconnect_timeout=client_disconnect_timeout,
675                *args,
676                **dargs)
677
678
679    def run_static_method(self, module, method, results_dir='.', host=None,
680                          *args):
681        """Runs a non-instance method with |args| from |module| on the client.
682
683        This method runs a static/class/module autotest method on the client.
684        For example:
685          run_static_method("autotest_lib.client.cros.cros_ui", "reboot")
686
687        Will run autotest_lib.client.cros.cros_ui.reboot() on the client.
688
689        @param module: module name as you would refer to it when importing in a
690            control file. e.g. autotest_lib.client.common_lib.module_name.
691        @param method: the method you want to call.
692        @param results_dir: A str path where the results should be stored
693            on the local filesystem.
694        @param host: A Host instance on which the control file should
695            be run.
696        @param args: args to pass to the method.
697        """
698        control = "\n".join(["import %s" % module,
699                             "%s.%s(%s)\n" % (module, method,
700                                              ','.join(map(repr, args)))])
701        self.run(control, results_dir=results_dir, host=host)
702
703
704class _Run(object):
705    """
706    Represents a run of autotest control file.  This class maintains
707    all the state necessary as an autotest control file is executed.
708
709    It is not intended to be used directly, rather control files
710    should be run using the run method in Autotest.
711    """
712    def __init__(self, host, results_dir, tag, parallel_flag, background):
713        self.host = host
714        self.results_dir = results_dir
715        self.tag = tag
716        self.parallel_flag = parallel_flag
717        self.background = background
718        self.autodir = Autotest.get_installed_autodir(self.host)
719        control = os.path.join(self.autodir, 'control')
720        if tag:
721            control += '.' + tag
722        self.manual_control_file = control
723        self.remote_control_file = control + '.autoserv'
724        self.config_file = os.path.join(self.autodir, 'global_config.ini')
725
726
727    def verify_machine(self):
728        binary = os.path.join(self.autodir, 'bin/autotest')
729        at_check = "test -e {} && echo True || echo False".format(binary)
730        if not self.parallel_flag:
731            tmpdir = os.path.join(self.autodir, 'tmp')
732            download = os.path.join(self.autodir, 'tests/download')
733            at_check += "; umount {}; umount {}".format(tmpdir, download)
734        # Check if the test dir is missing.
735        if "False" in str(self.host.run(at_check, ignore_status=True).stdout):
736            raise error.AutoservInstallError(
737                "Autotest does not appear to be installed")
738
739
740
741    def get_base_cmd_args(self, section):
742        args = ['--verbose']
743        if section > 0:
744            args.append('-c')
745        if self.tag:
746            args.append('-t %s' % self.tag)
747        if self.host.job.use_external_logging():
748            args.append('-l')
749        if self.host.hostname:
750            args.append('--hostname=%s' % self.host.hostname)
751        args.append('--user=%s' % self.host.job.user)
752
753        args.append(self.remote_control_file)
754        return args
755
756
757    def get_background_cmd(self, section):
758        cmd = [
759                'nohup',
760                os.path.join(self.autodir, 'bin/autotest_client'),
761                _set_py_version()
762        ]
763        cmd += self.get_base_cmd_args(section)
764        cmd += ['>/dev/null', '2>/dev/null', '&']
765        return ' '.join(cmd)
766
767
768    def get_daemon_cmd(self, section, monitor_dir):
769        cmd = [
770                'nohup',
771                os.path.join(self.autodir, 'bin/autotestd'), monitor_dir,
772                '-H autoserv',
773                _set_py_version()
774        ]
775        cmd += self.get_base_cmd_args(section)
776        cmd += ['>/dev/null', '2>/dev/null', '&']
777        return ' '.join(cmd)
778
779
780    def get_monitor_cmd(self, monitor_dir, stdout_read, stderr_read):
781        cmd = [
782                os.path.join(self.autodir, 'bin', 'autotestd_monitor'),
783                monitor_dir,
784                str(stdout_read),
785                str(stderr_read),
786                _set_py_version()
787        ]
788        return ' '.join(cmd)
789
790
791    def get_client_log(self):
792        """Find what the "next" client.* prefix should be
793
794        @returns A string of the form client.INTEGER that should be prefixed
795            to all client debug log files.
796        """
797        max_digit = -1
798        debug_dir = os.path.join(self.results_dir, 'debug')
799        client_logs = glob.glob(os.path.join(debug_dir, 'client.*.*'))
800        for log in client_logs:
801            _, number, _ = log.split('.', 2)
802            if number.isdigit():
803                max_digit = max(max_digit, int(number))
804        return 'client.%d' % (max_digit + 1)
805
806
807    def copy_client_config_file(self, client_log_prefix=None):
808        """
809        Create and copy the client config file based on the server config.
810
811        @param client_log_prefix: Optional prefix to prepend to log files.
812        """
813        client_config_file = self._create_client_config_file(client_log_prefix)
814        self.host.send_file(client_config_file, self.config_file)
815        os.remove(client_config_file)
816
817
818    def _create_client_config_file(self, client_log_prefix=None):
819        """
820        Create a temporary file with the [CLIENT] section configuration values
821        taken from the server global_config.ini.
822
823        @param client_log_prefix: Optional prefix to prepend to log files.
824
825        @return: Path of the temporary file generated.
826        """
827        config = global_config.global_config.get_section_values('CLIENT')
828        if client_log_prefix:
829            config.set('CLIENT', 'default_logging_name', client_log_prefix)
830        return self._create_aux_file(config.write)
831
832
833    def _create_aux_file(self, func, *args):
834        """
835        Creates a temporary file and writes content to it according to a
836        content creation function. The file object is appended to *args, which
837        is then passed to the content creation function
838
839        @param func: Function that will be used to write content to the
840                temporary file.
841        @param *args: List of parameters that func takes.
842        @return: Path to the temporary file that was created.
843        """
844        fd, path = tempfile.mkstemp(dir=self.host.job.tmpdir)
845        aux_file = os.fdopen(fd, "w")
846        try:
847            list_args = list(args)
848            list_args.append(aux_file)
849            func(*list_args)
850        finally:
851            aux_file.close()
852        return path
853
854
855    @staticmethod
856    def is_client_job_finished(last_line):
857        return bool(re.match(r'^\t*END .*\t[\w.-]+\t[\w.-]+\t.*$', last_line))
858
859
860    @staticmethod
861    def is_client_job_rebooting(last_line):
862        return bool(re.match(r'^\t*GOOD\t[\w.-]+\treboot\.start.*$', last_line))
863
864
865    # Roughly ordered list from concrete to less specific reboot causes.
866    _failure_reasons = [
867        # Try to find possible reasons leading towards failure.
868        ('ethernet recovery methods have failed. Rebooting.',
869         'dead ethernet dongle crbug/1031035'),
870        # GPU hangs are not always recovered from.
871        ('[drm:amdgpu_job_timedout] \*ERROR\* ring gfx timeout',
872         'drm ring gfx timeout'),
873        ('[drm:do_aquire_global_lock] \*ERROR(.*)hw_done or flip_done timed',
874         'drm hw/flip timeout'),
875        ('[drm:i915_hangcheck_hung] \*ERROR\* Hangcheck(.*)GPU hung',
876         'drm GPU hung'),
877        # TODO(ihf): try to get a better magic signature for kernel crashes.
878        ('BUG: unable to handle kernel paging request', 'kernel paging'),
879        ('Kernel panic - not syncing: Out of memory', 'kernel out of memory'),
880        ('Kernel panic - not syncing', 'kernel panic'),
881        # Fish for user mode killing OOM messages. Shows unstable system.
882        ('out_of_memory', 'process out of memory'),
883        # Reboot was bad enough to have truncated the logs.
884        ('crash_reporter(.*)Stored kcrash', 'kcrash'),
885        ('crash_reporter(.*)Last shutdown was not clean', 'not clean'),
886    ]
887
888    def _diagnose_reboot(self):
889        """
890        Runs diagnostic check on a rebooted DUT.
891
892        TODO(ihf): if this analysis is useful consider moving the code to the
893                   DUT into a script and call it from here. This is more
894                   powerful and might be cleaner to grow in functionality. But
895                   it may also be less robust if stateful is damaged during the
896                   reboot.
897
898        @returns msg describing reboot reason.
899        """
900        reasons = []
901        for (message, bucket) in self._failure_reasons:
902            # Use -a option for grep to avoid "binary file" warning to stdout.
903            # The grep -v is added to not match itself in the log (across jobs).
904            # Using grep is slightly problematic as it finds any reason, not
905            # just the most recent reason (since 2 boots ago), so it may guess
906            # wrong. Multiple reboots are unusual in the lab setting though and
907            # it is better to have a reasonable guess than no reason at all.
908            found = self.host.run(
909                "grep -aE '" + message + "' /var/log/messages | grep -av grep",
910                ignore_status=True
911            ).stdout
912            if found and found.strip():
913                reasons.append(bucket)
914        signature = 'reason unknown'
915        if reasons:
916            # Concatenate possible reasons found to obtain a magic signature.
917            signature = ', '.join(reasons)
918        return ('DUT rebooted during the test run. (%s)\n' % signature)
919
920
921    def _diagnose_dut(self, old_boot_id=None):
922        """
923        Run diagnostic checks on a DUT.
924
925        1. ping: A dead host will not respond to pings.
926        2. ssh (happens with 3.): DUT hangs usually fail in authentication
927            but respond to pings.
928        3. Check if a reboot occured: A healthy but unexpected reboot leaves the
929            host running with a new boot id.
930
931        This method will always raise an exception from the AutotestFailure
932        family and should only get called when the reason for a test failing
933        is ambiguous.
934
935        @raises AutotestDeviceNotPingable: If the DUT doesn't respond to ping.
936        @raises AutotestDeviceNotSSHable: If we cannot SSH into the DUT.
937        @raises AutotestDeviceRebooted: If the boot id changed.
938        @raises AutotestAbort: If none of the above exceptions were raised.
939            Since we have no recourse we must abort at this stage.
940        """
941        msg = 'Autotest client terminated unexpectedly: '
942        if utils.ping(self.host.hostname, tries=1, deadline=1) != 0:
943            msg += 'DUT is no longer pingable, it may have rebooted or hung.\n'
944            raise AutotestDeviceNotPingable(msg)
945
946        if old_boot_id:
947            try:
948                new_boot_id = self.host.get_boot_id(timeout=60)
949            except Exception as e:
950                msg += ('DUT is pingable but not SSHable, it most likely'
951                        ' sporadically rebooted during testing. %s\n' % str(e))
952                raise AutotestDeviceNotSSHable(msg)
953            else:
954                if new_boot_id != old_boot_id:
955                    msg += self._diagnose_reboot()
956                    raise AutotestDeviceRebooted(msg)
957
958            msg += ('DUT is pingable, SSHable and did NOT restart '
959                    'un-expectedly. We probably lost connectivity during the '
960                    'test.')
961        else:
962            msg += ('DUT is pingable, could not determine if an un-expected '
963                    'reboot occured during the test.')
964
965        raise AutotestAbort(msg)
966
967
968    def log_unexpected_abort(self, stderr_redirector, old_boot_id=None):
969        """
970        Logs that something unexpected happened, then tries to diagnose the
971        failure. The purpose of this function is only to close out the status
972        log with the appropriate error message, not to critically terminate
973        the program.
974
975        @param stderr_redirector: log stream.
976        @param old_boot_id: boot id used to infer if a reboot occured.
977        """
978        stderr_redirector.flush_all_buffers()
979        try:
980            self._diagnose_dut(old_boot_id)
981        except AutotestFailure as e:
982            self.host.job.record('END ABORT', None, None, str(e))
983
984
985    def _execute_in_background(self, section, timeout):
986        full_cmd = self.get_background_cmd(section)
987        devnull = open(os.devnull, "w")
988
989        self.copy_client_config_file(self.get_client_log())
990
991        self.host.job.push_execution_context(self.results_dir)
992        try:
993            result = self.host.run(full_cmd, ignore_status=True,
994                                   timeout=timeout,
995                                   stdout_tee=devnull,
996                                   stderr_tee=devnull)
997        finally:
998            self.host.job.pop_execution_context()
999
1000        return result
1001
1002
1003    @staticmethod
1004    def _strip_stderr_prologue(stderr, monitor_cmd):
1005        """Strips the 'standard' prologue that get pre-pended to every
1006        remote command and returns the text that was actually written to
1007        stderr by the remote command.
1008
1009        This will always strip atleast the first line ('standard' prologue),
1010        and strip any extra messages prior. The following are common 'extra'
1011        messages which could appear.
1012
1013        1.) Any warnings. For example, on CrOS version R90, any script running
1014            in python2 result in the following warning in the stderr:
1015            "warning: Python 2.7 is deprecated and will be removed from CrOS by
1016            end of 2021. All users must migrate ASAP"
1017        2.) The actual command used to launch autotestd_monitor (monitor_cmd)
1018
1019        Additionally there is a NOTE line that could be present needing also to
1020        be stripped.
1021        """
1022        stderr_lines = stderr.split("\n")
1023        if not stderr_lines:
1024            return ""
1025
1026        # If no warnings/monitor_cmd, strip only the first line
1027        skipn = 1
1028        for i, line in enumerate(stderr_lines):
1029            if monitor_cmd in line:
1030                # add *2* (1 for the index, 1 for the 'standard prolouge'
1031                # which follows this line).
1032                skipn = i + 2
1033                break
1034
1035        stderr_lines = stderr_lines[skipn:]
1036
1037        if stderr_lines[0].startswith("NOTE: autotestd_monitor"):
1038            del stderr_lines[0]
1039        return "\n".join(stderr_lines)
1040
1041
1042    def _execute_daemon(self, section, timeout, stderr_redirector,
1043                        client_disconnect_timeout):
1044        monitor_dir = self.host.get_tmp_dir()
1045        daemon_cmd = self.get_daemon_cmd(section, monitor_dir)
1046
1047        # grab the location for the server-side client log file
1048        client_log_prefix = self.get_client_log()
1049        client_log_path = os.path.join(self.results_dir, 'debug',
1050                                       client_log_prefix + '.log')
1051        client_log = open(client_log_path, 'w', LOG_BUFFER_SIZE_BYTES)
1052        self.copy_client_config_file(client_log_prefix)
1053
1054        stdout_read = stderr_read = 0
1055        self.host.job.push_execution_context(self.results_dir)
1056        try:
1057            self.host.run(daemon_cmd, ignore_status=True, timeout=timeout)
1058            disconnect_warnings = []
1059            while True:
1060                monitor_cmd = self.get_monitor_cmd(monitor_dir, stdout_read,
1061                                                   stderr_read)
1062                try:
1063                    result = self.host.run(monitor_cmd, ignore_status=True,
1064                                           timeout=timeout,
1065                                           stdout_tee=client_log,
1066                                           stderr_tee=stderr_redirector)
1067                except error.AutoservRunError as e:
1068                    result = e.result_obj
1069                    result.exit_status = None
1070                    disconnect_warnings.append(e.description)
1071
1072                    stderr_redirector.log_warning(
1073                        "Autotest client was disconnected: %s" % e.description,
1074                        "NETWORK")
1075                except error.AutoservSSHTimeout:
1076                    result = utils.CmdResult(monitor_cmd, "", "", None, 0)
1077                    stderr_redirector.log_warning(
1078                        "Attempt to connect to Autotest client timed out",
1079                        "NETWORK")
1080
1081                stdout_read += len(result.stdout)
1082                stderr_read += len(
1083                        self._strip_stderr_prologue(result.stderr,
1084                                                    monitor_cmd))
1085
1086                if result.exit_status is not None:
1087                    # TODO (crosbug.com/38224)- sbasi: Remove extra logging.
1088                    logging.debug('Result exit status is %d.',
1089                                  result.exit_status)
1090                    return result
1091                elif not self.host.wait_up(client_disconnect_timeout):
1092                    raise error.AutoservSSHTimeout(
1093                        "client was disconnected, reconnect timed out")
1094        finally:
1095            client_log.close()
1096            self.host.job.pop_execution_context()
1097
1098
1099    def execute_section(self, section, timeout, stderr_redirector,
1100                        client_disconnect_timeout, boot_id=None):
1101        # TODO(crbug.com/684311) The claim is that section is never more than 0
1102        # in pratice. After validating for a week or so, delete all support of
1103        # multiple sections.
1104        metrics.Counter('chromeos/autotest/autotest/sections').increment(
1105                fields={'is_first_section': (section == 0)})
1106        logging.info("Executing %s/bin/autotest %s/control phase %d",
1107                     self.autodir, self.autodir, section)
1108
1109        if self.background:
1110            result = self._execute_in_background(section, timeout)
1111        else:
1112            result = self._execute_daemon(section, timeout, stderr_redirector,
1113                                          client_disconnect_timeout)
1114
1115        last_line = stderr_redirector.last_line
1116
1117        # check if we failed hard enough to warrant an exception
1118        if result.exit_status == 1:
1119            err = error.AutotestRunError("client job was aborted")
1120        elif not self.background and not result.stderr:
1121            err = error.AutotestRunError(
1122                "execute_section %s failed to return anything\n"
1123                "stdout:%s\n" % (section, result.stdout))
1124        else:
1125            err = None
1126
1127        # log something if the client failed AND never finished logging
1128        if err and not self.is_client_job_finished(last_line):
1129            self.log_unexpected_abort(stderr_redirector, old_boot_id=boot_id)
1130
1131        if err:
1132            raise err
1133        else:
1134            return stderr_redirector.last_line
1135
1136
1137    def _wait_for_reboot(self, old_boot_id):
1138        logging.info("Client is rebooting")
1139        logging.info("Waiting for client to halt")
1140        if not self.host.wait_down(self.host.WAIT_DOWN_REBOOT_TIMEOUT,
1141                                   old_boot_id=old_boot_id):
1142            err = "%s failed to shutdown after %d"
1143            err %= (self.host.hostname, self.host.WAIT_DOWN_REBOOT_TIMEOUT)
1144            raise error.AutotestRunError(err)
1145        logging.info("Client down, waiting for restart")
1146        if not self.host.wait_up(self.host.DEFAULT_REBOOT_TIMEOUT):
1147            # since reboot failed
1148            # hardreset the machine once if possible
1149            # before failing this control file
1150            warning = "%s did not come back up, hard resetting"
1151            warning %= self.host.hostname
1152            logging.warning(warning)
1153            try:
1154                self.host.hardreset(wait=False)
1155            except (AttributeError, error.AutoservUnsupportedError):
1156                warning = "Hard reset unsupported on %s"
1157                warning %= self.host.hostname
1158                logging.warning(warning)
1159            raise error.AutotestRunError("%s failed to boot after %ds" %
1160                                         (self.host.hostname,
1161                                          self.host.DEFAULT_REBOOT_TIMEOUT))
1162        self.host.reboot_followup()
1163
1164
    def execute_control(self, timeout=None, client_disconnect_timeout=None):
        """Run the client control file section by section until it finishes.

        Each loop iteration executes one section and then looks at the last
        status line it produced:
        - an END line means the client job is complete;
        - a reboot.start line means the client is rebooting, so the method
          waits for the reboot and runs the next section;
        - anything else is treated as an unexplained failure and the DUT is
          diagnosed to decide how to close out the status log.

        In background mode the method returns right after the first section
        and none of the result collection is performed.

        @param timeout: Overall time budget in seconds for all sections; a
                false value (None or 0) means no limit.
        @param client_disconnect_timeout: Passed through to execute_section.

        @raises error.AutotestRunError: If waiting for a client reboot fails,
                or the diagnosis ends in an abort.
        @raises error.AutotestTimeoutError: If the time budget runs out before
                the client job finishes.
        """
        if not self.background:
            # Register the client results location with the job for the
            # duration of the run; it is unregistered in the finally block.
            collector = log_collector(self.host, self.tag, self.results_dir)
            hostname = self.host.hostname
            remote_results = collector.client_results_dir
            local_results = collector.server_results_dir
            self.host.job.add_client_log(hostname, remote_results,
                                         local_results)
            job_record_context = self.host.job.get_record_context()

        section = 0
        start_time = time.time()

        logger = client_logger(self.host, self.tag, self.results_dir)
        try:
            while not timeout or time.time() < start_time + timeout:
                # Each section only gets what is left of the overall budget.
                if timeout:
                    section_timeout = start_time + timeout - time.time()
                else:
                    section_timeout = None
                boot_id = self.host.get_boot_id()
                last = self.execute_section(section, section_timeout,
                                            logger, client_disconnect_timeout,
                                            boot_id=boot_id)
                if self.background:
                    return
                section += 1
                if self.is_client_job_finished(last):
                    logging.info("Client complete")
                    return
                elif self.is_client_job_rebooting(last):
                    try:
                        self._wait_for_reboot(boot_id)
                    except error.AutotestRunError as e:
                        # Close the status log before propagating the error.
                        self.host.job.record("ABORT", None, "reboot", str(e))
                        self.host.job.record("END ABORT", None, None, str(e))
                        raise
                    continue

                # If a test fails without probable cause we try to bucket its
                # failure into one of 2 categories. If we can determine the
                # current state of the device and it is suspicious, we close the
                # status lines indicating a failure. If we either cannot
                # determine the state of the device, or it appears totally
                # healthy, we give up and abort.
                try:
                    self._diagnose_dut(boot_id)
                except AutotestDeviceError as e:
                    # The status lines of the test are pretty much tailed to
                    # our log, with indentation, from the client job on the DUT.
                    # So if the DUT goes down unexpectedly we'll end up with a
                    # malformed status log unless we manually unwind the status
                    # stack. Ideally we would want to write a nice wrapper like
                    # server_job methods run_reboot, run_group but they expect
                    # reboots and we don't.
                    self.host.job.record('FAIL', None, None, str(e))
                    self.host.job.record('END FAIL', None, None)
                    self.host.job.record('END GOOD', None, None)
                    self.host.job.failed_with_device_error = True
                    return
                except AutotestAbort as e:
                    self.host.job.record('ABORT', None, None, str(e))
                    self.host.job.record('END ABORT', None, None)

                    # give the client machine a chance to recover from a crash
                    self.host.wait_up(
                        self.host.HOURS_TO_WAIT_FOR_RECOVERY * 3600)
                    logging.debug('Unexpected final status message from '
                                  'client %s: %s', self.host.hostname, last)
                    # The line 'last' may have sensitive phrases, like
                    # 'END GOOD', which breaks the tko parser. So the error
                    # message will exclude it, since it will be recorded to
                    # status.log.
                    msg = ("Aborting - unexpected final status message from "
                           "client on %s\n") % self.host.hostname
                    raise error.AutotestRunError(msg)
        finally:
            # B/203609358 something is removing telemetry. Adding this to
            # check the status of the folder as late as possible.
            logging.debug('Autotest job finishes running. Below is the '
                          'post-processing operations.')
            logger.close()
            if not self.background:
                # Pull the results off the client, post-process the client
                # state file and undo the registration made before the loop.
                collector.collect_client_job_results()
                collector.remove_redundant_client_logs()
                state_file = os.path.basename(self.remote_control_file
                                              + '.state')
                state_path = os.path.join(self.results_dir, state_file)
                self.host.job.postprocess_client_state(state_path)
                self.host.job.remove_client_log(hostname, remote_results,
                                                local_results)
                job_record_context.restore()

            logging.debug('Autotest job finishes.')

        # should only get here if we timed out
        assert timeout
        raise error.AutotestTimeoutError()
1263
1264
class log_collector(object):
    """Copies the results of a client job from the host to the server."""

    def __init__(self, host, client_tag, results_dir):
        """
        @param host: Host the client job runs on.
        @param client_tag: Tag of the client job; "default" is used when it
                is empty.
        @param results_dir: Server-side directory the results are copied to.
        """
        self.host = host
        if not client_tag:
            client_tag = "default"
        self.client_results_dir = os.path.join(host.get_autodir(), "results",
                                               client_tag)
        self.server_results_dir = results_dir


    def collect_client_job_results(self):
        """Copy the current client job results into the server results dir.

        Waits briefly for the host to come up, builds the result directory
        summary on the client and then fetches the client results directory.
        An AutoservError while waiting is ignored, and any exception raised
        while summarizing or copying is logged and swallowed so that a
        failed collection does not stop the run.
        """
        # make an effort to wait for the machine to come up
        try:
            self.host.wait_up(timeout=30)
        except error.AutoservError:
            # don't worry about any errors, we'll try and
            # get the results anyway
            pass

        # Copy all dirs in default to results_dir
        try:
            # Build test result directory summary
            result_tools_runner.run_on_client(
                    self.host, self.client_results_dir)

            with metrics.SecondsTimer(
                    'chromeos/autotest/job/log_collection_duration',
                    fields={'dut_host_name': self.host.hostname}):
                self.host.get_file(
                        self.client_results_dir + '/',
                        self.server_results_dir,
                        preserve_symlinks=True)
        except Exception:
            # well, don't stop running just because we couldn't get logs
            e_msg = "Unexpected error copying test result logs, continuing ..."
            logging.error(e_msg)
            traceback.print_exc(file=sys.stdout)


    def remove_redundant_client_logs(self):
        """Remove client.*.log files in favour of client.*.DEBUG files.

        Does nothing when the server results dir has no debug directory.
        """
        debug_dir = os.path.join(self.server_results_dir, 'debug')
        # Without a debug directory there is nothing redundant to remove;
        # raising here would only mask the error that prevented its creation.
        if not os.path.isdir(debug_dir):
            return
        for debug_file in os.listdir(debug_dir):
            if not re.search(r'^client\.\d+\.DEBUG$', debug_file):
                continue
            log_file = os.path.join(debug_dir,
                                    debug_file.replace('DEBUG', 'log'))
            if os.path.exists(log_file):
                os.remove(log_file)
1318
1319
1320# a file-like object for catching stderr from an autotest client and
1321# extracting status logs from it
1322class client_logger(object):
1323    """Partial file object to write to both stdout and
1324    the status log file.  We only implement those methods
1325    utils.run() actually calls.
1326    """
1327    status_parser = re.compile(r"^AUTOTEST_STATUS:([^:]*):(.*)$")
1328    test_complete_parser = re.compile(r"^AUTOTEST_TEST_COMPLETE:(.*)$")
1329    fetch_package_parser = re.compile(
1330        r"^AUTOTEST_FETCH_PACKAGE:([^:]*):([^:]*):(.*)$")
1331    extract_indent = re.compile(r"^(\t*).*$")
1332    extract_timestamp = re.compile(r".*\ttimestamp=(\d+)\t.*$")
1333
1334    def __init__(self, host, tag, server_results_dir):
1335        self.host = host
1336        self.job = host.job
1337        self.log_collector = log_collector(host, tag, server_results_dir)
1338        self.leftover = ""
1339        self.last_line = ""
1340        self.logs = {}
1341
1342
1343    def _process_log_dict(self, log_dict):
1344        log_list = log_dict.pop("logs", [])
1345        for key in sorted(six.iterkeys(log_dict)):
1346            log_list += self._process_log_dict(log_dict.pop(key))
1347        return log_list
1348
1349
1350    def _process_logs(self):
1351        """Go through the accumulated logs in self.log and print them
1352        out to stdout and the status log. Note that this processes
1353        logs in an ordering where:
1354
1355        1) logs to different tags are never interleaved
1356        2) logs to x.y come before logs to x.y.z for all z
1357        3) logs to x.y come before x.z whenever y < z
1358
1359        Note that this will in general not be the same as the
1360        chronological ordering of the logs. However, if a chronological
1361        ordering is desired that one can be reconstructed from the
1362        status log by looking at timestamp lines."""
1363        log_list = self._process_log_dict(self.logs)
1364        for entry in log_list:
1365            self.job.record_entry(entry, log_in_subdir=False)
1366        if log_list:
1367            self.last_line = log_list[-1].render()
1368
1369
1370    def _process_quoted_line(self, tag, line):
1371        """Process a line quoted with an AUTOTEST_STATUS flag. If the
1372        tag is blank then we want to push out all the data we've been
1373        building up in self.logs, and then the newest line. If the
1374        tag is not blank, then push the line into the logs for handling
1375        later."""
1376        entry = base_job.status_log_entry.parse(line)
1377        if entry is None:
1378            return  # the line contains no status lines
1379        if tag == "":
1380            self._process_logs()
1381            self.job.record_entry(entry, log_in_subdir=False)
1382            self.last_line = line
1383        else:
1384            tag_parts = [int(x) for x in tag.split(".")]
1385            log_dict = self.logs
1386            for part in tag_parts:
1387                log_dict = log_dict.setdefault(part, {})
1388            log_list = log_dict.setdefault("logs", [])
1389            log_list.append(entry)
1390
1391
1392    def _process_info_line(self, line):
1393        """Check if line is an INFO line, and if it is, interpret any control
1394        messages (e.g. enabling/disabling warnings) that it may contain."""
1395        match = re.search(r"^\t*INFO\t----\t----(.*)\t[^\t]*$", line)
1396        if not match:
1397            return   # not an INFO line
1398        for field in match.group(1).split('\t'):
1399            if field.startswith("warnings.enable="):
1400                func = self.job.warning_manager.enable_warnings
1401            elif field.startswith("warnings.disable="):
1402                func = self.job.warning_manager.disable_warnings
1403            else:
1404                continue
1405            warning_type = field.split("=", 1)[1]
1406            func(warning_type)
1407
1408
1409    def _process_line(self, line):
1410        """Write out a line of data to the appropriate stream.
1411
1412        Returns the package checksum file if it exists.
1413
1414        Status lines sent by autotest will be prepended with
1415        "AUTOTEST_STATUS", and all other lines are ssh error messages.
1416        """
1417        logging.debug(line)
1418        fetch_package_match = self.fetch_package_parser.search(line)
1419        if fetch_package_match:
1420            pkg_name, dest_path, fifo_path = fetch_package_match.groups()
1421            serve_packages = _CONFIG.get_config_value(
1422                "PACKAGES", "serve_packages_from_autoserv", type=bool)
1423            if serve_packages and pkg_name == 'packages.checksum':
1424                try:
1425                    checksum_file = os.path.join(
1426                        self.job.pkgmgr.pkgmgr_dir, 'packages', pkg_name)
1427                    if os.path.exists(checksum_file):
1428                        self.host.send_file(checksum_file, dest_path)
1429                except error.AutoservRunError:
1430                    msg = "Package checksum file not found, continuing anyway"
1431                    logging.exception(msg)
1432
1433                try:
1434                    # When fetching a package, the client expects to be
1435                    # notified when the fetching is complete. Autotest
1436                    # does this pushing a B to a fifo queue to the client.
1437                    self.host.run("echo B > %s" % fifo_path)
1438                except error.AutoservRunError:
1439                    msg = "Checksum installation failed, continuing anyway"
1440                    logging.exception(msg)
1441                finally:
1442                    return
1443
1444        status_match = self.status_parser.search(line)
1445        test_complete_match = self.test_complete_parser.search(line)
1446        fetch_package_match = self.fetch_package_parser.search(line)
1447        if status_match:
1448            tag, line = status_match.groups()
1449            self._process_info_line(line)
1450            self._process_quoted_line(tag, line)
1451        elif test_complete_match:
1452            self._process_logs()
1453            fifo_path, = test_complete_match.groups()
1454            try:
1455                self.log_collector.collect_client_job_results()
1456                self.host.run("echo A > %s" % fifo_path)
1457            except Exception:
1458                msg = "Post-test log collection failed, continuing anyway"
1459                logging.exception(msg)
1460        elif fetch_package_match:
1461            pkg_name, dest_path, fifo_path = fetch_package_match.groups()
1462            serve_packages = global_config.global_config.get_config_value(
1463                "PACKAGES", "serve_packages_from_autoserv", type=bool)
1464            if serve_packages and pkg_name.endswith(".tar.bz2"):
1465                try:
1466                    self._send_tarball(pkg_name, dest_path)
1467                except Exception:
1468                    msg = "Package tarball creation failed, continuing anyway"
1469                    logging.exception(msg)
1470            try:
1471                self.host.run("echo B > %s" % fifo_path)
1472            except Exception:
1473                msg = "Package tarball installation failed, continuing anyway"
1474                logging.exception(msg)
1475        else:
1476            logging.info(line)
1477
1478
1479    def _send_tarball(self, pkg_name, remote_dest):
1480        """Uses tarballs in package manager by default."""
1481        try:
1482            server_package = os.path.join(self.job.pkgmgr.pkgmgr_dir,
1483                                          'packages', pkg_name)
1484            if os.path.exists(server_package):
1485                self.host.send_file(server_package, remote_dest)
1486                return
1487
1488        except error.AutoservRunError:
1489            msg = ("Package %s could not be sent from the package cache." %
1490                   pkg_name)
1491            logging.exception(msg)
1492
1493        name, pkg_type = self.job.pkgmgr.parse_tarball_name(pkg_name)
1494        src_dirs = []
1495        if pkg_type == 'test':
1496            for test_dir in ['site_tests', 'tests']:
1497                src_dir = os.path.join(self.job.clientdir, test_dir, name)
1498                if os.path.exists(src_dir):
1499                    src_dirs += [src_dir]
1500                    break
1501        elif pkg_type == 'profiler':
1502            src_dirs += [os.path.join(self.job.clientdir, 'profilers', name)]
1503        elif pkg_type == 'dep':
1504            src_dirs += [os.path.join(self.job.clientdir, 'deps', name)]
1505        elif pkg_type == 'client':
1506            return  # you must already have a client to hit this anyway
1507        else:
1508            return  # no other types are supported
1509
1510        # iterate over src_dirs until we find one that exists, then tar it
1511        for src_dir in src_dirs:
1512            if os.path.exists(src_dir):
1513                try:
1514                    logging.info('Bundling %s into %s', src_dir, pkg_name)
1515                    temp_dir = autotemp.tempdir(unique_id='autoserv-packager',
1516                                                dir=self.job.tmpdir)
1517                    tarball_path = self.job.pkgmgr.tar_package(
1518                        pkg_name, src_dir, temp_dir.name, " .")
1519                    self.host.send_file(tarball_path, remote_dest)
1520                finally:
1521                    temp_dir.clean()
1522                return
1523
1524
1525    def log_warning(self, msg, warning_type):
1526        """Injects a WARN message into the current status logging stream."""
1527        timestamp = int(time.time())
1528        if self.job.warning_manager.is_valid(timestamp, warning_type):
1529            self.job.record('WARN', None, None, msg)
1530
1531
1532    def write(self, data):
1533        # now start processing the existing buffer and the new data
1534        data = self.leftover + data
1535        lines = data.split('\n')
1536        processed_lines = 0
1537        try:
1538            # process all the buffered data except the last line
1539            # ignore the last line since we may not have all of it yet
1540            for line in lines[:-1]:
1541                self._process_line(line)
1542                processed_lines += 1
1543        finally:
1544            # save any unprocessed lines for future processing
1545            self.leftover = '\n'.join(lines[processed_lines:])
1546
1547
1548    def flush(self):
1549        sys.stdout.flush()
1550
1551
1552    def flush_all_buffers(self):
1553        if self.leftover:
1554            self._process_line(self.leftover)
1555            self.leftover = ""
1556        self._process_logs()
1557        self.flush()
1558
1559
1560    def close(self):
1561        self.flush_all_buffers()
1562