xref: /aosp_15_r20/external/autotest/server/server_job.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2# pylint: disable-msg=C0111
3
4# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7"""
8The main job wrapper for the server side.
9
10This is the core infrastructure. Derived from the client side job.py
11
12Copyright Martin J. Bligh, Andy Whitcroft 2007
13"""
14
15from __future__ import absolute_import
16from __future__ import division
17from __future__ import print_function
18
19import errno
20import fcntl
21import getpass
22import logging
23import os
24import pickle
25import platform
26import re
27import select
28import shutil
29import sys
30import tempfile
31import time
32import traceback
33import uuid
34import warnings
35
36from datetime import datetime
37
38from autotest_lib.client.bin import sysinfo
39from autotest_lib.client.common_lib import base_job
40from autotest_lib.client.common_lib import control_data
41from autotest_lib.client.common_lib import error
42from autotest_lib.client.common_lib import logging_manager
43from autotest_lib.client.common_lib import packages
44from autotest_lib.client.common_lib import utils
45from autotest_lib.client.common_lib import seven
46from autotest_lib.server import profilers
47from autotest_lib.server import site_gtest_runner
48from autotest_lib.server import subcommand
49from autotest_lib.server import test
50from autotest_lib.server.autotest import OFFLOAD_ENVVAR
51from autotest_lib.server import utils as server_utils
52from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
53from autotest_lib.server import hosts
54from autotest_lib.server.hosts import abstract_ssh
55from autotest_lib.server.hosts import afe_store
56from autotest_lib.server.hosts import file_store
57from autotest_lib.server.hosts import shadowing_store
58from autotest_lib.server.hosts import factory as host_factory
59from autotest_lib.server.hosts import host_info
60from autotest_lib.server.hosts import ssh_multiplex
61from autotest_lib.tko import models as tko_models
62from autotest_lib.tko import parser_lib
63from six.moves import zip
64
65try:
66    from autotest_lib.utils.frozen_chromite.lib import metrics
67except ImportError:
68    metrics = utils.metrics_mock
69
70
71def _control_segment_path(name):
72    """Get the pathname of the named control segment file."""
73    server_dir = os.path.dirname(os.path.abspath(__file__))
74    return os.path.join(server_dir, "control_segments", name)
75
76
# Filenames used for the client- and server-side control files written into
# the results directory.
CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
# Marker file recording the machines a job ran against.
MACHINES_FILENAME = '.machines'
# Path on the DUT -- presumably the writable stateful partition on Chrome OS
# devices; TODO(review): confirm against consumers.
DUT_STATEFUL_PATH = "/usr/local"

# Absolute paths of the control segment scripts implementing the standard
# special tasks (see _control_segment_path above).
CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CLIENT_TRAMPOLINE_CONTROL_FILE = _control_segment_path('client_trampoline')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')
VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
PROVISION_CONTROL_FILE = _control_segment_path('provision')
VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
RESET_CONTROL_FILE = _control_segment_path('reset')
GET_NETWORK_STATS_CONTROL_FILE = _control_segment_path('get_network_stats')
93
94
def get_machine_dicts(machine_names, store_dir, in_lab, use_shadow_store,
                      host_attributes=None):
    """Converts a list of machine names to list of dicts.

    TODO(crbug.com/678430): This function temporarily has a side effect of
    creating files under workdir for backing a FileStore. This side-effect will
    go away once callers of autoserv start passing in the FileStore.

    @param machine_names: A list of machine names.
    @param store_dir: A directory to contain store backing files.
    @param in_lab: A boolean indicating whether we're running in lab.
    @param use_shadow_store: If True, we should create a ShadowingStore where
            actual store is backed by the AFE but we create a backing file to
            shadow the store. If False, backing file should already exist at:
            ${store_dir}/${hostname}.store
    @param host_attributes: Optional list of host attributes to add for each
            host.
    @returns: A list of dicts. Each dict has the following keys:
            'hostname': Name of the machine originally in machine_names (str).
            'afe_host': A frontend.Host object for the machine, or a stub if
                    in_lab is false.
            'host_info_store': A host_info.CachingHostInfoStore object to obtain
                    host information. A stub if in_lab is False.
            'connection_pool': ssh_multiplex.ConnectionPool instance to share
                    ssh connection across control scripts. This is set to
                    None, and should be overridden for connection sharing.
    """
    # autoserv only allows one of these two arguments to be set; see
    # autoserv_parser.parse_args.
    if in_lab and host_attributes:
        raise error.AutoservError(
                'in_lab and host_attribute are mutually exclusive.')

    machine_dicts = []
    for hostname in machine_names:
        if not in_lab:
            afe_host = server_utils.EmptyAFEHost()
            info_store = _create_file_backed_host_info_store(
                    store_dir, hostname)
            if host_attributes:
                # Mirror the attributes into both the stub AFE host and the
                # file-backed host info store.
                afe_host.attributes.update(host_attributes)
                info_store.commit(host_info.HostInfo(
                        labels=info_store.get().labels,
                        attributes=host_attributes))
        elif use_shadow_store:
            afe_host = _create_afe_host(hostname)
            info_store = _create_afe_backed_host_info_store(
                    store_dir, hostname)
        else:
            afe_host = server_utils.EmptyAFEHost()
            info_store = _create_file_backed_host_info_store(
                    store_dir, hostname)

        machine_dicts.append({
                'hostname': hostname,
                'afe_host': afe_host,
                'host_info_store': info_store,
                'connection_pool': None,
        })

    return machine_dicts
156
157
class status_indenter(base_job.status_indenter):
    """Provide a simple integer-backed status indenter."""

    def __init__(self):
        self._indent = 0

    @property
    def indent(self):
        """Current indentation level (int)."""
        return self._indent

    def increment(self):
        """Increase the indentation level by one."""
        self._indent += 1

    def decrement(self):
        """Decrease the indentation level by one."""
        self._indent -= 1

    def get_context(self):
        """Returns a context object for use by job.get_record_context."""
        class _Context(object):
            """Snapshot of an indenter's level, restorable later."""
            def __init__(self, indenter, indent):
                self._indenter = indenter
                self._indent = indent

            def restore(self):
                # Rewind the indenter to the level captured at creation.
                self._indenter._indent = self._indent

        return _Context(self, self._indent)
186
187
class server_job_record_hook(object):
    """The job.record hook for server job.

    Injects WARN messages from the console or vlm whenever new logs are
    written, and echoes all logs to INFO level logging. Implemented as a
    class so it can track state and block recursive calls, since the hook
    itself calls job.record to log WARN messages.

    Depends on job._read_warnings and job._logger.
    """
    def __init__(self, job):
        self._job = job
        self._being_called = False

    def __call__(self, entry):
        """Guarded wrapper around the real hook (_hook).

        Not thread-safe; the sole intent is to break the infinite
        job.record -> _hook -> job.record -> ... recursion chain.
        """
        if self._being_called:
            return
        self._being_called = True
        try:
            self._hook(self._job, entry)
        finally:
            self._being_called = False

    @staticmethod
    def _hook(job, entry):
        """The core hook, which can safely call job.record."""
        pending = []
        # Drain every warning logger first so WARN entries are recorded
        # ahead of the triggering entry.
        for timestamp, msg in job._read_warnings():
            warning = base_job.status_log_entry(
                    'WARN', None, None, msg, {}, timestamp=timestamp)
            pending.append(warning)
            job.record_entry(warning)
        pending.append(entry)
        # Echo rendered versions of all the status logs to INFO logging.
        for item in pending:
            logging.info(job._logger.render_entry(item))
