# Lint as: python2, python3
# pylint: disable-msg=C0111

# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
The main job wrapper for the server side.

This is the core infrastructure. Derived from the client side job.py

Copyright Martin J. Bligh, Andy Whitcroft 2007
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import errno
import fcntl
import getpass
import logging
import os
import pickle
import platform
import re
import select
import shutil
import sys
import tempfile
import time
import traceback
import uuid
import warnings

from datetime import datetime

from autotest_lib.client.bin import sysinfo
from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import logging_manager
from autotest_lib.client.common_lib import packages
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib import seven
from autotest_lib.server import profilers
from autotest_lib.server import site_gtest_runner
from autotest_lib.server import subcommand
from autotest_lib.server import test
from autotest_lib.server.autotest import OFFLOAD_ENVVAR
from autotest_lib.server import utils as server_utils
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.server import hosts
from autotest_lib.server.hosts import abstract_ssh
from autotest_lib.server.hosts import afe_store
from autotest_lib.server.hosts import file_store
from autotest_lib.server.hosts import shadowing_store
from autotest_lib.server.hosts import factory as host_factory
from autotest_lib.server.hosts import host_info
from autotest_lib.server.hosts import ssh_multiplex
from autotest_lib.tko import models as tko_models
from autotest_lib.tko import parser_lib
from six.moves import zip

try:
    from autotest_lib.utils.frozen_chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


def _control_segment_path(name):
    """Get the pathname of the named control segment file."""
    server_dir = os.path.dirname(os.path.abspath(__file__))
    return os.path.join(server_dir, "control_segments", name)


CLIENT_CONTROL_FILENAME = 'control'
SERVER_CONTROL_FILENAME = 'control.srv'
MACHINES_FILENAME = '.machines'
DUT_STATEFUL_PATH = "/usr/local"

CLIENT_WRAPPER_CONTROL_FILE = _control_segment_path('client_wrapper')
CLIENT_TRAMPOLINE_CONTROL_FILE = _control_segment_path('client_trampoline')
CRASHDUMPS_CONTROL_FILE = _control_segment_path('crashdumps')
CRASHINFO_CONTROL_FILE = _control_segment_path('crashinfo')
CLEANUP_CONTROL_FILE = _control_segment_path('cleanup')
VERIFY_CONTROL_FILE = _control_segment_path('verify')
REPAIR_CONTROL_FILE = _control_segment_path('repair')
PROVISION_CONTROL_FILE = _control_segment_path('provision')
VERIFY_JOB_REPO_URL_CONTROL_FILE = _control_segment_path('verify_job_repo_url')
RESET_CONTROL_FILE = _control_segment_path('reset')
GET_NETWORK_STATS_CONTROL_FILE = _control_segment_path('get_network_stats')


def get_machine_dicts(machine_names, store_dir, in_lab, use_shadow_store,
                      host_attributes=None):
    """Converts a list of machine names to list of dicts.

    TODO(crbug.com/678430): This function temporarily has a side effect of
    creating files under workdir for backing a FileStore. This side-effect will
    go away once callers of autoserv start passing in the FileStore.

    @param machine_names: A list of machine names.
    @param store_dir: A directory to contain store backing files.
    @param use_shadow_store: If True, we should create a ShadowingStore where
            actual store is backed by the AFE but we create a backing file to
            shadow the store. If False, backing file should already exist at:
            ${store_dir}/${hostname}.store
    @param in_lab: A boolean indicating whether we're running in lab.
    @param host_attributes: Optional list of host attributes to add for each
            host.
    @returns: A list of dicts. Each dict has the following keys:
            'hostname': Name of the machine originally in machine_names (str).
            'afe_host': A frontend.Host object for the machine, or a stub if
                    in_lab is false.
            'host_info_store': A host_info.CachingHostInfoStore object to obtain
                    host information. A stub if in_lab is False.
            'connection_pool': ssh_multiplex.ConnectionPool instance to share
                    ssh connection across control scripts. This is set to
                    None, and should be overridden for connection sharing.
    """
    # See autoserv_parser.parse_args. Only one of in_lab or host_attributes can
    # be provided.
    if in_lab and host_attributes:
        raise error.AutoservError(
                'in_lab and host_attribute are mutually exclusive.')

    machine_dict_list = []
    for machine in machine_names:
        if not in_lab:
            afe_host = server_utils.EmptyAFEHost()
            host_info_store = _create_file_backed_host_info_store(
                    store_dir, machine)
            if host_attributes:
                afe_host.attributes.update(host_attributes)
                info = host_info.HostInfo(labels=host_info_store.get().labels,
                                          attributes=host_attributes)
                host_info_store.commit(info)
        elif use_shadow_store:
            afe_host = _create_afe_host(machine)
            host_info_store = _create_afe_backed_host_info_store(store_dir,
                                                                 machine)
        else:
            afe_host = server_utils.EmptyAFEHost()
            host_info_store = _create_file_backed_host_info_store(store_dir,
                                                                  machine)

        machine_dict_list.append({
                'hostname' : machine,
                'afe_host' : afe_host,
                'host_info_store': host_info_store,
                'connection_pool': None,
        })

    return machine_dict_list


class status_indenter(base_job.status_indenter):
    """Provide a simple integer-backed status indenter."""
    def __init__(self):
        self._indent = 0


    @property
    def indent(self):
        return self._indent


    def increment(self):
        self._indent += 1


    def decrement(self):
        self._indent -= 1


    def get_context(self):
        """Returns a context object for use by job.get_record_context."""
        class context(object):
            def __init__(self, indenter, indent):
                self._indenter = indenter
                self._indent = indent
            def restore(self):
                self._indenter._indent = self._indent
        return context(self, self._indent)


class server_job_record_hook(object):
    """The job.record hook for server job. Used to inject WARN messages from
    the console or vlm whenever new logs are written, and to echo any logs
    to INFO level logging. Implemented as a class so that it can use state to
    block recursive calls, so that the hook can call job.record itself to
    log WARN messages.

    Depends on job._read_warnings and job._logger.
    """
    def __init__(self, job):
        self._job = job
        self._being_called = False


    def __call__(self, entry):
        """A wrapper around the 'real' record hook, the _hook method, which
        prevents recursion. This isn't making any effort to be threadsafe,
        the intent is to outright block infinite recursion via a
        job.record->_hook->job.record->_hook->job.record... chain."""
        if self._being_called:
            return
        self._being_called = True
        try:
            self._hook(self._job, entry)
        finally:
            self._being_called = False


    @staticmethod
    def _hook(job, entry):
        """The core hook, which can safely call job.record."""
        entries = []
        # poll all our warning loggers for new warnings
        for timestamp, msg in job._read_warnings():
            warning_entry = base_job.status_log_entry(
                'WARN', None, None, msg, {}, timestamp=timestamp)
            entries.append(warning_entry)
            job.record_entry(warning_entry)
        # echo rendered versions of all the status logs to info
        entries.append(entry)
        for entry in entries:
            rendered_entry = job._logger.render_entry(entry)
            logging.info(rendered_entry)


class server_job(base_job.base_job):
    """The server-side concrete implementation of base_job.

    Optional properties provided by this implementation:
        serverdir

        warning_manager
        warning_loggers
    """

    _STATUS_VERSION = 1

    # TODO crbug.com/285395 eliminate ssh_verbosity_flag
    def __init__(self,
                 control,
                 args,
                 resultdir,
                 label,
                 user,
                 machines,
                 machine_dict_list,
                 client=False,
                 ssh_user=host_factory.DEFAULT_SSH_USER,
                 ssh_port=host_factory.DEFAULT_SSH_PORT,
                 ssh_pass=host_factory.DEFAULT_SSH_PASS,
                 ssh_verbosity_flag=host_factory.DEFAULT_SSH_VERBOSITY,
                 ssh_options=host_factory.DEFAULT_SSH_OPTIONS,
                 group_name='',
                 tag='',
                 disable_sysinfo=False,
                 control_filename=SERVER_CONTROL_FILENAME,
                 parent_job_id=None,
                 in_lab=False,
                 use_client_trampoline=False,
                 sync_offload_dir='',
                 companion_hosts=None,
                 dut_servers=None,
                 is_cft=False,
                 force_full_log_collection=False):
        """
        Create a server side job object.

        @param control: The pathname of the control file.
        @param args: Passed to the control file.
        @param resultdir: Where to throw the results.
        @param label: Description of the job.
        @param user: Username for the job (email address).
        @param machines: A list of hostnames of the machines to use for the job.
        @param machine_dict_list: A list of dicts for each of the machines above
                as returned by get_machine_dicts.
        @param client: True if this is a client-side control file.
        @param ssh_user: The SSH username. [root]
        @param ssh_port: The SSH port number. [22]
        @param ssh_pass: The SSH passphrase, if needed.
        @param ssh_verbosity_flag: The SSH verbosity flag, '-v', '-vv',
                '-vvv', or an empty string if not needed.
        @param ssh_options: A string giving additional options that will be
                included in ssh commands.
        @param group_name: If supplied, this will be written out as
                host_group_name in the keyvals file for the parser.
        @param tag: The job execution tag from the scheduler. [optional]
        @param disable_sysinfo: Whether we should disable the sysinfo step of
                tests for a modest shortening of test time. [optional]
        @param control_filename: The filename where the server control file
                should be written in the results directory.
        @param parent_job_id: Job ID of the parent job. Default to None if the
                job does not have a parent job.
        @param in_lab: Boolean that indicates if this is running in the lab
                environment.
        @param use_client_trampoline: Boolean that indicates whether to
                use the client trampoline flow. If this is True, control
                is interpreted as the name of the client test to run.
                The client control file will be client_trampoline. The
                test name will be passed to client_trampoline, which will
                install the test package and re-exec the actual test
                control file.
        @param sync_offload_dir: String; relative path to synchronous offload
                dir, relative to the results directory. Ignored if empty.
        @param companion_hosts: a str or list of hosts to be used as companions
                for the test and provided to it. NOTE: these are different from
                machines, where each machine is a host that the test runs on.
        @param dut_servers: a str or list of hosts to be used as DUT servers
                provided to the test.
        @param force_full_log_collection: bool; force full log collection even
                when the test passes.
        """
        super(server_job, self).__init__(resultdir=resultdir)
        self.control = control
        self._uncollected_log_file = os.path.join(self.resultdir,
                                                  'uncollected_logs')
        debugdir = os.path.join(self.resultdir, 'debug')
        if not os.path.exists(debugdir):
            os.mkdir(debugdir)

        if user:
            self.user = user
        else:
            self.user = getpass.getuser()

        self.args = args
        self.label = label
        self.machines = machines
        self._client = client
        self.warning_loggers = set()
        self.warning_manager = warning_manager()
        self._ssh_user = ssh_user
        self._ssh_port = ssh_port
        self._ssh_pass = ssh_pass
        self._ssh_verbosity_flag = ssh_verbosity_flag
        self._ssh_options = ssh_options
        self.tag = tag
        self.hosts = set()
        self.drop_caches = False
        self.drop_caches_between_iterations = False
        self._control_filename = control_filename
        self._disable_sysinfo = disable_sysinfo
        self._use_client_trampoline = use_client_trampoline
        self._companion_hosts = companion_hosts
        self._dut_servers = dut_servers
        self._is_cft = is_cft
        self.force_full_log_collection = force_full_log_collection

        # Parse the release number from the label to set up sysinfo.
        version = re.findall(r'release/R(\d+)-', label)
        if version:
            version = int(version[0])

        self.logging = logging_manager.get_logging_manager(
                manage_stdout_and_stderr=True, redirect_fds=True)
        subcommand.logging_manager_object = self.logging

        self.sysinfo = sysinfo.sysinfo(self.resultdir, version=version)
        self.profilers = profilers.profilers(self)
        self._sync_offload_dir = sync_offload_dir

        job_data = {
                'user': user,
                'hostname': ','.join(machines),
                'drone': platform.node(),
                'status_version': str(self._STATUS_VERSION),
                'job_started': str(int(time.time()))
        }

        # Adhoc/<testname> is the default label, and should not be written,
        # as this can cause conflicts with results uploading in CFT.
        # However, some pipelines (such as PVS) do need `label` within the
        # keyval, which can now be done with the `-l` flag in test_that.
        if 'adhoc' not in label:
            job_data['label'] = label
        # Save parent job id to keyvals, so parser can retrieve the info and
        # write to tko_jobs record.
        if parent_job_id:
            job_data['parent_job_id'] = parent_job_id
        if group_name:
            job_data['host_group_name'] = group_name

        # only write these keyvals out on the first job in a resultdir
        if 'job_started' not in utils.read_keyval(self.resultdir):
            job_data.update(self._get_job_data())
            utils.write_keyval(self.resultdir, job_data)

        self.pkgmgr = packages.PackageManager(
                self.autodir, run_function_dargs={'timeout': 600})

        self._register_subcommand_hooks()

        # We no longer parse results as part of the server_job. These arguments
        # can't be dropped yet because clients haven't all been cleaned up yet.
        self.num_tests_run = -1
        self.num_tests_failed = -1

        # set up the status logger
        self._indenter = status_indenter()
        self._logger = base_job.status_logger(
                self, self._indenter, 'status.log', 'status.log',
                record_hook=server_job_record_hook(self))

        # Initialize a flag to indicate DUT failure during the test, e.g.,
        # unexpected reboot.
        self.failed_with_device_error = False

        self._connection_pool = ssh_multiplex.ConnectionPool()

        # List of functions to run after the main job function.
        self._post_run_hooks = []

        self.parent_job_id = parent_job_id
        self.in_lab = in_lab
        self.machine_dict_list = machine_dict_list
        for machine_dict in self.machine_dict_list:
            machine_dict['connection_pool'] = self._connection_pool

        # TODO(jrbarnette) The harness attribute is only relevant to
        # client jobs, but it's required to be present, or we will fail
        # server job unit tests.  Yes, really.
        #
        # TODO(jrbarnette) The utility of the 'harness' attribute even
        # to client jobs is suspect.  Probably, we should remove it.
        self.harness = None

        # TODO(ayatane): fast and max_result_size_KB are not set for
        # client_trampoline jobs.
        if control and not use_client_trampoline:
            parsed_control = control_data.parse_control(
                    control, raise_warnings=False)
            self.fast = parsed_control.fast
            self.max_result_size_KB = parsed_control.max_result_size_KB
            # wrap this in a try to prevent client/SSP issues. Note: if the
            # except is hit, the timeout will be ignored.
            try:
                self.extended_timeout = parsed_control.extended_timeout
            except AttributeError:
                self.extended_timeout = None
        else:
            self.fast = False
            # Set the maximum result size to be the default specified in
            # global config, if the job has no control file associated.
            self.max_result_size_KB = control_data.DEFAULT_MAX_RESULT_SIZE_KB


    @classmethod
    def _find_base_directories(cls):
        """
        Determine locations of autodir, clientdir and serverdir. Assumes
        that this file is located within serverdir and uses __file__ along
        with relative paths to resolve the location.
        """
        serverdir = os.path.abspath(os.path.dirname(__file__))
        autodir = os.path.normpath(os.path.join(serverdir, '..'))
        clientdir = os.path.join(autodir, 'client')
        return autodir, clientdir, serverdir


    def _find_resultdir(self, resultdir, *args, **dargs):
        """
        Determine the location of resultdir. For server jobs we expect one to
        always be explicitly passed in to __init__, so just return that.
        """
        if resultdir:
            return os.path.normpath(resultdir)
        else:
            return None


    def _get_status_logger(self):
        """Return a reference to the status logger."""
        return self._logger


    @staticmethod
    def _load_control_file(path):
        f = open(path)
        try:
            control_file = f.read()
        finally:
            f.close()
        return re.sub('\r', '', control_file)


    def _register_subcommand_hooks(self):
        """
        Register some hooks into the subcommand modules that allow us
        to properly clean up self.hosts created in forked subprocesses.
        """
        def on_fork(cmd):
            self._existing_hosts_on_fork = set(self.hosts)
        def on_join(cmd):
            new_hosts = self.hosts - self._existing_hosts_on_fork
            for host in new_hosts:
                host.close()
        subcommand.subcommand.register_fork_hook(on_fork)
        subcommand.subcommand.register_join_hook(on_join)


    # TODO crbug.com/285395 add a kwargs parameter.
    def _make_namespace(self):
        """Create a namespace dictionary to be passed along to control file.

        Creates a namespace argument populated with standard values:
        machines, job, ssh_user, ssh_port, ssh_pass, ssh_verbosity_flag,
        and ssh_options.
        """
        namespace = {'machines' : self.machine_dict_list,
                     'job' : self,
                     'ssh_user' : self._ssh_user,
                     'ssh_port' : self._ssh_port,
                     'ssh_pass' : self._ssh_pass,
                     'ssh_verbosity_flag' : self._ssh_verbosity_flag,
                     'ssh_options' : self._ssh_options}
        return namespace


    def cleanup(self, labels):
        """Cleanup machines.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to cleanup')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(CLEANUP_CONTROL_FILE, namespace, protect=False)


    def verify(self, labels):
        """Verify machines are all ssh-able.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to verify')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)


    def reset(self, labels):
        """Reset machines by first cleaning up and then verifying each machine.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to reset.')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(RESET_CONTROL_FILE, namespace, protect=False)


    def repair(self, labels):
        """Repair machines.

        @param labels: Comma separated job labels, will be used to
                       determine special task actions.
        """
        if not self.machines:
            raise error.AutoservError('No machines specified to repair')
        if self.resultdir:
            os.chdir(self.resultdir)

        namespace = self._make_namespace()
        namespace.update({'job_labels': labels, 'args': ''})
        self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)


    def provision(self, labels):
        """
        Provision all hosts to match |labels|.

        @param labels: A comma separated string of labels to provision the
                       host to.

        """
        control = self._load_control_file(PROVISION_CONTROL_FILE)
        self.run(control=control, job_labels=labels)


    def precheck(self):
        """
        Perform any additional checks in derived classes.
        """
        pass


    def enable_external_logging(self):
        """
        Start or restart external logging mechanism.
        """
        pass


    def disable_external_logging(self):
        """
        Pause or stop external logging mechanism.
        """
        pass


    def use_external_logging(self):
        """
        Return True if external logging should be used.
        """
        return False


    def _make_parallel_wrapper(self, function, machines, log):
        """Wrap function as appropriate for calling by parallel_simple."""
        # machines could be a list of dictionaries, e.g.,
        #    [{'host_attributes': {}, 'hostname': '100.96.51.226'}]
        # The dictionary is generated in server_job.__init__, refer to
        # variable machine_dict_list, then passed in with namespace, see method
        # server_job._make_namespace.
        # To compare the machines to self.machines, which is a list of machine
        # hostnames, we need to convert machines back to a list of hostnames.
        if (machines and isinstance(machines, list)
            and isinstance(machines[0], dict)):
            machines = [m['hostname'] for m in machines]
        if len(machines) > 1 and log:
            def wrapper(machine):
                hostname = server_utils.get_hostname_from_machine(machine)
                self.push_execution_context(hostname)
                os.chdir(self.resultdir)
                machine_data = {'hostname' : hostname,
                                'status_version' : str(self._STATUS_VERSION)}
                utils.write_keyval(self.resultdir, machine_data)
                result = function(machine)
                return result
        else:
            wrapper = function
        return wrapper


    def parallel_simple(self, function, machines, log=True, timeout=None,
                        return_results=False):
        """
        Run 'function' using parallel_simple, with an extra wrapper to handle
        the necessary setup for continuous parsing, if possible. If continuous
        parsing is already properly initialized then this should just work.

        @param function: A callable to run in parallel given each machine.
        @param machines: A list of machine names to be passed one per subcommand
                invocation of function.
        @param log: If True, output will be written to a subdirectory named
                after each machine.
        @param timeout: Seconds after which the function call should timeout.
        @param return_results: If True, instead of an AutoServError being raised
                on any error, a list of the results/exceptions from the function
                called on each arg is returned.  [default: False]

        @raises error.AutotestError: If any of the functions failed.
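
        Example (illustrative only; 'collect_logs' stands for any callable
        that accepts a single machine argument):

            job.parallel_simple(collect_logs, job.machines)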
        """
        wrapper = self._make_parallel_wrapper(function, machines, log)
        return subcommand.parallel_simple(
                wrapper, machines,
                subdir_name_constructor=server_utils.get_hostname_from_machine,
                log=log, timeout=timeout, return_results=return_results)


    def parallel_on_machines(self, function, machines, timeout=None):
        """
        @param function: Called in parallel with one machine as its argument.
        @param machines: A list of machines to call function(machine) on.
        @param timeout: Seconds after which the function call should timeout.

        @returns A list of machines on which function(machine) returned
                without raising an exception.
        """
        results = self.parallel_simple(function, machines, timeout=timeout,
                                       return_results=True)
        success_machines = []
        for result, machine in zip(results, machines):
            if not isinstance(result, Exception):
                success_machines.append(machine)
        return success_machines


    def record_skipped_test(self, skipped_test, message=None):
        """Insert a failure record into status.log for this test."""
        msg = message
        if msg is None:
            msg = 'No valid machines found for test %s.' % skipped_test
        logging.info(msg)
        self.record('START', None, skipped_test.test_name)
        self.record('INFO', None, skipped_test.test_name, msg)
        self.record('END TEST_NA', None, skipped_test.test_name, msg)


    def _has_failed_tests(self):
        """Parse status log for failed tests.

        This checks the current working directory and is intended only for use
        by the run() method.

        @return boolean
        """
        path = os.getcwd()

        # TODO(ayatane): Copied from tko/parse.py.  Needs extensive refactor to
        # make code reuse plausible.
        job_keyval = tko_models.job.read_keyval(path)
        status_version = job_keyval.get("status_version", 0)

        # parse out the job
        parser = parser_lib.parser(status_version)
        job = parser.make_job(path)
        status_log = os.path.join(path, "status.log")
        if not os.path.exists(status_log):
            status_log = os.path.join(path, "status")
        if not os.path.exists(status_log):
            logging.warning("! Unable to parse job, no status file")
            return True

        # parse the status logs
        status_lines = open(status_log).readlines()
        parser.start(job)
        tests = parser.end(status_lines)

        # parser.end can return the same object multiple times, so filter out
        # dups
        job.tests = []
        already_added = set()
        for test in tests:
            if test not in already_added:
                already_added.add(test)
                job.tests.append(test)

        failed = False
        for test in job.tests:
            # The current job is still running and shouldn't count as failed.
            # The parser will fail to parse the exit status of the job since it
            # hasn't exited yet (this running right now is the job).
            failed = failed or (test.status != 'GOOD'
                                and not _is_current_server_job(test))
        return failed


    def _collect_crashes(self, namespace, collect_crashinfo):
        """Collect crashes.

        @param namespace: namespace dict.
        @param collect_crashinfo: whether to collect crashinfo in addition to
                dumps
        """
        if collect_crashinfo:
            # includes crashdumps
            crash_control_file = CRASHINFO_CONTROL_FILE
        else:
            crash_control_file = CRASHDUMPS_CONTROL_FILE
        self._execute_code(crash_control_file, namespace)


    _USE_TEMP_DIR = object()
    def run(self, collect_crashdumps=True, namespace={}, control=None,
            control_file_dir=None, verify_job_repo_url=False,
            only_collect_crashinfo=False, skip_crash_collection=False,
            job_labels='', use_packaging=True):
        # for a normal job, make sure the uncollected logs file exists
        # for a crashinfo-only run it should already exist, bail out otherwise
        created_uncollected_logs = False
        logging.info("I am PID %s", os.getpid())
        if self.resultdir and not os.path.exists(self._uncollected_log_file):
            if only_collect_crashinfo:
                # if this is a crashinfo-only run, and there were no existing
                # uncollected logs, just bail out early
                logging.info("No existing uncollected logs, "
                             "skipping crashinfo collection")
                return
            else:
                log_file = open(self._uncollected_log_file, "wb")
                pickle.dump([], log_file)
                log_file.close()
                created_uncollected_logs = True

        # use a copy so changes don't affect the original dictionary
        namespace = namespace.copy()
        machines = self.machines
        if control is None:
            if self.control is None:
                control = ''
            elif self._use_client_trampoline:
                # Some tests must be loaded and staged before they can be run,
                # see crbug.com/883403#c42 and #c46 for details.
                control = self._load_control_file(
                        CLIENT_TRAMPOLINE_CONTROL_FILE)
                # repr of a string is safe for eval.
                control = (('trampoline_testname = %r\n' % str(self.control))
                           + control)
            else:
                control = self._load_control_file(self.control)
        if control_file_dir is None:
            control_file_dir = self.resultdir

        self.aborted = False
        namespace.update(self._make_namespace())
        namespace.update({
                'args': self.args,
                'job_labels': job_labels,
                'gtest_runner': site_gtest_runner.gtest_runner(),
        })
        test_start_time = int(time.time())

        if self.resultdir:
            os.chdir(self.resultdir)
            # touch status.log so that the parser knows a job is running here
            open(self.get_status_log_path(), 'a').close()
            self.enable_external_logging()

        collect_crashinfo = True
        temp_control_file_dir = None
        try:
            try:
                if not self.fast:
                    with metrics.SecondsTimer(
                            'chromeos/autotest/job/get_network_stats',
                            fields = {'stage': 'start'}):
                        namespace['network_stats_label'] = 'at-start'
                        self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
                                           namespace)

                if only_collect_crashinfo:
                    return

                # If the verify_job_repo_url option is set but we're unable
                # to actually verify that the job_repo_url contains the autotest
                # package, this job will fail.
                if verify_job_repo_url:
                    self._execute_code(VERIFY_JOB_REPO_URL_CONTROL_FILE,
                                       namespace)
                else:
                    logging.warning('Not checking if job_repo_url contains '
                                    'autotest packages on %s', machines)

                # determine the dir to write the control files to
                cfd_specified = (control_file_dir
                                 and control_file_dir is not self._USE_TEMP_DIR)
                if cfd_specified:
                    temp_control_file_dir = None
                else:
                    temp_control_file_dir = tempfile.mkdtemp(
                            suffix='temp_control_file_dir')
                    control_file_dir = temp_control_file_dir
                server_control_file = os.path.join(control_file_dir,
                                                   self._control_filename)
                client_control_file = os.path.join(control_file_dir,
                                                   CLIENT_CONTROL_FILENAME)
                if self._client:
                    namespace['control'] = control
                    utils.open_write_close(client_control_file, control)
                    shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,
                                    server_control_file)
                else:
                    utils.open_write_close(server_control_file, control)

                sync_dir = self._offload_dir_target_path()
                if self._sync_offload_dir:
                    logging.info("Preparing synchronous offload dir")
                    self.create_marker_file()
                    logging.info("Offload dir and marker file ready")
                    logging.debug("Results dir is %s", self.resultdir)
                    logging.debug("Synchronous offload dir is %s", sync_dir)
                logging.info("Processing control file")
                if self._companion_hosts:
                    namespace['companion_hosts'] = self._companion_hosts
                if self._dut_servers:
                    namespace['dut_servers'] = self._dut_servers
                namespace['use_packaging'] = use_packaging
                namespace['synchronous_offload_dir'] = sync_dir
                namespace['extended_timeout'] = self.extended_timeout
                namespace['is_cft'] = self._is_cft
                os.environ[OFFLOAD_ENVVAR] = sync_dir
                self._execute_code(server_control_file, namespace)
                logging.info("Finished processing control file")
                self._maybe_retrieve_client_offload_dirs()
                # If no device error occurred, no need to collect crashinfo.
                collect_crashinfo = self.failed_with_device_error
            except Exception as e:
                try:
                    logging.exception(
                            'Exception escaped control file, job aborting:')
                    reason = re.sub(base_job.status_log_entry.BAD_CHAR_REGEX,
                                    ' ', str(e))
                    self.record('INFO', None, None, str(e),
                                {'job_abort_reason': reason})
                except:
                    pass  # don't let logging exceptions here interfere
                raise
        finally:
            if temp_control_file_dir:
                # Clean up temp directory used for copies of the control files
                try:
                    shutil.rmtree(temp_control_file_dir)
                except Exception as e:
                    logging.warning('Could not remove temp directory %s: %s',
                                    temp_control_file_dir, e)

            if machines and (collect_crashdumps or collect_crashinfo):
                if skip_crash_collection or self.fast:
                    logging.info('Skipping crash dump/info collection '
                                 'as requested.')
                else:
                    with metrics.SecondsTimer(
                            'chromeos/autotest/job/collect_crashinfo'):
                        namespace['test_start_time'] = test_start_time
                        # Remove crash files for passing tests.
                        # TODO(ayatane): Tests that create crash files should be
                        # reported.
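                        # Note: 'has_failed_tests' is consumed by the crash
                        # collection control segment, which uses it to decide
                        # whether crash files gathered for a fully passing job
                        # should be kept.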
                        namespace['has_failed_tests'] = self._has_failed_tests()
                        self._collect_crashes(namespace, collect_crashinfo)
            self.disable_external_logging()
            if self._uncollected_log_file and created_uncollected_logs:
                os.remove(self._uncollected_log_file)

        if not self.fast:
            with metrics.SecondsTimer(
                    'chromeos/autotest/job/get_network_stats',
                    fields = {'stage': 'end'}):
                namespace['network_stats_label'] = 'at-end'
                self._execute_code(GET_NETWORK_STATS_CONTROL_FILE,
                                   namespace)

    def _server_offload_dir_path(self):
        return os.path.join(self.resultdir, self._sync_offload_dir)

    def _offload_dir_target_path(self):
        if not self._sync_offload_dir:
            return ''
        if self._client:
            return os.path.join(DUT_STATEFUL_PATH, self._sync_offload_dir)
        return os.path.join(self.resultdir, self._sync_offload_dir)

    def _maybe_retrieve_client_offload_dirs(self):
        if not(self._sync_offload_dir and self._client):
            logging.info("No client dir to retrieve.")
            return ''
        logging.info("Retrieving synchronous offload dir from client")
        server_path = self._server_offload_dir_path()
        client_path = self._offload_dir_target_path()
        def serial(machine):
            host = hosts.create_host(machine)
            server_subpath = os.path.join(server_path, host.hostname)
            # Empty final piece ensures that get_file gets a trailing slash,
            # which makes it copy dir contents rather than the dir itself.
            client_subpath = os.path.join(client_path, '')
            logging.debug("Client dir to retrieve is %s", client_subpath)
            os.makedirs(server_subpath)
            host.get_file(client_subpath, server_subpath)
        self.parallel_simple(serial, self.machines)
        logging.debug("Synchronous offload dir retrieved to %s", server_path)
        return server_path

    def _create_client_offload_dirs(self):
        marker_string = "client %s%s" % (
                "in SSP " if utils.is_in_container() else "",
                str(datetime.utcnow())
        )
        offload_path = self._offload_dir_target_path()
        _, file_path = tempfile.mkstemp()
        def serial(machine):
            host = hosts.create_host(machine)
            marker_path = os.path.join(offload_path, "sync_offloads_marker")
            host.run(("mkdir -p %s" % offload_path), ignore_status=False)
            host.send_file(file_path, marker_path)

        try:
            utils.open_write_close(file_path, marker_string)
            self.parallel_simple(serial, self.machines)
        finally:
            os.remove(file_path)


    def create_marker_file(self):
        """Create a marker file in the leaf task's synchronous offload dir.

        This ensures that we will get some results offloaded if the test fails
        to create output properly, distinguishing offload errs from test errs.
        @obj_param _client: Boolean, whether the control file is client-side.
        @obj_param _sync_offload_dir: rel. path from results dir to offload dir.

        @returns: path to offload dir on the machine of the leaf task
        """
        # Make the server-side directory regardless
        try:
            # 2.7 makedirs doesn't have an option for pre-existing directories
            os.makedirs(self._server_offload_dir_path())
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise
        if not self._client:
            offload_path = self._offload_dir_target_path()
            marker_string = "server %s%s" % (
                    "in SSP " if utils.is_in_container() else "",
                    str(datetime.utcnow())
            )
            utils.open_write_close(
                    os.path.join(offload_path, "sync_offloads_marker"),
                    marker_string
            )
            return offload_path
        return self._create_client_offload_dirs()

    def run_test(self, url, *args, **dargs):
        """
        Summon a test object and run it.

        tag
                tag to add to testname
        url
                url of the test to run
        """
        if self._disable_sysinfo:
            dargs['disable_sysinfo'] = True

        group, testname = self.pkgmgr.get_package_name(url, 'test')
        testname, subdir, tag = self._build_tagged_test_name(testname, dargs)
        outputdir = self._make_test_outputdir(subdir)

        def group_func():
            try:
                test.runtest(self, url, tag, args, dargs)
            except error.TestBaseException as e:
                self.record(e.exit_status, subdir, testname, str(e))
                raise
            except Exception as e:
                info = str(e) + "\n" + traceback.format_exc()
                self.record('FAIL', subdir, testname, info)
                raise
            else:
                self.record('GOOD', subdir, testname, 'completed successfully')

        try:
            result = self._run_group(testname, subdir, group_func)
        except error.TestBaseException as e:
            return False
        else:
            return True


    def _run_group(self, name, subdir, function, *args, **dargs):
        """Underlying method for running something inside of a group."""
        result, exc_info = None, None
        try:
            self.record('START', subdir, name)
            result = function(*args, **dargs)
        except error.TestBaseException as e:
            self.record("END %s" % e.exit_status, subdir, name)
            raise
        except Exception as e:
            err_msg = str(e) + '\n'
            err_msg += traceback.format_exc()
            self.record('END ABORT', subdir, name, err_msg)
            raise error.JobError(name + ' failed\n' + traceback.format_exc())
        else:
            self.record('END GOOD', subdir, name)
        finally:
            for hook in self._post_run_hooks:
                hook()

        return result


    def run_group(self, function, *args, **dargs):
        """\
        @param function: subroutine to run
        @returns: (result, exc_info). When the call succeeds, result contains
                the return value of |function| and exc_info is None. If
                |function| raises an exception, exc_info contains the tuple
                returned by sys.exc_info(), and result is None.
        """

        name = function.__name__
        # Allow the tag for the group to be specified.
        tag = dargs.pop('tag', None)
        if tag:
            name = tag

        try:
            result = self._run_group(name, None, function, *args, **dargs)[0]
        except error.TestBaseException:
            return None, sys.exc_info()
        return result, None


    def run_op(self, op, op_func, get_kernel_func):
        """\
        A specialization of run_group meant specifically for handling a
        management operation. Includes support for capturing the kernel version
        after the operation.

        Args:
            op: name of the operation.
            op_func: a function that carries out the operation (reboot, suspend)
            get_kernel_func: a function that returns a string
                             representing the kernel version.
        """
        try:
            self.record('START', None, op)
            op_func()
        except Exception as e:
            err_msg = str(e) + '\n' + traceback.format_exc()
            self.record('END FAIL', None, op, err_msg)
            raise
        else:
            kernel = get_kernel_func()
            self.record('END GOOD', None, op,
                        optional_fields={"kernel": kernel})


    def run_control(self, path):
        """Execute a control file found at path (relative to the autotest
        path). Intended for executing a control file within a control file,
        not for running the top-level job control file."""
        path = os.path.join(self.autodir, path)
        control_file = self._load_control_file(path)
        self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)


    def add_sysinfo_command(self, command, logfile=None, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),
                                   on_every_test)


    def add_sysinfo_logfile(self, file, on_every_test=False):
        self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)


    def _add_sysinfo_loggable(self, loggable, on_every_test):
        if on_every_test:
            self.sysinfo.test_loggables.add(loggable)
        else:
            self.sysinfo.boot_loggables.add(loggable)


    def _read_warnings(self):
        """Poll all the warning loggers and extract any new warnings that have
        been logged. If the warnings belong to a category that is currently
        disabled, this method will discard them and they will no longer be
        retrievable.

        Returns a list of (timestamp, message) tuples, where timestamp is an
        integer epoch timestamp."""
        warnings = []
        while True:
            # pull in a line of output from every logger that has
            # output ready to be read
            loggers, _, _ = select.select(self.warning_loggers, [], [], 0)
            closed_loggers = set()
            for logger in loggers:
                line = logger.readline()
                # record any broken pipes (aka line == empty)
                if len(line) == 0:
                    closed_loggers.add(logger)
                    continue
                # parse out the warning
                timestamp, msgtype, msg = line.split('\t', 2)
                timestamp = int(timestamp)
                # if the warning is valid, add it to the results
                if self.warning_manager.is_valid(timestamp, msgtype):
                    warnings.append((timestamp, msg.strip()))

            # stop listening to loggers that are closed
            self.warning_loggers -= closed_loggers

            # stop if none of the loggers have any output left
            if not loggers:
                break

        # sort into timestamp order
        warnings.sort()
        return warnings


    def _unique_subdirectory(self, base_subdirectory_name):
        """Compute a unique results subdirectory based on the given name.

        Appends base_subdirectory_name with a number as necessary to find a
        directory name that doesn't already exist.
        """
        subdirectory = base_subdirectory_name
        counter = 1
        while os.path.exists(os.path.join(self.resultdir, subdirectory)):
            subdirectory = base_subdirectory_name + '.' + str(counter)
            counter += 1
        return subdirectory


    def get_record_context(self):
        """Returns an object representing the current job.record context.

        The object returned is an opaque object with a 0-arg restore method
        which can be called to restore the job.record context (i.e.
        indentation) to the current level. The intention is that it should be
        used when something external that generates job.record calls (e.g. an
        autotest client) can fail catastrophically and the server job record
        state needs to be reset to its original "known good" state.

        @return: A context object with a 0-arg restore() method."""
        return self._indenter.get_context()


    def record_summary(self, status_code, test_name, reason='', attributes=None,
                       distinguishing_attributes=(), child_test_ids=None):
        """Record a summary test result.

        @param status_code: status code string, see
                common_lib.log.is_valid_status()
        @param test_name: name of the test
        @param reason: (optional) string providing detailed reason for test
                outcome
        @param attributes: (optional) dict of string keyvals to associate with
                this result
        @param distinguishing_attributes: (optional) list of attribute names
                that should be used to distinguish identically-named test
                results. These attributes should be present in the attributes
                parameter. This is used to generate user-friendly subdirectory
                names.
        @param child_test_ids: (optional) list of test indices for test results
                used in generating this result.
        """
        subdirectory_name_parts = [test_name]
        for attribute in distinguishing_attributes:
            assert attributes
            assert attribute in attributes, '%s not in %s' % (attribute,
                                                              attributes)
            subdirectory_name_parts.append(attributes[attribute])
        base_subdirectory_name = '.'.join(subdirectory_name_parts)

        subdirectory = self._unique_subdirectory(base_subdirectory_name)
        subdirectory_path = os.path.join(self.resultdir, subdirectory)
        os.mkdir(subdirectory_path)

        self.record(status_code, subdirectory, test_name,
                    status=reason, optional_fields={'is_summary': True})

        if attributes:
            utils.write_keyval(subdirectory_path, attributes)

        if child_test_ids:
            ids_string = ','.join(str(test_id) for test_id in child_test_ids)
            summary_data = {'child_test_ids': ids_string}
            utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),
                               summary_data)


    def add_post_run_hook(self, hook):
        """
        Registers a hook to run after the main job function.

        This provides a mechanism by which tests that perform multiple tests of
        their own can write additional top-level results to the TKO status.log
        file.

        @param hook: Function to invoke (without any args) after the main job
                function completes and the job status is logged.
        """
        self._post_run_hooks.append(hook)


    def disable_warnings(self, warning_type):
        self.warning_manager.disable_warnings(warning_type)
        self.record("INFO", None, None,
                    "disabling %s warnings" % warning_type,
                    {"warnings.disable": warning_type})


    def enable_warnings(self, warning_type):
        self.warning_manager.enable_warnings(warning_type)
        self.record("INFO", None, None,
                    "enabling %s warnings" % warning_type,
                    {"warnings.enable": warning_type})


    def get_status_log_path(self, subdir=None):
        """Return the path to the job status log.

        @param subdir - Optional parameter indicating that you want the path
            to a subdirectory status log.

        @returns The path where the status log should be.
        """
        if self.resultdir:
            if subdir:
                return os.path.join(self.resultdir, subdir, "status.log")
            else:
                return os.path.join(self.resultdir, "status.log")
        else:
            return None


    def _update_uncollected_logs_list(self, update_func):
        """Updates the uncollected logs list in a multi-process safe manner.

        @param update_func - a function that updates the list of uncollected
            logs. Should take one parameter, the list to be updated.
        """
        # Skip log collection if file _uncollected_log_file does not exist.
        if not (self._uncollected_log_file and
                os.path.exists(self._uncollected_log_file)):
            return
        if self._uncollected_log_file:
            log_file = open(self._uncollected_log_file, "rb+")
            fcntl.flock(log_file, fcntl.LOCK_EX)
            try:
                uncollected_logs = pickle.load(log_file)
                update_func(uncollected_logs)
                log_file.seek(0)
                log_file.truncate()
                pickle.dump(uncollected_logs, log_file)
                log_file.flush()
            finally:
                fcntl.flock(log_file, fcntl.LOCK_UN)
                log_file.close()


    def add_client_log(self, hostname, remote_path, local_path):
        """Adds a new set of client logs to the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.append((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def remove_client_log(self, hostname, remote_path, local_path):
        """Removes a set of client logs from the list of uncollected logs,
        to allow for future log recovery.

        @param hostname - the hostname of the machine holding the logs
        @param remote_path - the directory on the remote machine holding logs
        @param local_path - the local directory to copy the logs into
        """
        def update_func(logs_list):
            logs_list.remove((hostname, remote_path, local_path))
        self._update_uncollected_logs_list(update_func)


    def get_client_logs(self):
        """Retrieves the list of uncollected logs, if it exists.

        @returns A list of (host, remote_path, local_path) tuples. Returns
                an empty list if no uncollected logs file exists.
        """
        log_exists = (self._uncollected_log_file and
                      os.path.exists(self._uncollected_log_file))
        if log_exists:
            return pickle.load(open(self._uncollected_log_file))
        else:
            return []


    def _fill_server_control_namespace(self, namespace, protect=True):
        """
        Prepare a namespace to be used when executing server control files.

        This sets up the control file API by importing modules and making them
        available under the appropriate names within namespace.

        For use by _execute_code().

        Args:
            namespace: The namespace dictionary to fill in.
            protect: Boolean.  If True (the default) any operation that would
                clobber an existing entry in namespace will cause an error.
        Raises:
            error.AutoservError: When a name would be clobbered by import.
        """
        def _import_names(module_name, names=()):
            """
            Import a module and assign named attributes into namespace.

            Args:
                module_name: The string module name.
                names: A limiting list of names to import from module_name.
                    If empty (the default), all names are imported from the
                    module similar to a "from foo.bar import *" statement.
            Raises:
                error.AutoservError: When a name being imported would clobber
                    a name already in namespace.
            """
            module = __import__(module_name, {}, {}, names)

            # No names supplied?  Import * from the lowest level module.
            # (Ugh, why do I have to implement this part myself?)
            if not names:
                for submodule_name in module_name.split('.')[1:]:
                    module = getattr(module, submodule_name)
                if hasattr(module, '__all__'):
                    names = getattr(module, '__all__')
                else:
                    names = dir(module)

            # Install each name into namespace, checking to make sure it
            # doesn't override anything that already exists.
            for name in names:
                # Check for conflicts to help prevent future problems.
                if name in namespace and protect:
                    if namespace[name] is not getattr(module, name):
                        raise error.AutoservError('importing name '
                                '%s from %s %r would override %r' %
                                (name, module_name, getattr(module, name),
                                 namespace[name]))
                    else:
                        # Encourage cleanliness and the use of __all__ for a
                        # more concrete API with less surprises on '*' imports.
                        warnings.warn('%s (%r) being imported from %s for use '
                                      'in server control files is not the '
                                      'first occurrence of that import.' %
                                      (name, namespace[name], module_name))

                namespace[name] = getattr(module, name)


        # This is the equivalent of prepending a bunch of import statements to
        # the front of the control script.
        namespace.update(os=os, sys=sys, logging=logging)
        _import_names('autotest_lib.server',
                      ('hosts', 'autotest', 'standalone_profiler'))
        _import_names('autotest_lib.server.subcommand',
                      ('parallel', 'parallel_simple', 'subcommand'))
        _import_names('autotest_lib.server.utils',
                      ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))
        _import_names('autotest_lib.client.common_lib.error')
        _import_names('autotest_lib.client.common_lib.barrier', ('barrier',))

        # Inject ourself as the job object into other classes within the API.
        # (Yuck, this injection is a gross thing to be part of a public API. -gps)
        #
        # XXX Autotest does not appear to use .job.  Who does?
        namespace['autotest'].Autotest.job = self
        # server.hosts.base_classes.Host uses .job.
        namespace['hosts'].Host.job = self
        namespace['hosts'].factory.ssh_user = self._ssh_user
        namespace['hosts'].factory.ssh_port = self._ssh_port
        namespace['hosts'].factory.ssh_pass = self._ssh_pass
        namespace['hosts'].factory.ssh_verbosity_flag = (
                self._ssh_verbosity_flag)
        namespace['hosts'].factory.ssh_options = self._ssh_options


    def _execute_code(self, code_file, namespace, protect=True):
        """
        Execute code using a copy of namespace as a server control script.

        Unless protect_namespace is explicitly set to False, the dict will not
        be modified.

        Args:
            code_file: The filename of the control file to execute.
            namespace: A dict containing names to make available during execution.
            protect: Boolean.  If True (the default) a copy of the namespace dict
                is used during execution to prevent the code from modifying its
                contents outside of this function.  If False the raw dict is
                passed in and modifications will be allowed.
        """
        if protect:
            namespace = namespace.copy()
        self._fill_server_control_namespace(namespace, protect=protect)
        # TODO: Simplify and get rid of the special cases for only 1 machine.
        if len(self.machines) > 1:
            machines_text = '\n'.join(self.machines) + '\n'
            # Only rewrite the file if it does not match our machine list.
            try:
                machines_f = open(MACHINES_FILENAME, 'r')
                existing_machines_text = machines_f.read()
                machines_f.close()
            except EnvironmentError:
                existing_machines_text = None
            if machines_text != existing_machines_text:
                utils.open_write_close(MACHINES_FILENAME, machines_text)
        seven.exec_file(code_file, locals_=namespace, globals_=namespace)


    def preprocess_client_state(self):
        """
        Produce a state file for initializing the state of a client job.

        Creates a new client state file with all the current server state, as
        well as some pre-set client state.

        @returns The path of the file the state was written into.
        """
        # initialize the sysinfo state
        self._state.set('client', 'sysinfo', self.sysinfo.serialize())

        # dump the state out to a tempfile
        fd, file_path = tempfile.mkstemp(dir=self.tmpdir)
        os.close(fd)

        # write_to_file doesn't need locking, we exclusively own file_path
        self._state.write_to_file(file_path)
        return file_path


    def postprocess_client_state(self, state_path):
        """
        Update the state of this job with the state from a client job.

        Updates the state of the server side of a job with the final state
        of a client job that was run. Updates the non-client-specific state,
        pulls in some specific bits from the client-specific state, and then
        discards the rest. Removes the state file afterwards.

        @param state_path A path to the state file from the client.
        """
        # update the on-disk state
        try:
            self._state.read_from_file(state_path)
            os.remove(state_path)
        except OSError as e:
            # ignore file-not-found errors
            if e.errno != errno.ENOENT:
                raise
            else:
                logging.debug('Client state file %s not found', state_path)

        # update the sysinfo state
        if self._state.has('client', 'sysinfo'):
            self.sysinfo.deserialize(self._state.get('client', 'sysinfo'))

        # drop all the client-specific state
        self._state.discard_namespace('client')


    def clear_all_known_hosts(self):
        """Clears known hosts files for all AbstractSSHHosts."""
        for host in self.hosts:
            if isinstance(host, abstract_ssh.AbstractSSHHost):
                host.clear_known_hosts()


    def close(self):
        """Closes this job's operation."""

        # Use shallow copy, because host.close() internally discards itself.
        for host in list(self.hosts):
            host.close()
        assert not self.hosts
        self._connection_pool.shutdown()


    def _get_job_data(self):
        """Add custom data to the job keyval info.

        When multiple machines are used in a job, change the hostname to
        the platform of the first machine instead of machine1,machine2,...  This
        makes the job reports easier to read and keeps the tko_machines table
        from growing too large.

        Returns:
            keyval dictionary with new hostname value, or empty dictionary.
        """
        job_data = {}
        # Only modify hostname on multimachine jobs. Assume all hosts have the
        # same platform.
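        # For example (hypothetical values): machines ['host1', 'host2'] whose
        # host keyvals both report platform 'eve' would be recorded with
        # hostname 'eve' rather than 'host1,host2'.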
        if len(self.machines) > 1:
            # Search through machines for first machine with a platform.
            for host in self.machines:
                keyval_path = os.path.join(self.resultdir, 'host_keyvals', host)
                keyvals = utils.read_keyval(keyval_path)
                host_plat = keyvals.get('platform', None)
                if not host_plat:
                    continue
                job_data['hostname'] = host_plat
                break
        return job_data


class warning_manager(object):
    """Class for controlling warning logs. Manages the enabling and disabling
    of warnings."""
    def __init__(self):
        # a map of warning types to a list of disabled time intervals
        self.disabled_warnings = {}


    def is_valid(self, timestamp, warning_type):
        """Indicates if a warning (based on the time it occurred and its type)
        is a valid warning. A warning is considered "invalid" if this type of
        warning was marked as "disabled" at the time the warning occurred."""
        disabled_intervals = self.disabled_warnings.get(warning_type, [])
        for start, end in disabled_intervals:
            if timestamp >= start and (end is None or timestamp < end):
                return False
        return True


    def disable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, disables all further warnings of this type."""
        intervals = self.disabled_warnings.setdefault(warning_type, [])
        if not intervals or intervals[-1][1] is not None:
            intervals.append((int(current_time_func()), None))


    def enable_warnings(self, warning_type, current_time_func=time.time):
        """As of now, enables all further warnings of this type."""
        intervals = self.disabled_warnings.get(warning_type, [])
        if intervals and intervals[-1][1] is None:
            intervals[-1] = (intervals[-1][0], int(current_time_func()))


def _is_current_server_job(test):
    """Return True if parsed test is the currently running job.

    @param test: test instance from tko parser.
    """
    return test.testname == 'SERVER_JOB'


def _create_afe_host(hostname):
    """Create an afe_host object backed by the AFE.

    @param hostname: Name of the host for which we want the Host object.
    @returns: An object of type frontend.AFE
    """
    afe = frontend_wrappers.RetryingAFE(timeout_min=5, delay_sec=10)
    hosts = afe.get_hosts(hostname=hostname)
    if not hosts:
        raise error.AutoservError('No hosts named %s found' % hostname)

    return hosts[0]


def _create_file_backed_host_info_store(store_dir, hostname):
    """Create a CachingHostInfoStore backed by an existing file.

    @param store_dir: A directory to contain store backing files.
    @param hostname: Name of the host for which we want the store.

    @returns: An object of type CachingHostInfoStore.
    """
    backing_file_path = os.path.join(store_dir, '%s.store' % hostname)
    if not os.path.isfile(backing_file_path):
        raise error.AutoservError(
                'Requested FileStore but no backing file at %s'
                % backing_file_path
        )
    return file_store.FileStore(backing_file_path)


def _create_afe_backed_host_info_store(store_dir, hostname):
    """Create a CachingHostInfoStore backed by the AFE.

    @param store_dir: A directory to contain store backing files.
    @param hostname: Name of the host for which we want the store.

    @returns: An object of type CachingHostInfoStore.
    """
    primary_store = afe_store.AfeStore(hostname)
    try:
        primary_store.get(force_refresh=True)
    except host_info.StoreError:
        raise error.AutoservError(
                'Could not obtain HostInfo for hostname %s' % hostname)
    # Since the store wasn't initialized external to autoserv, we must
    # ensure that the store we create is unique within store_dir.
    backing_file_path = os.path.join(
            _make_unique_subdir(store_dir),
            '%s.store' % hostname,
    )
    logging.info('Shadowing AFE store with a FileStore at %s',
                 backing_file_path)
    shadow_store = file_store.FileStore(backing_file_path)
    return shadowing_store.ShadowingStore(primary_store, shadow_store)


def _make_unique_subdir(workdir):
    """Creates a new subdir within workdir and returns the path to it."""
    store_dir = os.path.join(workdir, 'dir_%s' % uuid.uuid4())
    _make_dirs_if_needed(store_dir)
    return store_dir


def _make_dirs_if_needed(path):
    """os.makedirs, but ignores failure because the leaf directory exists"""
    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
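

# ---------------------------------------------------------------------------
# Illustrative usage sketch (comments only, never executed).  This is a rough
# outline of how a caller such as autoserv might wire the helpers in this
# module together.  The hostname, directories, and control file path below are
# hypothetical placeholders; the result and store directories are assumed to
# already exist, including a '<hostname>.store' backing file, since
# in_lab=False / use_shadow_store=False requires one.
#
#   machine_dicts = get_machine_dicts(
#           machine_names=['host1'],
#           store_dir='/tmp/host_info_store',
#           in_lab=False,
#           use_shadow_store=False)
#   job = server_job(control='/tmp/control.srv',
#                    args='',
#                    resultdir='/tmp/results',
#                    label='example/control.example',
#                    user='autotest',
#                    machines=['host1'],
#                    machine_dict_list=machine_dicts)
#   try:
#       job.run()
#   finally:
#       job.close()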