231
232
233class server_job(base_job.base_job):
234    """The server-side concrete implementation of base_job.
235
236    Optional properties provided by this implementation:
237        serverdir
238
239        warning_manager
240        warning_loggers
241    """
242
243    _STATUS_VERSION = 1
244
245    # TODO crbug.com/285395 eliminate ssh_verbosity_flag
246    def __init__(self,
247                 control,
248                 args,
249                 resultdir,
250                 label,
251                 user,
252                 machines,
253                 machine_dict_list,
254                 client=False,
255                 ssh_user=host_factory.DEFAULT_SSH_USER,
256                 ssh_port=host_factory.DEFAULT_SSH_PORT,
257                 ssh_pass=host_factory.DEFAULT_SSH_PASS,
258                 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY,
259                 ssh_options=host_factory.DEFAULT_SSH_OPTIONS,
260                 group_name='',
261                 tag='',
262                 disable_sysinfo=False,
263                 control_filename=SERVER_CONTROL_FILENAME,
264                 parent_job_id=None,
265                 in_lab=False,
266                 use_client_trampoline=False,
267                 sync_offload_dir='',
268                 companion_hosts=None,
269                 dut_servers=None,
270                 is_cft=False,
271                 force_full_log_collection=False):
272        """
273        Create a server side job object.
274
275        @param control: The pathname of the control file.
276        @param args: Passed to the control file.
277        @param resultdir: Where to throw the results.
278        @param label: Description of the job.
279        @param user: Username for the job (email address).
280        @param machines: A list of hostnames of the machines to use for the job.
281        @param machine_dict_list: A list of dicts for each of the machines above
282                as returned by get_machine_dicts.
283        @param client: True if this is a client-side control file.
284        @param ssh_user: The SSH username.  [root]
285        @param ssh_port: The SSH port number.  [22]
286        @param ssh_pass: The SSH passphrase, if needed.
287        @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv',
288                '-vvv', or an empty string if not needed.
289        @param ssh_options: A string giving additional options that will be
290                included in ssh commands.
291        @param group_name: If supplied, this will be written out as
292                host_group_name in the keyvals file for the parser.
293        @param tag: The job execution tag from the scheduler.  [optional]
294        @param disable_sysinfo: Whether we should disable the sysinfo step of
295                tests for a modest shortening of test time.  [optional]
296        @param control_filename: The filename where the server control file
297                should be written in the results directory.
298        @param parent_job_id: Job ID of the parent job. Default to None if the
299                job does not have a parent job.
300        @param in_lab: Boolean that indicates if this is running in the lab
301                environment.
302        @param use_client_trampoline: Boolean that indicates whether to
303                use the client trampoline flow.  If this is True, control
304                is interpreted as the name of the client test to run.
305                The client control file will be client_trampoline.  The
306                test name will be passed to client_trampoline, which will
307                install the test package and re-exec the actual test
308                control file.
309        @param sync_offload_dir: String; relative path to synchronous offload
310                dir, relative to the results directory. Ignored if empty.
311        @param companion_hosts: a str or list of hosts to be used as companions
312                for the and provided to test. NOTE: these are different than
313                machines, where each host is a host that the test would be run
314                on.
315        @param dut_servers: a str or list of hosts to be used as DUT servers
316                provided to test.
317        @param force_full_log_collection: bool; force full log collection even
318                when test passes.
319        """
320        super(server_job, self).__init__(resultdir=resultdir)
321        self.control = control
322        self._uncollected_log_file = os.path.join(self.resultdir,
323                                                  'uncollected_logs')
324        debugdir = os.path.join(self.resultdir, 'debug')
325        if not os.path.exists(debugdir):
326            os.mkdir(debugdir)
327
328        if user:
329            self.user = user
330        else:
331            self.user = getpass.getuser()
332
333        self.args = args
334        self.label = label
335        self.machines = machines
336        self._client = client
337        self.warning_loggers = set()
338        self.warning_manager = warning_manager()
339        self._ssh_user = ssh_user
340        self._ssh_port = ssh_port
341        self._ssh_pass = ssh_pass
342        self._ssh_verbosity_flag = ssh_verbosity_flag
343        self._ssh_options = ssh_options
344        self.tag = tag
345        self.hosts = set()
346        self.drop_caches = False
347        self.drop_caches_between_iterations = False
348        self._control_filename = control_filename
349        self._disable_sysinfo = disable_sysinfo
350        self._use_client_trampoline = use_client_trampoline
351        self._companion_hosts = companion_hosts
352        self._dut_servers = dut_servers
353        self._is_cft = is_cft
354        self.force_full_log_collection = force_full_log_collection
355
356        # Parse the release number from the label to setup sysinfo.
357        version = re.findall('release/R(\d+)-', label)
358        if version:
359            version = int(version[0])
360
361        self.logging = logging_manager.get_logging_manager(
362                manage_stdout_and_stderr=True, redirect_fds=True)
363        subcommand.logging_manager_object = self.logging
364
365        self.sysinfo = sysinfo.sysinfo(self.resultdir, version=version)
366        self.profilers = profilers.profilers(self)
367        self._sync_offload_dir = sync_offload_dir
368
369        job_data = {
370                'user': user,
371                'hostname': ','.join(machines),
372                'drone': platform.node(),
373                'status_version': str(self._STATUS_VERSION),
374                'job_started': str(int(time.time()))
375        }
376
377        # Adhoc/<testname> is the default label, and should not be written,
378        # as this can cause conflicts with results uploading in CFT.
379        # However, some pipelines (such as PVS) do need `label` within the
380        # keyval, which can now by done with the `-l` flag in test_that.
381        if 'adhoc' not in label:
382            job_data['label'] = label
383        # Save parent job id to keyvals, so parser can retrieve the info and
384        # write to tko_jobs record.
385        if parent_job_id:
386            job_data['parent_job_id'] = parent_job_id
387        if group_name:
388            job_data['host_group_name'] = group_name
389
390        # only write these keyvals out on the first job in a resultdir
391        if 'job_started' not in utils.read_keyval(self.resultdir):
392            job_data.update(self._get_job_data())
393            utils.write_keyval(self.resultdir, job_data)
394
395        self.pkgmgr = packages.PackageManager(
396            self.autodir, run_function_dargs={'timeout':600})
397
398        self._register_subcommand_hooks()
399
400        # We no longer parse results as part of the server_job. These arguments
401        # can't be dropped yet because clients haven't all be cleaned up yet.
402        self.num_tests_run = -1
403        self.num_tests_failed = -1
404
405        # set up the status logger
406        self._indenter = status_indenter()
407        self._logger = base_job.status_logger(
408            self, self._indenter, 'status.log', 'status.log',
409            record_hook=server_job_record_hook(self))
410
411        # Initialize a flag to indicate DUT failure during the test, e.g.,
412        # unexpected reboot.
413        self.failed_with_device_error = False
414
415        self._connection_pool = ssh_multiplex.ConnectionPool()
416
417        # List of functions to run after the main job function.
418        self._post_run_hooks = []
419
420        self.parent_job_id = parent_job_id
421        self.in_lab = in_lab
422        self.machine_dict_list = machine_dict_list
423        for machine_dict in self.machine_dict_list:
424            machine_dict['connection_pool'] = self._connection_pool
425
426        # TODO(jrbarnette) The harness attribute is only relevant to
427        # client jobs, but it's required to be present, or we will fail
428        # server job unit tests.  Yes, really.
429        #
430        # TODO(jrbarnette) The utility of the 'harness' attribute even
431        # to client jobs is suspect.  Probably, we should remove it.
432        self.harness = None
433
434        # TODO(ayatane): fast and max_result_size_KB are not set for
435        # client_trampoline jobs.
436        if control and not use_client_trampoline:
437            parsed_control = control_data.parse_control(
438                    control, raise_warnings=False)
439            self.fast = parsed_control.fast
440            self.max_result_size_KB = parsed_control.max_result_size_KB
441            # wrap this in a try to prevent client/SSP issues. Note: if the
442            # except is hit, the timeout will be ignored.
443            try:
444                self.extended_timeout = parsed_control.extended_timeout
445            except AttributeError:
446                self.extended_timeout = None
447        else:
448            self.fast = False
449            # Set the maximum result size to be the default specified in
450            # global config, if the job has no control file associated.
451            self.max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB
452
453
454    @classmethod
455    def _find_base_directories(cls):
456        """
457        Determine locations of autodir, clientdir and serverdir. Assumes
458        that this file is located within serverdir and uses __file__ along
459        with relative paths to resolve the location.
460        """
461        serverdir = os.path.abspath(os.path.dirname(__file__))
462        autodir = os.path.normpath(os.path.join(serverdir, '..'))
463        clientdir = os.path.join(autodir, 'client')
464        return autodir, clientdir, serverdir
465
466
467    def _find_resultdir(self, resultdir, *args, **dargs):
468        """
469        Determine the location of resultdir. For server jobs we expect one to
470        always be explicitly passed in to __init__, so just return that.
471        """
472        if resultdir:
473            return os.path.normpath(resultdir)
474        else:
475            return None
476
477
478    def _get_status_logger(self):
479        """Return a reference to the status logger."""
480        return self._logger
481
482
483    @staticmethod
484    def _load_control_file(path):
485        f = open(path)
486        try:
487            control_file = f.read()
488        finally:
489            f.close()
490        return re.sub('\r', '', control_file)
491
492
493    def _register_subcommand_hooks(self):
494        """
495        Register some hooks into the subcommand modules that allow us
496        to properly clean up self.hosts created in forked subprocesses.
497        """
498        def on_fork(cmd):
499            self._existing_hosts_on_fork = set(self.hosts)
500        def on_join(cmd):
501            new_hosts = self.hosts - self._existing_hosts_on_fork
502            for host in new_hosts:
503                host.close()
504        subcommand.subcommand.register_fork_hook(on_fork)
505        subcommand.subcommand.register_join_hook(on_join)
506
507
508    # TODO crbug.com/285395 add a kwargs parameter.
509    def _make_namespace(self):
510        """Create a namespace dictionary to be passed along to control file.
511
512        Creates a namespace argument populated with standard values:
513        machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,
514        and ssh_options.
515        """
516        namespace = {'machines' : self.machine_dict_list,
517                     'job' : self,
518                     'ssh_user' : self._ssh_user,
519                     'ssh_port' : self._ssh_port,
520                     'ssh_pass' : self._ssh_pass,
521                     'ssh_verbosity_flag' : self._ssh_verbosity_flag,
522                     'ssh_options' : self._ssh_options}
523        return namespace
524
525
526    def cleanup(self, labels):
527        """Cleanup machines.
528
529        @param labels: Comma separated job labels, will be used to
530                       determine special task actions.
531        """
532        if not self.machines:
533            raise error.AutoservError('No machines specified to cleanup')
534        if self.resultdir:
535            os.chdir(self.resultdir)
536
537        namespace = self._make_namespace()
538        namespace.update({'job_labels': labels, 'args': ''})
539        self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)
540
541
542    def verify(self, labels):
543        """Verify machines are all ssh-able.
544
545        @param labels: Comma separated job labels, will be used to
546                       determine special task actions.
547        """
548        if not self.machines:
549            raise error.AutoservError('No machines specified to verify')
550        if self.resultdir:
551            os.chdir(self.resultdir)
552
553        namespace = self._make_namespace()
554        namespace.update({'job_labels': labels, 'args': ''})
555        self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)
556
557
558    def reset(self, labels):
559        """Reset machines by first cleanup then verify each machine.
560
561        @param labels: Comma separated job labels, will be used to
562                       determine special task actions.
563        """
564        if not self.machines:
565            raise error.AutoservError('No machines specified to reset.')
566        if self.resultdir:
567            os.chdir(self.resultdir)
568
569        namespace = self._make_namespace()
570        namespace.update({'job_labels': labels, 'args': ''})
571        self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)
572
573
574    def repair(self, labels):
575        """Repair machines.
576
577        @param labels: Comma separated job labels, will be used to
578                       determine special task actions.
579        """
580        if not self.machines:
581            raise error.AutoservError('No machines specified to repair')
582        if self.resultdir:
583            os.chdir(self.resultdir)
584
585        namespace = self._make_namespace()
586        namespace.update({'job_labels': labels, 'args': ''})
587        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)
588
589
590    def provision(self, labels):
591        """
592        Provision all hosts to match |labels|.
593
594        @param labels: A comma seperated string of labels to provision the
595                       host to.
596
597        """
598        control = self._load_control_file(PROVISION_CONTROL_FILE)
599        self.run(control=control, job_labels=labels)
600
601
    def precheck(self):
        """
        Perform any additional pre-run checks; hook for derived classes.

        The base implementation does nothing.
        """
        pass
607
608
    def enable_external_logging(self):
        """
        Start or restart external logging mechanism.

        No-op hook in the base class; derived classes may override.
        """
        pass
614
615
    def disable_external_logging(self):
        """
        Pause or stop external logging mechanism.

        No-op hook in the base class; derived classes may override.
        """
        pass
621
622
    def use_external_logging(self):
        """
        Return True if external logging should be used.

        The base implementation never uses external logging; derived
        classes may override.
        """
        return False
628
629
630    def _make_parallel_wrapper(self, function, machines, log):
631        """Wrap function as appropriate for calling by parallel_simple."""
632        # machines could be a list of dictionaries, e.g.,
633        # [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
634        # The dictionary is generated in server_job.__init__, refer to
635        # variable machine_dict_list, then passed in with namespace, see method
636        # server_job._make_namespace.
637        # To compare the machinese to self.machines, which is a list of machine
638        # hostname, we need to convert machines back to a list of hostnames.
639        if (machines and isinstance(machines, list)
640            and isinstance(machines[0], dict)):
641            machines = [m['hostname'] for m in machines]
642        if len(machines) > 1 and log:
643            def wrapper(machine):
644                hostname = server_utils.get_hostname_from_machine(machine)
645                self.push_execution_context(hostname)
646                os.chdir(self.resultdir)
647                machine_data = {'hostname' : hostname,
648                                'status_version' : str(self._STATUS_VERSION)}
649                utils.write_keyval(self.resultdir, machine_data)
650                result = function(machine)
651                return result
652        else:
653            wrapper = function
654        return wrapper
655
656
657    def parallel_simple(self, function, machines, log=True, timeout=None,
658                        return_results=False):
659        """
660        Run 'function' using parallel_simple, with an extra wrapper to handle
661        the necessary setup for continuous parsing, if possible. If continuous
662        parsing is already properly initialized then this should just work.
663
664        @param function: A callable to run in parallel given each machine.
665        @param machines: A list of machine names to be passed one per subcommand
666                invocation of function.
667        @param log: If True, output will be written to output in a subdirectory
668                named after each machine.
669        @param timeout: Seconds after which the function call should timeout.
670        @param return_results: If True instead of an AutoServError being raised
671                on any error a list of the results|exceptions from the function
672                called on each arg is returned.  [default: False]
673
674        @raises error.AutotestError: If any of the functions failed.
675        """
676        wrapper = self._make_parallel_wrapper(function, machines, log)
677        return subcommand.parallel_simple(
678                wrapper, machines,
679                subdir_name_constructor=server_utils.get_hostname_from_machine,
680                log=log, timeout=timeout, return_results=return_results)
681
682
683    def parallel_on_machines(self, function, machines, timeout=None):
684        """
685        @param function: Called in parallel with one machine as its argument.
686        @param machines: A list of machines to call function(machine) on.
687        @param timeout: Seconds after which the function call should timeout.
688
689        @returns A list of machines on which function(machine) returned
690                without raising an exception.
691        """
692        results = self.parallel_simple(function, machines, timeout=timeout,
693                                       return_results=True)
694        success_machines = []
695        for result, machine in zip(results, machines):
696            if not isinstance(result, Exception):
697                success_machines.append(machine)
698        return success_machines
699
700
701    def record_skipped_test(self, skipped_test, message=None):
702        """Insert a failure record into status.log for this test."""
703        msg = message
704        if msg is None:
705            msg = 'No valid machines found for test %s.' % skipped_test
706        logging.info(msg)
707        self.record('START', None, skipped_test.test_name)
708        self.record('INFO', None, skipped_test.test_name, msg)
709        self.record('END TEST_NA', None, skipped_test.test_name, msg)
710
711
712    def _has_failed_tests(self):
713        """Parse status log for failed tests.
714
715        This checks the current working directory and is intended only for use
716        by the run() method.
717
718        @return boolean
719        """
720        path = os.getcwd()
721
722        # TODO(ayatane): Copied from tko/parse.py.  Needs extensive refactor to
723        # make code reuse plausible.
724        job_keyval = tko_models.job.read_keyval(path)
725        status_version = job_keyval.get("status_version", 0)
726
727        # parse out the job
728        parser = parser_lib.parser(status_version)
729        job = parser.make_job(path)
730        status_log = os.path.join(path, "status.log")
731        if not os.path.exists(status_log):
732            status_log = os.path.join(path, "status")
733        if not os.path.exists(status_log):
734            logging.warning("! Unable to parse job, no status file")
735            return True
736
737        # parse the status logs
738        status_lines = open(status_log).readlines()
739        parser.start(job)
740        tests = parser.end(status_lines)
741
742        # parser.end can return the same object multiple times, so filter out
743        # dups
744        job.tests = []
745        already_added = set()
746        for test in tests:
747            if test not in already_added:
748                already_added.add(test)
749                job.tests.append(test)
750
751        failed = False
752        for test in job.tests:
753            # The current job is still running and shouldn't count as failed.
754            # The parser will fail to parse the exit status of the job since it
755            # hasn't exited yet (this running right now is the job).
756            failed = failed or (test.status != 'GOOD'
757                                and not _is_current_server_job(test))
758        return failed
759
760
761    def _collect_crashes(self, namespace, collect_crashinfo):
762        """Collect crashes.
763
764        @param namespace: namespace dict.
765        @param collect_crashinfo: whether to collect crashinfo in addition to
766                dumps
767        """
768        if collect_crashinfo:
769            # includes crashdumps
770            crash_control_file = CRASHINFO_CONTROL_FILE
771        else:
772            crash_control_file = CRASHDUMPS_CONTROL_FILE
773        self._execute_code(crash_control_file, namespace)
774
775
776    _USE_TEMP_DIR = object()
777    def run(self, collect_crashdumps=True, namespace={}, control=None,
778            control_file_dir=None, verify_job_repo_url=False,
779            only_collect_crashinfo=False, skip_crash_collection=False,
780            job_labels='', use_packaging=True):
781        # for a normal job, make sure the uncollected logs file exists
782        # for a crashinfo-only run it should already exist, bail out otherwise
783        created_uncollected_logs = False
784        logging.info("I am PID %s", os.getpid())
785        if self.resultdir and not os.path.exists(self._uncollected_log_file):
786            if only_collect_crashinfo:
787                # if this is a crashinfo-only run, and there were no existing
788                # uncollected logs, just bail out early
789                logging.info("No existing uncollected logs, "
790                             "skipping crashinfo collection")
791                return
792            else:
793                log_file = open(self._uncollected_log_file, "wb")
794                pickle.dump([], log_file)
795                log_file.close()
796                created_uncollected_logs = True
797
798        # use a copy so changes don't affect the original dictionary
799        namespace = namespace.copy()
800        machines = self.machines
801        if control is None:
802            if self.control is None:
803                control = ''
804            elif self._use_client_trampoline:
805                # Some tests must be loaded and staged before they can be run,
806                # see crbug.com/883403#c42 and #c46 for details.
807                control = self._load_control_file(
808                        CLIENT_TRAMPOLINE_CONTROL_FILE)
809                # repr of a string is safe for eval.
810                control = (('trampoline_testname = %r\n' % str(self.control))
811                           + control)
812            else:
813                control = self._load_control_file(self.control)
814        if control_file_dir is None:
815            control_file_dir = self.resultdir
816
817        self.aborted = False
818        namespace.update(self._make_namespace())
819        namespace.update({
820                'args': self.args,
821                'job_labels': job_labels,
822                'gtest_runner': site_gtest_runner.gtest_runner(),
823        })
824        test_start_time = int(time.time())
825
826        if self.resultdir:
827            os.chdir(self.resultdir)
828            # touch status.log so that the parser knows a job is running here
829            open(self.get_status_log_path(), 'a').close()
830            self.enable_external_logging()
831
832        collect_crashinfo = True
833        temp_control_file_dir = None
834        try:
835            try:
836                if not self.fast:
837                    with metrics.SecondsTimer(
838                            'chromeos/autotest/job/get_network_stats',
839                            fields = {'stage': 'start'}):
840                        namespace['network_stats_label'] = 'at-start'
841                        self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
842                                           namespace)
843
844                if only_collect_crashinfo:
845                    return
846
847                # If the verify_job_repo_url option is set but we're unable
848                # to actually verify that the job_repo_url contains the autotest
849                # package, this job will fail.
850                if verify_job_repo_url:
851                    self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
852                                       namespace)
853                else:
854                    logging.warning('Not checking if job_repo_url contains '
855                                    'autotest packages on %s', machines)
856
857                # determine the dir to write the control files to
858                cfd_specified = (control_file_dir
859                                 and control_file_dir is not self._USE_TEMP_DIR)
860                if cfd_specified:
861                    temp_control_file_dir = None
862                else:
863                    temp_control_file_dir = tempfile.mkdtemp(
864                        suffix='temp_control_file_dir')
865                    control_file_dir = temp_control_file_dir
866                server_control_file = os.path.join(control_file_dir,
867                                                   self._control_filename)
868                client_control_file = os.path.join(control_file_dir,
869                                                   CLIENT_CONTROL_FILENAME)
870                if self._client:
871                    namespace['control'] = control
872                    utils.open_write_close(client_control_file, control)
873                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
874                                    server_control_file)
875                else:
876                    utils.open_write_close(server_control_file, control)
877
878                sync_dir = self._offload_dir_target_path()
879                if self._sync_offload_dir:
880                    logging.info("Preparing synchronous offload dir")
881                    self.create_marker_file()
882                    logging.info("Offload dir and marker file ready")
883                    logging.debug("Results dir is %s", self.resultdir)
884                    logging.debug("Synchronous offload dir is %s", sync_dir)
885                logging.info("Processing control file")
886                if self._companion_hosts:
887                    namespace['companion_hosts'] = self._companion_hosts
888                if self._dut_servers:
889                    namespace['dut_servers'] = self._dut_servers
890                namespace['use_packaging'] = use_packaging
891                namespace['synchronous_offload_dir'] = sync_dir
892                namespace['extended_timeout'] = self.extended_timeout
893                namespace['is_cft'] = self._is_cft
894                os.environ[OFFLOAD_ENVVAR] = sync_dir
895                self._execute_code(server_control_file, namespace)
896                logging.info("Finished processing control file")
897                self._maybe_retrieve_client_offload_dirs()
898                # If no device error occurred, no need to collect crashinfo.
899                collect_crashinfo = self.failed_with_device_error
900            except Exception as e:
901                try:
902                    logging.exception(
903                            'Exception escaped control file, job aborting:')
904                    reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
905                                    ' ', str(e))
906                    self.record('INFO', None, None, str(e),
907                                {'job_abort_reason': reason})
908                except:
909                    pass # don't let logging exceptions here interfere
910                raise
911        finally:
912            if temp_control_file_dir:
913                # Clean up temp directory used for copies of the control files
914                try:
915                    shutil.rmtree(temp_control_file_dir)
916                except Exception as e:
917                    logging.warning('Could not remove temp directory %s: %s',
918                                 temp_control_file_dir, e)
919
920            if machines and (collect_crashdumps or collect_crashinfo):
921                if skip_crash_collection or self.fast:
922                    logging.info('Skipping crash dump/info collection '
923                                 'as requested.')
924                else:
925                    with metrics.SecondsTimer(
926                            'chromeos/autotest/job/collect_crashinfo'):
927                        namespace['test_start_time'] = test_start_time
928                        # Remove crash files for passing tests.
929                        # TODO(ayatane): Tests that create crash files should be
930                        # reported.
931                        namespace['has_failed_tests'] = self._has_failed_tests()
932                        self._collect_crashes(namespace, collect_crashinfo)
933            self.disable_external_logging()
934            if self._uncollected_log_file and created_uncollected_logs:
935                os.remove(self._uncollected_log_file)
936
937            if not self.fast:
938                with metrics.SecondsTimer(
939                        'chromeos/autotest/job/get_network_stats',
940                        fields = {'stage': 'end'}):
941                    namespace['network_stats_label'] = 'at-end'
942                    self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
943                                       namespace)
944
945    def _server_offload_dir_path(self):
946        return os.path.join(self.resultdir, self._sync_offload_dir)
947
948    def _offload_dir_target_path(self):
949        if not self._sync_offload_dir:
950            return ''
951        if self._client:
952            return os.path.join(DUT_STATEFUL_PATH, self._sync_offload_dir)
953        return os.path.join(self.resultdir, self._sync_offload_dir)
954
955    def _maybe_retrieve_client_offload_dirs(self):
956        if not(self._sync_offload_dir and self._client):
957            logging.info("No client dir to retrieve.")
958            return ''
959        logging.info("Retrieving synchronous offload dir from client")
960        server_path = self._server_offload_dir_path()
961        client_path = self._offload_dir_target_path()
962        def serial(machine):
963            host = hosts.create_host(machine)
964            server_subpath = os.path.join(server_path, host.hostname)
965            # Empty final piece ensures that get_file gets a trailing slash,
966            #  which makes it copy dir contents rather than the dir itself.
967            client_subpath = os.path.join(client_path, '')
968            logging.debug("Client dir to retrieve is %s", client_subpath)
969            os.makedirs(server_subpath)
970            host.get_file(client_subpath, server_subpath)
971        self.parallel_simple(serial, self.machines)
972        logging.debug("Synchronous offload dir retrieved to %s", server_path)
973        return server_path
974
975    def _create_client_offload_dirs(self):
976        marker_string = "client %s%s" % (
977            "in SSP " if utils.is_in_container() else "",
978            str(datetime.utcnow())
979        )
980        offload_path = self._offload_dir_target_path()
981        _, file_path = tempfile.mkstemp()
982        def serial(machine):
983            host = hosts.create_host(machine)
984            marker_path = os.path.join(offload_path, "sync_offloads_marker")
985            host.run(("mkdir -p %s" % offload_path), ignore_status=False)
986            host.send_file(file_path, marker_path)
987
988        try:
989            utils.open_write_close(file_path, marker_string)
990            self.parallel_simple(serial, self.machines)
991        finally:
992            os.remove(file_path)
993
994
995    def create_marker_file(self):
996        """Create a marker file in the leaf task's synchronous offload dir.
997
998        This ensures that we will get some results offloaded if the test fails
999        to create output properly, distinguishing offload errs from test errs.
1000        @obj_param _client: Boolean, whether the control file is client-side.
1001        @obj_param _sync_offload_dir: rel. path from results dir to offload dir.
1002
1003        @returns: path to offload dir on the machine of the leaf task
1004        """
1005        # Make the server-side directory regardless
1006        try:
1007            # 2.7 makedirs doesn't have an option for pre-existing directories
1008            os.makedirs(self._server_offload_dir_path())
1009        except OSError as e:
1010            if e.errno != errno.EEXIST:
1011                raise
1012        if not self._client:
1013            offload_path = self._offload_dir_target_path()
1014            marker_string = "server %s%s" % (
1015                "in SSP " if utils.is_in_container() else "",
1016                str(datetime.utcnow())
1017            )
1018            utils.open_write_close(
1019                os.path.join(offload_path, "sync_offloads_marker"),
1020                marker_string
1021            )
1022            return offload_path
1023        return self._create_client_offload_dirs()
1024
1025    def run_test(self, url, *args, **dargs):
1026        """
1027        Summon a test object and run it.
1028
1029        tag
1030                tag to add to testname
1031        url
1032                url of the test to run
1033        """
1034        if self._disable_sysinfo:
1035            dargs['disable_sysinfo'] = True
1036
1037        group, testname = self.pkgmgr.get_package_name(url, 'test')
1038        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
1039        outputdir = self._make_test_outputdir(subdir)
1040
1041        def group_func():
1042            try:
1043                test.runtest(self, url, tag, args, dargs)
1044            except error.TestBaseException as e:
1045                self.record(e.exit_status, subdir, testname, str(e))
1046                raise
1047            except Exception as e:
1048                info = str(e) + "\n" + traceback.format_exc()
1049                self.record('FAIL', subdir, testname, info)
1050                raise
1051            else:
1052                self.record('GOOD', subdir, testname, 'completed successfully')
1053
1054        try:
1055            result = self._run_group(testname, subdir, group_func)
1056        except error.TestBaseException as e:
1057            return False
1058        else:
1059            return True
1060
1061
1062    def _run_group(self, name, subdir, function, *args, **dargs):
1063        """Underlying method for running something inside of a group."""
1064        result, exc_info = None, None
1065        try:
1066            self.record('START', subdir, name)
1067            result = function(*args, **dargs)
1068        except error.TestBaseException as e:
1069            self.record("END %s" % e.exit_status, subdir, name)
1070            raise
1071        except Exception as e:
1072            err_msg = str(e) + '\n'
1073            err_msg += traceback.format_exc()
1074            self.record('END ABORT', subdir, name, err_msg)
1075            raise error.JobError(name + ' failed\n' + traceback.format_exc())
1076        else:
1077            self.record('END GOOD', subdir, name)
1078        finally:
1079            for hook in self._post_run_hooks:
1080                hook()
1081
1082        return result
1083
1084
1085    def run_group(self, function, *args, **dargs):
1086        """\
1087        @param function: subroutine to run
1088        @returns: (result, exc_info). When the call succeeds, result contains
1089                the return value of |function| and exc_info is None. If
1090                |function| raises an exception, exc_info contains the tuple
1091                returned by sys.exc_info(), and result is None.
1092        """
1093
1094        name = function.__name__
1095        # Allow the tag for the group to be specified.
1096        tag = dargs.pop('tag', None)
1097        if tag:
1098            name = tag
1099
1100        try:
1101            result = self._run_group(name, None, function, *args, **dargs)[0]
1102        except error.TestBaseException:
1103            return None, sys.exc_info()
1104        return result, None
1105
1106
1107    def run_op(self, op, op_func, get_kernel_func):
1108        """\
1109        A specialization of run_group meant specifically for handling
1110        management operation. Includes support for capturing the kernel version
1111        after the operation.
1112
1113        Args:
1114           op: name of the operation.
1115           op_func: a function that carries out the operation (reboot, suspend)
1116           get_kernel_func: a function that returns a string
1117                            representing the kernel version.
1118        """
1119        try:
1120            self.record('START', None, op)
1121            op_func()
1122        except Exception as e:
1123            err_msg = str(e) + '\n' + traceback.format_exc()
1124            self.record('END FAIL', None, op, err_msg)
1125            raise
1126        else:
1127            kernel = get_kernel_func()
1128            self.record('END GOOD', None, op,
1129                        optional_fields={"kernel": kernel})
1130
1131
1132    def run_control(self, path):
1133        """Execute a control file found at path (relative to the autotest
1134        path). Intended for executing a control file within a control file,
1135        not for running the top-level job control file."""
1136        path = os.path.join(self.autodir, path)
1137        control_file = self._load_control_file(path)
1138        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)
1139
1140
1141    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
1142        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
1143                                   on_every_test)
1144
1145
1146    def add_sysinfo_logfile(self, file, on_every_test=False):
1147        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)
1148
1149
1150    def _add_sysinfo_loggable(self, loggable, on_every_test):
1151        if on_every_test:
1152            self.sysinfo.test_loggables.add(loggable)
1153        else:
1154            self.sysinfo.boot_loggables.add(loggable)
1155
1156
1157    def _read_warnings(self):
1158        """Poll all the warning loggers and extract any new warnings that have
1159        been logged. If the warnings belong to a category that is currently
1160        disabled, this method will discard them and they will no longer be
1161        retrievable.
1162
1163        Returns a list of (timestamp, message) tuples, where timestamp is an
1164        integer epoch timestamp."""
1165        warnings = []
1166        while True:
1167            # pull in a line of output from every logger that has
1168            # output ready to be read
1169            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
1170            closed_loggers = set()
1171            for logger in loggers:
1172                line = logger.readline()
1173                # record any broken pipes (aka line == empty)
1174                if len(line) == 0:
1175                    closed_loggers.add(logger)
1176                    continue
1177                # parse out the warning
1178                timestamp, msgtype, msg = line.split('\t', 2)
1179                timestamp = int(timestamp)
1180                # if the warning is valid, add it to the results
1181                if self.warning_manager.is_valid(timestamp, msgtype):
1182                    warnings.append((timestamp, msg.strip()))
1183
1184            # stop listening to loggers that are closed
1185            self.warning_loggers -= closed_loggers
1186
1187            # stop if none of the loggers have any output left
1188            if not loggers:
1189                break
1190
1191        # sort into timestamp order
1192        warnings.sort()
1193        return warnings
1194
1195
1196    def _unique_subdirectory(self, base_subdirectory_name):
1197        """Compute a unique results subdirectory based on the given name.
1198
1199        Appends base_subdirectory_name with a number as necessary to find a
1200        directory name that doesn't already exist.
1201        """
1202        subdirectory = base_subdirectory_name
1203        counter = 1
1204        while os.path.exists(os.path.join(self.resultdir, subdirectory)):
1205            subdirectory = base_subdirectory_name + '.' + str(counter)
1206            counter += 1
1207        return subdirectory
1208
1209
1210    def get_record_context(self):
1211        """Returns an object representing the current job.record context.
1212
1213        The object returned is an opaque object with a 0-arg restore method
1214        which can be called to restore the job.record context (i.e. indentation)
1215        to the current level. The intention is that it should be used when
1216        something external which generate job.record calls (e.g. an autotest
1217        client) can fail catastrophically and the server job record state
1218        needs to be reset to its original "known good" state.
1219
1220        @return: A context object with a 0-arg restore() method."""
1221        return self._indenter.get_context()
1222
1223
1224    def record_summary(self, status_code, test_name, reason='', attributes=None,
1225                       distinguishing_attributes=(), child_test_ids=None):
1226        """Record a summary test result.
1227
1228        @param status_code: status code string, see
1229                common_lib.log.is_valid_status()
1230        @param test_name: name of the test
1231        @param reason: (optional) string providing detailed reason for test
1232                outcome
1233        @param attributes: (optional) dict of string keyvals to associate with
1234                this result
1235        @param distinguishing_attributes: (optional) list of attribute names
1236                that should be used to distinguish identically-named test
1237                results.  These attributes should be present in the attributes
1238                parameter.  This is used to generate user-friendly subdirectory
1239                names.
1240        @param child_test_ids: (optional) list of test indices for test results
1241                used in generating this result.
1242        """
1243        subdirectory_name_parts = [test_name]
1244        for attribute in distinguishing_attributes:
1245            assert attributes
1246            assert attribute in attributes, '%s not in %s' % (attribute,
1247                                                              attributes)
1248            subdirectory_name_parts.append(attributes[attribute])
1249        base_subdirectory_name = '.'.join(subdirectory_name_parts)
1250
1251        subdirectory = self._unique_subdirectory(base_subdirectory_name)
1252        subdirectory_path = os.path.join(self.resultdir, subdirectory)
1253        os.mkdir(subdirectory_path)
1254
1255        self.record(status_code, subdirectory, test_name,
1256                    status=reason, optional_fields={'is_summary': True})
1257
1258        if attributes:
1259            utils.write_keyval(subdirectory_path, attributes)
1260
1261        if child_test_ids:
1262            ids_string = ','.join(str(test_id) for test_id in child_test_ids)
1263            summary_data = {'child_test_ids': ids_string}
1264            utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
1265                               summary_data)
1266
1267
1268    def add_post_run_hook(self, hook):
1269        """
1270        Registers a hook to run after the main job function.
1271
1272        This provides a mechanism by which tests that perform multiple tests of
1273        their own can write additional top-level results to the TKO status.log
1274        file.
1275
1276        @param hook: Function to invoke (without any args) after the main job
1277            function completes and the job status is logged.
1278        """
1279        self._post_run_hooks.append(hook)
1280
1281
1282    def disable_warnings(self, warning_type):
1283        self.warning_manager.disable_warnings(warning_type)
1284        self.record("INFO", None, None,
1285                    "disabling %s warnings" % warning_type,
1286                    {"warnings.disable": warning_type})
1287
1288
1289    def enable_warnings(self, warning_type):
1290        self.warning_manager.enable_warnings(warning_type)
1291        self.record("INFO", None, None,
1292                    "enabling %s warnings" % warning_type,
1293                    {"warnings.enable": warning_type})
1294
1295
1296    def get_status_log_path(self, subdir=None):
1297        """Return the path to the job status log.
1298
1299        @param subdir - Optional paramter indicating that you want the path
1300            to a subdirectory status log.
1301
1302        @returns The path where the status log should be.
1303        """
1304        if self.resultdir:
1305            if subdir:
1306                return os.path.join(self.resultdir, subdir, "status.log")
1307            else:
1308                return os.path.join(self.resultdir, "status.log")
1309        else:
1310            return None
1311
1312
1313    def _update_uncollected_logs_list(self, update_func):
1314        """Updates the uncollected logs list in a multi-process safe manner.
1315
1316        @param update_func - a function that updates the list of uncollected
1317            logs. Should take one parameter, the list to be updated.
1318        """
1319        # Skip log collection if file _uncollected_log_file does not exist.
1320        if not (self._uncollected_log_file and
1321                os.path.exists(self._uncollected_log_file)):
1322            return
1323        if self._uncollected_log_file:
1324            log_file = open(self._uncollected_log_file, "rb+")
1325            fcntl.flock(log_file, fcntl.LOCK_EX)
1326        try:
1327            uncollected_logs = pickle.load(log_file)
1328            update_func(uncollected_logs)
1329            log_file.seek(0)
1330            log_file.truncate()
1331            pickle.dump(uncollected_logs, log_file)
1332            log_file.flush()
1333        finally:
1334            fcntl.flock(log_file, fcntl.LOCK_UN)
1335            log_file.close()
1336
1337
1338    def add_client_log(self, hostname, remote_path, local_path):
1339        """Adds a new set of client logs to the list of uncollected logs,
1340        to allow for future log recovery.
1341
1342        @param host - the hostname of the machine holding the logs
1343        @param remote_path - the directory on the remote machine holding logs
1344        @param local_path - the local directory to copy the logs into
1345        """
1346        def update_func(logs_list):
1347            logs_list.append((hostname, remote_path, local_path))
1348        self._update_uncollected_logs_list(update_func)
1349
1350
1351    def remove_client_log(self, hostname, remote_path, local_path):
1352        """Removes a set of client logs from the list of uncollected logs,
1353        to allow for future log recovery.
1354
1355        @param host - the hostname of the machine holding the logs
1356        @param remote_path - the directory on the remote machine holding logs
1357        @param local_path - the local directory to copy the logs into
1358        """
1359        def update_func(logs_list):
1360            logs_list.remove((hostname, remote_path, local_path))
1361        self._update_uncollected_logs_list(update_func)
1362
1363
1364    def get_client_logs(self):
1365        """Retrieves the list of uncollected logs, if it exists.
1366
1367        @returns A list of (host, remote_path, local_path) tuples. Returns
1368                 an empty list if no uncollected logs file exists.
1369        """
1370        log_exists = (self._uncollected_log_file and
1371                      os.path.exists(self._uncollected_log_file))
1372        if log_exists:
1373            return pickle.load(open(self._uncollected_log_file))
1374        else:
1375            return []
1376
1377
    def _fill_server_control_namespace(self, namespace, protect=True):
        """
        Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
          namespace: The namespace dictionary to fill in.
          protect: Boolean.  If True (the default) any operation that would
              clobber an existing entry in namespace will cause an error.
        Raises:
          error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """
            Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.  If
                    empty (the default), all names are imported from the module
                    similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            # With a non-empty fromlist, __import__ returns the leaf module;
            # with an empty one it returns the top-level package, which is
            # why the manual submodule descent below is needed.
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with less surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrence of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys, logging=logging)
        _import_names('autotest_lib.server',
                ('hosts', 'autotest', 'standalone_profiler'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing be part of a public API. -gps)
        #
        # XXX Autotest does not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self
        # Propagate this job's ssh settings to the host factory so hosts
        # created from inside control files reuse the same connection options.
        namespace['hosts'].factory.ssh_user = self._ssh_user
        namespace['hosts'].factory.ssh_port = self._ssh_port
        namespace['hosts'].factory.ssh_pass = self._ssh_pass
        namespace['hosts'].factory.ssh_verbosity_flag = (
                self._ssh_verbosity_flag)
        namespace['hosts'].factory.ssh_options = self._ssh_options
1465
1466
1467    def _execute_code(self, code_file, namespace, protect=True):
1468        """
1469        Execute code using a copy of namespace as a server control script.
1470
1471        Unless protect_namespace is explicitly set to False, the dict will not
1472        be modified.
1473
1474        Args:
1475          code_file: The filename of the control file to execute.
1476          namespace: A dict containing names to make available during execution.
1477          protect: Boolean.  If True (the default) a copy of the namespace dict
1478              is used during execution to prevent the code from modifying its
1479              contents outside of this function.  If False the raw dict is
1480              passed in and modifications will be allowed.
1481        """
1482        if protect:
1483            namespace = namespace.copy()
1484        self._fill_server_control_namespace(namespace, protect=protect)
1485        # TODO: Simplify and get rid of the special cases for only 1 machine.
1486        if len(self.machines) > 1:
1487            machines_text = '\n'.join(self.machines) + '\n'
1488            # Only rewrite the file if it does not match our machine list.
1489            try:
1490                machines_f = open(MACHINES_FILENAME, 'r')
1491                existing_machines_text = machines_f.read()
1492                machines_f.close()
1493            except EnvironmentError:
1494                existing_machines_text = None
1495            if machines_text != existing_machines_text:
1496                utils.open_write_close(MACHINES_FILENAME, machines_text)
1497        seven.exec_file(code_file, locals_=namespace, globals_=namespace)
1498
1499
1500    def preprocess_client_state(self):
1501        """
1502        Produce a state file for initializing the state of a client job.
1503
1504        Creates a new client state file with all the current server state, as
1505        well as some pre-set client state.
1506
1507        @returns The path of the file the state was written into.
1508        """
1509        # initialize the sysinfo state
1510        self._state.set('client', 'sysinfo', self.sysinfo.serialize())
1511
1512        # dump the state out to a tempfile
1513        fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
1514        os.close(fd)
1515
1516        # write_to_file doesn't need locking, we exclusively own file_path
1517        self._state.write_to_file(file_path)
1518        return file_path
1519
1520
1521    def postprocess_client_state(self, state_path):
1522        """
1523        Update the state of this job with the state from a client job.
1524
1525        Updates the state of the server side of a job with the final state
1526        of a client job that was run. Updates the non-client-specific state,
1527        pulls in some specific bits from the client-specific state, and then
1528        discards the rest. Removes the state file afterwards
1529
1530        @param state_file A path to the state file from the client.
1531        """
1532        # update the on-disk state
1533        try:
1534            self._state.read_from_file(state_path)
1535            os.remove(state_path)
1536        except OSError as e:
1537            # ignore file-not-found errors
1538            if e.errno != errno.ENOENT:
1539                raise
1540            else:
1541                logging.debug('Client state file %s not found', state_path)
1542
1543        # update the sysinfo state
1544        if self._state.has('client', 'sysinfo'):
1545            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))
1546
1547        # drop all the client-specific state
1548        self._state.discard_namespace('client')
1549
1550
1551    def clear_all_known_hosts(self):
1552        """Clears known hosts files for all AbstractSSHHosts."""
1553        for host in self.hosts:
1554            if isinstance(host, abstract_ssh.AbstractSSHHost):
1555                host.clear_known_hosts()
1556
1557
1558    def close(self):
1559        """Closes this job's operation."""
1560
1561        # Use shallow copy, because host.close() internally discards itself.
1562        for host in list(self.hosts):
1563            host.close()
1564        assert not self.hosts
1565        self._connection_pool.shutdown()
1566
1567
1568    def _get_job_data(self):
1569        """Add custom data to the job keyval info.
1570
1571        When multiple machines are used in a job, change the hostname to
1572        the platform of the first machine instead of machine1,machine2,...  This
1573        makes the job reports easier to read and keeps the tko_machines table from
1574        growing too large.
1575
1576        Returns:
1577            keyval dictionary with new hostname value, or empty dictionary.
1578        """
1579        job_data = {}
1580        # Only modify hostname on multimachine jobs. Assume all host have the same
1581        # platform.
1582        if len(self.machines) > 1:
1583            # Search through machines for first machine with a platform.
1584            for host in self.machines:
1585                keyval_path = os.path.join(self.resultdir, 'host_keyvals', host)
1586                keyvals = utils.read_keyval(keyval_path)
1587                host_plat = keyvals.get('platform', None)
1588                if not host_plat:
1589                    continue
1590                job_data['hostname'] = host_plat
1591                break
1592        return job_data
1593
1594
class warning_manager(object):
    """Class for controlling warning logs. Manages the enabling and disabling
    of warnings."""
    def __init__(self):
        # Maps warning type -> list of (start, end) disabled intervals.
        # An end of None marks an interval that is still open.
        self.disabled_warnings = {}


    def is_valid(self, timestamp, warning_type):
        """Indicates if a warning (based on the time it occured and its type)
        is a valid warning. A warning is considered "invalid" if this type of
        warning was marked as "disabled" at the time the warning occured."""
        for begin, finish in self.disabled_warnings.get(warning_type, []):
            covered = timestamp >= begin and (finish is None or
                                              timestamp < finish)
            if covered:
                return False
        return True


    def disable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, disables all further warnings of this type."""
        intervals = self.disabled_warnings.setdefault(warning_type, [])
        # Only open a new interval if there is no interval already open.
        already_open = bool(intervals) and intervals[-1][1] is None
        if not already_open:
            intervals.append((int(current_time_func()), None))


    def enable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, enables all further warnings of this type."""
        intervals = self.disabled_warnings.get(warning_type, [])
        if intervals and intervals[-1][1] is None:
            begin, _ = intervals[-1]
            # Close the open interval at the current time.
            intervals[-1] = (begin, int(current_time_func()))
1626
1627
1628def _is_current_server_job(test):
1629    """Return True if parsed test is the currently running job.
1630
1631    @param test: test instance from tko parser.
1632    """
1633    return test.testname == 'SERVER_JOB'
1634
1635
def _create_afe_host(hostname):
    """Create an afe_host object backed by the AFE.

    @param hostname: Name of the host for which we want the Host object.
    @returns: An object of type frontend.AFE
    """
    afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10)
    matching_hosts = afe.get_hosts(hostname=hostname)
    if not matching_hosts:
        raise error.AutoservError('No hosts named %s found' % hostname)
    # Exactly-named lookup; the first (and expected only) match wins.
    return matching_hosts[0]
1648
1649
def _create_file_backed_host_info_store(store_dir, hostname):
    """Create a CachingHostInfoStore backed by an existing file.

    @param store_dir: A directory to contain store backing files.
    @param hostname: Name of the host for which we want the store.

    @returns: An object of type CachingHostInfoStore.
    """
    backing_file_path = os.path.join(store_dir, '%s.store' % hostname)
    # The backing file must already exist; this helper never creates one.
    if not os.path.isfile(backing_file_path):
        raise error.AutoservError(
                'Requested FileStore but no backing file at %s'
                % backing_file_path
        )
    return file_store.FileStore(backing_file_path)
1665
1666
def _create_afe_backed_host_info_store(store_dir, hostname):
    """Create a CachingHostInfoStore backed by the AFE.

    @param store_dir: A directory to contain store backing files.
    @param hostname: Name of the host for which we want the store.

    @returns: An object of type CachingHostInfoStore.
    """
    primary_store = afe_store.AfeStore(hostname)
    try:
        # Fail fast if the AFE cannot supply HostInfo for this host.
        primary_store.get(force_refresh=True)
    except host_info.StoreError:
        raise error.AutoservError(
                'Could not obtain HostInfo for hostname %s' % hostname)
    # Since the store wasn't initialized external to autoserv, we must
    # ensure that the store we create is unique within store_dir.
    unique_dir = _make_unique_subdir(store_dir)
    backing_file_path = os.path.join(unique_dir, '%s.store' % hostname)
    logging.info('Shadowing AFE store with a FileStore at %s',
                 backing_file_path)
    shadow_store = file_store.FileStore(backing_file_path)
    return shadowing_store.ShadowingStore(primary_store, shadow_store)
1691
1692
def _make_unique_subdir(workdir):
    """Creates a new subdir within workdir and returns the path to it."""
    # uuid4 makes collisions with existing subdirs effectively impossible.
    unique_name = 'dir_%s' % uuid.uuid4()
    store_dir = os.path.join(workdir, unique_name)
    _make_dirs_if_needed(store_dir)
    return store_dir
1698
1699
1700def _make_dirs_if_needed(path):
1701    """os.makedirs, but ignores failure because the leaf directory exists"""
1702    try:
1703        os.makedirs(path)
1704    except OSError as e:
1705        if e.errno != errno.EEXIST:
1706            raise
1707