1*9c5db199SXin Li# pylint: disable=missing-docstring 2*9c5db199SXin Li""" 3*9c5db199SXin LiUtility functions for rpc_interface.py. We keep them in a separate file so that 4*9c5db199SXin Lionly RPC interface functions go into that file. 5*9c5db199SXin Li""" 6*9c5db199SXin Li 7*9c5db199SXin Li__author__ = '[email protected] (Steve Howard)' 8*9c5db199SXin Li 9*9c5db199SXin Liimport collections 10*9c5db199SXin Liimport datetime 11*9c5db199SXin Liimport inspect 12*9c5db199SXin Liimport logging 13*9c5db199SXin Liimport os 14*9c5db199SXin Lifrom functools import wraps 15*9c5db199SXin Li 16*9c5db199SXin Liimport django.db.utils 17*9c5db199SXin Liimport django.http 18*9c5db199SXin Liimport six 19*9c5db199SXin Lifrom autotest_lib.client.common_lib import (control_data, error, global_config, 20*9c5db199SXin Li time_utils) 21*9c5db199SXin Lifrom autotest_lib.client.common_lib.cros import dev_server 22*9c5db199SXin Lifrom autotest_lib.frontend import thread_local 23*9c5db199SXin Lifrom autotest_lib.frontend.afe import model_logic, models 24*9c5db199SXin Lifrom autotest_lib.server import utils as server_utils 25*9c5db199SXin Lifrom autotest_lib.server.cros import provision 26*9c5db199SXin Lifrom autotest_lib.server.cros.dynamic_suite import frontend_wrappers 27*9c5db199SXin Li 28*9c5db199SXin LiNULL_DATETIME = datetime.datetime.max 29*9c5db199SXin LiNULL_DATE = datetime.date.max 30*9c5db199SXin LiDUPLICATE_KEY_MSG = 'Duplicate entry' 31*9c5db199SXin LiRESPECT_STATIC_LABELS = global_config.global_config.get_config_value( 32*9c5db199SXin Li 'SKYLAB', 'respect_static_labels', type=bool, default=False) 33*9c5db199SXin Li 34*9c5db199SXin Lidef prepare_for_serialization(objects): 35*9c5db199SXin Li """ 36*9c5db199SXin Li Prepare Python objects to be returned via RPC. 37*9c5db199SXin Li @param objects: objects to be prepared. 38*9c5db199SXin Li """ 39*9c5db199SXin Li if (isinstance(objects, list) and len(objects) and 40*9c5db199SXin Li isinstance(objects[0], dict) and 'id' in objects[0]): 41*9c5db199SXin Li objects = _gather_unique_dicts(objects) 42*9c5db199SXin Li return _prepare_data(objects) 43*9c5db199SXin Li 44*9c5db199SXin Li 45*9c5db199SXin Lidef prepare_rows_as_nested_dicts(query, nested_dict_column_names): 46*9c5db199SXin Li """ 47*9c5db199SXin Li Prepare a Django query to be returned via RPC as a sequence of nested 48*9c5db199SXin Li dictionaries. 49*9c5db199SXin Li 50*9c5db199SXin Li @param query - A Django model query object with a select_related() method. 51*9c5db199SXin Li @param nested_dict_column_names - A list of column/attribute names for the 52*9c5db199SXin Li rows returned by query to expand into nested dictionaries using 53*9c5db199SXin Li their get_object_dict() method when not None. 54*9c5db199SXin Li 55*9c5db199SXin Li @returns An list suitable to returned in an RPC. 56*9c5db199SXin Li """ 57*9c5db199SXin Li all_dicts = [] 58*9c5db199SXin Li for row in query.select_related(): 59*9c5db199SXin Li row_dict = row.get_object_dict() 60*9c5db199SXin Li for column in nested_dict_column_names: 61*9c5db199SXin Li if row_dict[column] is not None: 62*9c5db199SXin Li row_dict[column] = getattr(row, column).get_object_dict() 63*9c5db199SXin Li all_dicts.append(row_dict) 64*9c5db199SXin Li return prepare_for_serialization(all_dicts) 65*9c5db199SXin Li 66*9c5db199SXin Li 67*9c5db199SXin Lidef _prepare_data(data): 68*9c5db199SXin Li """ 69*9c5db199SXin Li Recursively process data structures, performing necessary type 70*9c5db199SXin Li conversions to values in data to allow for RPC serialization: 71*9c5db199SXin Li -convert datetimes to strings 72*9c5db199SXin Li -convert tuples and sets to lists 73*9c5db199SXin Li """ 74*9c5db199SXin Li if isinstance(data, dict): 75*9c5db199SXin Li new_data = {} 76*9c5db199SXin Li for key, value in six.iteritems(data): 77*9c5db199SXin Li new_data[key] = _prepare_data(value) 78*9c5db199SXin Li return new_data 79*9c5db199SXin Li elif (isinstance(data, list) or isinstance(data, tuple) or 80*9c5db199SXin Li isinstance(data, set)): 81*9c5db199SXin Li return [_prepare_data(item) for item in data] 82*9c5db199SXin Li elif isinstance(data, datetime.date): 83*9c5db199SXin Li if data is NULL_DATETIME or data is NULL_DATE: 84*9c5db199SXin Li return None 85*9c5db199SXin Li return str(data) 86*9c5db199SXin Li else: 87*9c5db199SXin Li return data 88*9c5db199SXin Li 89*9c5db199SXin Li 90*9c5db199SXin Lidef fetchall_as_list_of_dicts(cursor): 91*9c5db199SXin Li """ 92*9c5db199SXin Li Converts each row in the cursor to a dictionary so that values can be read 93*9c5db199SXin Li by using the column name. 94*9c5db199SXin Li @param cursor: The database cursor to read from. 95*9c5db199SXin Li @returns: A list of each row in the cursor as a dictionary. 96*9c5db199SXin Li """ 97*9c5db199SXin Li desc = cursor.description 98*9c5db199SXin Li return [ dict(zip([col[0] for col in desc], row)) 99*9c5db199SXin Li for row in cursor.fetchall() ] 100*9c5db199SXin Li 101*9c5db199SXin Li 102*9c5db199SXin Lidef raw_http_response(response_data, content_type=None): 103*9c5db199SXin Li response = django.http.HttpResponse(response_data, mimetype=content_type) 104*9c5db199SXin Li response['Content-length'] = str(len(response.content)) 105*9c5db199SXin Li return response 106*9c5db199SXin Li 107*9c5db199SXin Li 108*9c5db199SXin Lidef _gather_unique_dicts(dict_iterable): 109*9c5db199SXin Li """\ 110*9c5db199SXin Li Pick out unique objects (by ID) from an iterable of object dicts. 111*9c5db199SXin Li """ 112*9c5db199SXin Li objects = collections.OrderedDict() 113*9c5db199SXin Li for obj in dict_iterable: 114*9c5db199SXin Li objects.setdefault(obj['id'], obj) 115*9c5db199SXin Li return list(objects.values()) 116*9c5db199SXin Li 117*9c5db199SXin Li 118*9c5db199SXin Lidef extra_job_status_filters(not_yet_run=False, running=False, finished=False): 119*9c5db199SXin Li """\ 120*9c5db199SXin Li Generate a SQL WHERE clause for job status filtering, and return it in 121*9c5db199SXin Li a dict of keyword args to pass to query.extra(). 122*9c5db199SXin Li * not_yet_run: all HQEs are Queued 123*9c5db199SXin Li * finished: all HQEs are complete 124*9c5db199SXin Li * running: everything else 125*9c5db199SXin Li """ 126*9c5db199SXin Li if not (not_yet_run or running or finished): 127*9c5db199SXin Li return {} 128*9c5db199SXin Li not_queued = ('(SELECT job_id FROM afe_host_queue_entries ' 129*9c5db199SXin Li 'WHERE status != "%s")' 130*9c5db199SXin Li % models.HostQueueEntry.Status.QUEUED) 131*9c5db199SXin Li not_finished = ('(SELECT job_id FROM afe_host_queue_entries ' 132*9c5db199SXin Li 'WHERE not complete)') 133*9c5db199SXin Li 134*9c5db199SXin Li where = [] 135*9c5db199SXin Li if not_yet_run: 136*9c5db199SXin Li where.append('id NOT IN ' + not_queued) 137*9c5db199SXin Li if running: 138*9c5db199SXin Li where.append('(id IN %s) AND (id IN %s)' % (not_queued, not_finished)) 139*9c5db199SXin Li if finished: 140*9c5db199SXin Li where.append('id NOT IN ' + not_finished) 141*9c5db199SXin Li return {'where': [' OR '.join(['(%s)' % x for x in where])]} 142*9c5db199SXin Li 143*9c5db199SXin Li 144*9c5db199SXin Lidef extra_job_type_filters(extra_args, suite=False, 145*9c5db199SXin Li sub=False, standalone=False): 146*9c5db199SXin Li """\ 147*9c5db199SXin Li Generate a SQL WHERE clause for job status filtering, and return it in 148*9c5db199SXin Li a dict of keyword args to pass to query.extra(). 149*9c5db199SXin Li 150*9c5db199SXin Li param extra_args: a dict of existing extra_args. 151*9c5db199SXin Li 152*9c5db199SXin Li No more than one of the parameters should be passed as True: 153*9c5db199SXin Li * suite: job which is parent of other jobs 154*9c5db199SXin Li * sub: job with a parent job 155*9c5db199SXin Li * standalone: job with no child or parent jobs 156*9c5db199SXin Li """ 157*9c5db199SXin Li assert not ((suite and sub) or 158*9c5db199SXin Li (suite and standalone) or 159*9c5db199SXin Li (sub and standalone)), ('Cannot specify more than one ' 160*9c5db199SXin Li 'filter to this function') 161*9c5db199SXin Li 162*9c5db199SXin Li where = extra_args.get('where', []) 163*9c5db199SXin Li parent_job_id = ('DISTINCT parent_job_id') 164*9c5db199SXin Li child_job_id = ('id') 165*9c5db199SXin Li filter_common = ('(SELECT %s FROM afe_jobs ' 166*9c5db199SXin Li 'WHERE parent_job_id IS NOT NULL)') 167*9c5db199SXin Li 168*9c5db199SXin Li if suite: 169*9c5db199SXin Li where.append('id IN ' + filter_common % parent_job_id) 170*9c5db199SXin Li elif sub: 171*9c5db199SXin Li where.append('id IN ' + filter_common % child_job_id) 172*9c5db199SXin Li elif standalone: 173*9c5db199SXin Li where.append('NOT EXISTS (SELECT 1 from afe_jobs AS sub_query ' 174*9c5db199SXin Li 'WHERE parent_job_id IS NOT NULL' 175*9c5db199SXin Li ' AND (sub_query.parent_job_id=afe_jobs.id' 176*9c5db199SXin Li ' OR sub_query.id=afe_jobs.id))') 177*9c5db199SXin Li else: 178*9c5db199SXin Li return extra_args 179*9c5db199SXin Li 180*9c5db199SXin Li extra_args['where'] = where 181*9c5db199SXin Li return extra_args 182*9c5db199SXin Li 183*9c5db199SXin Li 184*9c5db199SXin Lidef get_host_query(multiple_labels, exclude_only_if_needed_labels, 185*9c5db199SXin Li valid_only, filter_data): 186*9c5db199SXin Li """ 187*9c5db199SXin Li @param exclude_only_if_needed_labels: Deprecated. By default it's false. 188*9c5db199SXin Li """ 189*9c5db199SXin Li if valid_only: 190*9c5db199SXin Li initial_query = models.Host.valid_objects.all() 191*9c5db199SXin Li else: 192*9c5db199SXin Li initial_query = models.Host.objects.all() 193*9c5db199SXin Li 194*9c5db199SXin Li try: 195*9c5db199SXin Li hosts = models.Host.get_hosts_with_labels( 196*9c5db199SXin Li multiple_labels, initial_query) 197*9c5db199SXin Li if not hosts: 198*9c5db199SXin Li return hosts 199*9c5db199SXin Li 200*9c5db199SXin Li return models.Host.query_objects(filter_data, initial_query=hosts) 201*9c5db199SXin Li except models.Label.DoesNotExist: 202*9c5db199SXin Li return models.Host.objects.none() 203*9c5db199SXin Li 204*9c5db199SXin Li 205*9c5db199SXin Liclass InconsistencyException(Exception): 206*9c5db199SXin Li 'Raised when a list of objects does not have a consistent value' 207*9c5db199SXin Li 208*9c5db199SXin Li 209*9c5db199SXin Lidef get_consistent_value(objects, field): 210*9c5db199SXin Li if not objects: 211*9c5db199SXin Li # well a list of nothing is consistent 212*9c5db199SXin Li return None 213*9c5db199SXin Li 214*9c5db199SXin Li value = getattr(objects[0], field) 215*9c5db199SXin Li for obj in objects: 216*9c5db199SXin Li this_value = getattr(obj, field) 217*9c5db199SXin Li if this_value != value: 218*9c5db199SXin Li raise InconsistencyException(objects[0], obj) 219*9c5db199SXin Li return value 220*9c5db199SXin Li 221*9c5db199SXin Li 222*9c5db199SXin Lidef afe_test_dict_to_test_object(test_dict): 223*9c5db199SXin Li if not isinstance(test_dict, dict): 224*9c5db199SXin Li return test_dict 225*9c5db199SXin Li 226*9c5db199SXin Li numerized_dict = {} 227*9c5db199SXin Li for key, value in six.iteritems(test_dict): 228*9c5db199SXin Li try: 229*9c5db199SXin Li numerized_dict[key] = int(value) 230*9c5db199SXin Li except (ValueError, TypeError): 231*9c5db199SXin Li numerized_dict[key] = value 232*9c5db199SXin Li 233*9c5db199SXin Li return type('TestObject', (object,), numerized_dict) 234*9c5db199SXin Li 235*9c5db199SXin Li 236*9c5db199SXin Lidef _check_is_server_test(test_type): 237*9c5db199SXin Li """Checks if the test type is a server test. 238*9c5db199SXin Li 239*9c5db199SXin Li @param test_type The test type in enum integer or string. 240*9c5db199SXin Li 241*9c5db199SXin Li @returns A boolean to identify if the test type is server test. 242*9c5db199SXin Li """ 243*9c5db199SXin Li if test_type is not None: 244*9c5db199SXin Li if isinstance(test_type, six.string_types): 245*9c5db199SXin Li try: 246*9c5db199SXin Li test_type = control_data.CONTROL_TYPE.get_value(test_type) 247*9c5db199SXin Li except AttributeError: 248*9c5db199SXin Li return False 249*9c5db199SXin Li return (test_type == control_data.CONTROL_TYPE.SERVER) 250*9c5db199SXin Li return False 251*9c5db199SXin Li 252*9c5db199SXin Li 253*9c5db199SXin Lidef prepare_generate_control_file(tests, profilers, db_tests=True): 254*9c5db199SXin Li if db_tests: 255*9c5db199SXin Li test_objects = [models.Test.smart_get(test) for test in tests] 256*9c5db199SXin Li else: 257*9c5db199SXin Li test_objects = [afe_test_dict_to_test_object(test) for test in tests] 258*9c5db199SXin Li 259*9c5db199SXin Li profiler_objects = [models.Profiler.smart_get(profiler) 260*9c5db199SXin Li for profiler in profilers] 261*9c5db199SXin Li # ensure tests are all the same type 262*9c5db199SXin Li try: 263*9c5db199SXin Li test_type = get_consistent_value(test_objects, 'test_type') 264*9c5db199SXin Li except InconsistencyException as exc: 265*9c5db199SXin Li test1, test2 = exc.args 266*9c5db199SXin Li raise model_logic.ValidationError( 267*9c5db199SXin Li {'tests' : 'You cannot run both test_suites and server-side ' 268*9c5db199SXin Li 'tests together (tests %s and %s differ' % ( 269*9c5db199SXin Li test1.name, test2.name)}) 270*9c5db199SXin Li 271*9c5db199SXin Li is_server = _check_is_server_test(test_type) 272*9c5db199SXin Li if test_objects: 273*9c5db199SXin Li synch_count = max(test.sync_count for test in test_objects) 274*9c5db199SXin Li else: 275*9c5db199SXin Li synch_count = 1 276*9c5db199SXin Li 277*9c5db199SXin Li if db_tests: 278*9c5db199SXin Li dependencies = set(label.name for label 279*9c5db199SXin Li in models.Label.objects.filter(test__in=test_objects)) 280*9c5db199SXin Li else: 281*9c5db199SXin Li dependencies = reduce( 282*9c5db199SXin Li set.union, [set(test.dependencies) for test in test_objects]) 283*9c5db199SXin Li 284*9c5db199SXin Li cf_info = dict(is_server=is_server, synch_count=synch_count, 285*9c5db199SXin Li dependencies=list(dependencies)) 286*9c5db199SXin Li return cf_info, test_objects, profiler_objects 287*9c5db199SXin Li 288*9c5db199SXin Li 289*9c5db199SXin Lidef check_job_dependencies(host_objects, job_dependencies): 290*9c5db199SXin Li """ 291*9c5db199SXin Li Check that a set of machines satisfies a job's dependencies. 292*9c5db199SXin Li host_objects: list of models.Host objects 293*9c5db199SXin Li job_dependencies: list of names of labels 294*9c5db199SXin Li """ 295*9c5db199SXin Li # check that hosts satisfy dependencies 296*9c5db199SXin Li host_ids = [host.id for host in host_objects] 297*9c5db199SXin Li hosts_in_job = models.Host.objects.filter(id__in=host_ids) 298*9c5db199SXin Li ok_hosts = hosts_in_job 299*9c5db199SXin Li for index, dependency in enumerate(job_dependencies): 300*9c5db199SXin Li if not provision.is_for_special_action(dependency): 301*9c5db199SXin Li try: 302*9c5db199SXin Li label = models.Label.smart_get(dependency) 303*9c5db199SXin Li except models.Label.DoesNotExist: 304*9c5db199SXin Li logging.info( 305*9c5db199SXin Li 'Label %r does not exist, so it cannot ' 306*9c5db199SXin Li 'be replaced by static label.', dependency) 307*9c5db199SXin Li label = None 308*9c5db199SXin Li 309*9c5db199SXin Li if label is not None and label.is_replaced_by_static(): 310*9c5db199SXin Li ok_hosts = ok_hosts.filter(static_labels__name=dependency) 311*9c5db199SXin Li else: 312*9c5db199SXin Li ok_hosts = ok_hosts.filter(labels__name=dependency) 313*9c5db199SXin Li 314*9c5db199SXin Li failing_hosts = (set(host.hostname for host in host_objects) - 315*9c5db199SXin Li set(host.hostname for host in ok_hosts)) 316*9c5db199SXin Li if failing_hosts: 317*9c5db199SXin Li raise model_logic.ValidationError( 318*9c5db199SXin Li {'hosts' : 'Host(s) failed to meet job dependencies (' + 319*9c5db199SXin Li (', '.join(job_dependencies)) + '): ' + 320*9c5db199SXin Li (', '.join(failing_hosts))}) 321*9c5db199SXin Li 322*9c5db199SXin Li 323*9c5db199SXin Lidef check_job_metahost_dependencies(metahost_objects, job_dependencies): 324*9c5db199SXin Li """ 325*9c5db199SXin Li Check that at least one machine within the metahost spec satisfies the job's 326*9c5db199SXin Li dependencies. 327*9c5db199SXin Li 328*9c5db199SXin Li @param metahost_objects A list of label objects representing the metahosts. 329*9c5db199SXin Li @param job_dependencies A list of strings of the required label names. 330*9c5db199SXin Li @raises NoEligibleHostException If a metahost cannot run the job. 331*9c5db199SXin Li """ 332*9c5db199SXin Li for metahost in metahost_objects: 333*9c5db199SXin Li if metahost.is_replaced_by_static(): 334*9c5db199SXin Li static_metahost = models.StaticLabel.smart_get(metahost.name) 335*9c5db199SXin Li hosts = models.Host.objects.filter(static_labels=static_metahost) 336*9c5db199SXin Li else: 337*9c5db199SXin Li hosts = models.Host.objects.filter(labels=metahost) 338*9c5db199SXin Li 339*9c5db199SXin Li for label_name in job_dependencies: 340*9c5db199SXin Li if not provision.is_for_special_action(label_name): 341*9c5db199SXin Li try: 342*9c5db199SXin Li label = models.Label.smart_get(label_name) 343*9c5db199SXin Li except models.Label.DoesNotExist: 344*9c5db199SXin Li logging.info('Label %r does not exist, so it cannot ' 345*9c5db199SXin Li 'be replaced by static label.', label_name) 346*9c5db199SXin Li label = None 347*9c5db199SXin Li 348*9c5db199SXin Li if label is not None and label.is_replaced_by_static(): 349*9c5db199SXin Li hosts = hosts.filter(static_labels__name=label_name) 350*9c5db199SXin Li else: 351*9c5db199SXin Li hosts = hosts.filter(labels__name=label_name) 352*9c5db199SXin Li 353*9c5db199SXin Li if not any(hosts): 354*9c5db199SXin Li raise error.NoEligibleHostException("No hosts within %s satisfy %s." 355*9c5db199SXin Li % (metahost.name, ', '.join(job_dependencies))) 356*9c5db199SXin Li 357*9c5db199SXin Li 358*9c5db199SXin Lidef _execution_key_for(host_queue_entry): 359*9c5db199SXin Li return (host_queue_entry.job.id, host_queue_entry.execution_subdir) 360*9c5db199SXin Li 361*9c5db199SXin Li 362*9c5db199SXin Lidef check_abort_synchronous_jobs(host_queue_entries): 363*9c5db199SXin Li # ensure user isn't aborting part of a synchronous autoserv execution 364*9c5db199SXin Li count_per_execution = {} 365*9c5db199SXin Li for queue_entry in host_queue_entries: 366*9c5db199SXin Li key = _execution_key_for(queue_entry) 367*9c5db199SXin Li count_per_execution.setdefault(key, 0) 368*9c5db199SXin Li count_per_execution[key] += 1 369*9c5db199SXin Li 370*9c5db199SXin Li for queue_entry in host_queue_entries: 371*9c5db199SXin Li if not queue_entry.execution_subdir: 372*9c5db199SXin Li continue 373*9c5db199SXin Li execution_count = count_per_execution[_execution_key_for(queue_entry)] 374*9c5db199SXin Li if execution_count < queue_entry.job.synch_count: 375*9c5db199SXin Li raise model_logic.ValidationError( 376*9c5db199SXin Li {'' : 'You cannot abort part of a synchronous job execution ' 377*9c5db199SXin Li '(%d/%s), %d included, %d expected' 378*9c5db199SXin Li % (queue_entry.job.id, queue_entry.execution_subdir, 379*9c5db199SXin Li execution_count, queue_entry.job.synch_count)}) 380*9c5db199SXin Li 381*9c5db199SXin Li 382*9c5db199SXin Lidef check_modify_host(update_data): 383*9c5db199SXin Li """ 384*9c5db199SXin Li Check modify_host* requests. 385*9c5db199SXin Li 386*9c5db199SXin Li @param update_data: A dictionary with the changes to make to a host 387*9c5db199SXin Li or hosts. 388*9c5db199SXin Li """ 389*9c5db199SXin Li # Only the scheduler (monitor_db) is allowed to modify Host status. 390*9c5db199SXin Li # Otherwise race conditions happen as a hosts state is changed out from 391*9c5db199SXin Li # beneath tasks being run on a host. 392*9c5db199SXin Li if 'status' in update_data: 393*9c5db199SXin Li raise model_logic.ValidationError({ 394*9c5db199SXin Li 'status': 'Host status can not be modified by the frontend.'}) 395*9c5db199SXin Li 396*9c5db199SXin Li 397*9c5db199SXin Lidef check_modify_host_locking(host, update_data): 398*9c5db199SXin Li """ 399*9c5db199SXin Li Checks when locking/unlocking has been requested if the host is already 400*9c5db199SXin Li locked/unlocked. 401*9c5db199SXin Li 402*9c5db199SXin Li @param host: models.Host object to be modified 403*9c5db199SXin Li @param update_data: A dictionary with the changes to make to the host. 404*9c5db199SXin Li """ 405*9c5db199SXin Li locked = update_data.get('locked', None) 406*9c5db199SXin Li lock_reason = update_data.get('lock_reason', None) 407*9c5db199SXin Li if locked is not None: 408*9c5db199SXin Li if locked and host.locked: 409*9c5db199SXin Li raise model_logic.ValidationError({ 410*9c5db199SXin Li 'locked': 'Host %s already locked by %s on %s.' % 411*9c5db199SXin Li (host.hostname, host.locked_by, host.lock_time)}) 412*9c5db199SXin Li if not locked and not host.locked: 413*9c5db199SXin Li raise model_logic.ValidationError({ 414*9c5db199SXin Li 'locked': 'Host %s already unlocked.' % host.hostname}) 415*9c5db199SXin Li if locked and not lock_reason and not host.locked: 416*9c5db199SXin Li raise model_logic.ValidationError({ 417*9c5db199SXin Li 'locked': 'Please provide a reason for locking Host %s' % 418*9c5db199SXin Li host.hostname}) 419*9c5db199SXin Li 420*9c5db199SXin Li 421*9c5db199SXin Lidef get_motd(): 422*9c5db199SXin Li dirname = os.path.dirname(__file__) 423*9c5db199SXin Li filename = os.path.join(dirname, "..", "..", "motd.txt") 424*9c5db199SXin Li text = '' 425*9c5db199SXin Li try: 426*9c5db199SXin Li fp = open(filename, "r") 427*9c5db199SXin Li try: 428*9c5db199SXin Li text = fp.read() 429*9c5db199SXin Li finally: 430*9c5db199SXin Li fp.close() 431*9c5db199SXin Li except: 432*9c5db199SXin Li pass 433*9c5db199SXin Li 434*9c5db199SXin Li return text 435*9c5db199SXin Li 436*9c5db199SXin Li 437*9c5db199SXin Lidef _get_metahost_counts(metahost_objects): 438*9c5db199SXin Li metahost_counts = {} 439*9c5db199SXin Li for metahost in metahost_objects: 440*9c5db199SXin Li metahost_counts.setdefault(metahost, 0) 441*9c5db199SXin Li metahost_counts[metahost] += 1 442*9c5db199SXin Li return metahost_counts 443*9c5db199SXin Li 444*9c5db199SXin Li 445*9c5db199SXin Lidef get_job_info(job, preserve_metahosts=False, queue_entry_filter_data=None): 446*9c5db199SXin Li hosts = [] 447*9c5db199SXin Li one_time_hosts = [] 448*9c5db199SXin Li meta_hosts = [] 449*9c5db199SXin Li hostless = False 450*9c5db199SXin Li 451*9c5db199SXin Li queue_entries = job.hostqueueentry_set.all() 452*9c5db199SXin Li if queue_entry_filter_data: 453*9c5db199SXin Li queue_entries = models.HostQueueEntry.query_objects( 454*9c5db199SXin Li queue_entry_filter_data, initial_query=queue_entries) 455*9c5db199SXin Li 456*9c5db199SXin Li for queue_entry in queue_entries: 457*9c5db199SXin Li if (queue_entry.host and (preserve_metahosts or 458*9c5db199SXin Li not queue_entry.meta_host)): 459*9c5db199SXin Li if queue_entry.deleted: 460*9c5db199SXin Li continue 461*9c5db199SXin Li if queue_entry.host.invalid: 462*9c5db199SXin Li one_time_hosts.append(queue_entry.host) 463*9c5db199SXin Li else: 464*9c5db199SXin Li hosts.append(queue_entry.host) 465*9c5db199SXin Li elif queue_entry.meta_host: 466*9c5db199SXin Li meta_hosts.append(queue_entry.meta_host) 467*9c5db199SXin Li else: 468*9c5db199SXin Li hostless = True 469*9c5db199SXin Li 470*9c5db199SXin Li meta_host_counts = _get_metahost_counts(meta_hosts) 471*9c5db199SXin Li 472*9c5db199SXin Li info = dict(dependencies=[label.name for label 473*9c5db199SXin Li in job.dependency_labels.all()], 474*9c5db199SXin Li hosts=hosts, 475*9c5db199SXin Li meta_hosts=meta_hosts, 476*9c5db199SXin Li meta_host_counts=meta_host_counts, 477*9c5db199SXin Li one_time_hosts=one_time_hosts, 478*9c5db199SXin Li hostless=hostless) 479*9c5db199SXin Li return info 480*9c5db199SXin Li 481*9c5db199SXin Li 482*9c5db199SXin Lidef check_for_duplicate_hosts(host_objects): 483*9c5db199SXin Li host_counts = collections.Counter(host_objects) 484*9c5db199SXin Li duplicate_hostnames = { 485*9c5db199SXin Li host.hostname 486*9c5db199SXin Li for host, count in six.iteritems(host_counts) if count > 1 487*9c5db199SXin Li } 488*9c5db199SXin Li if duplicate_hostnames: 489*9c5db199SXin Li raise model_logic.ValidationError( 490*9c5db199SXin Li {'hosts' : 'Duplicate hosts: %s' 491*9c5db199SXin Li % ', '.join(duplicate_hostnames)}) 492*9c5db199SXin Li 493*9c5db199SXin Li 494*9c5db199SXin Lidef create_new_job(owner, options, host_objects, metahost_objects): 495*9c5db199SXin Li all_host_objects = host_objects + metahost_objects 496*9c5db199SXin Li dependencies = options.get('dependencies', []) 497*9c5db199SXin Li synch_count = options.get('synch_count') 498*9c5db199SXin Li 499*9c5db199SXin Li if synch_count is not None and synch_count > len(all_host_objects): 500*9c5db199SXin Li raise model_logic.ValidationError( 501*9c5db199SXin Li {'hosts': 502*9c5db199SXin Li 'only %d hosts provided for job with synch_count = %d' % 503*9c5db199SXin Li (len(all_host_objects), synch_count)}) 504*9c5db199SXin Li 505*9c5db199SXin Li check_for_duplicate_hosts(host_objects) 506*9c5db199SXin Li 507*9c5db199SXin Li for label_name in dependencies: 508*9c5db199SXin Li if provision.is_for_special_action(label_name): 509*9c5db199SXin Li # TODO: We could save a few queries 510*9c5db199SXin Li # if we had a bulk ensure-label-exists function, which used 511*9c5db199SXin Li # a bulk .get() call. The win is probably very small. 512*9c5db199SXin Li _ensure_label_exists(label_name) 513*9c5db199SXin Li 514*9c5db199SXin Li # This only checks targeted hosts, not hosts eligible due to the metahost 515*9c5db199SXin Li check_job_dependencies(host_objects, dependencies) 516*9c5db199SXin Li check_job_metahost_dependencies(metahost_objects, dependencies) 517*9c5db199SXin Li 518*9c5db199SXin Li options['dependencies'] = list( 519*9c5db199SXin Li models.Label.objects.filter(name__in=dependencies)) 520*9c5db199SXin Li 521*9c5db199SXin Li job = models.Job.create(owner=owner, options=options, 522*9c5db199SXin Li hosts=all_host_objects) 523*9c5db199SXin Li job.queue(all_host_objects, 524*9c5db199SXin Li is_template=options.get('is_template', False)) 525*9c5db199SXin Li return job.id 526*9c5db199SXin Li 527*9c5db199SXin Li 528*9c5db199SXin Lidef _ensure_label_exists(name): 529*9c5db199SXin Li """ 530*9c5db199SXin Li Ensure that a label called |name| exists in the Django models. 531*9c5db199SXin Li 532*9c5db199SXin Li This function is to be called from within afe rpcs only, as an 533*9c5db199SXin Li alternative to server.cros.provision.ensure_label_exists(...). It works 534*9c5db199SXin Li by Django model manipulation, rather than by making another create_label 535*9c5db199SXin Li rpc call. 536*9c5db199SXin Li 537*9c5db199SXin Li @param name: the label to check for/create. 538*9c5db199SXin Li @raises ValidationError: There was an error in the response that was 539*9c5db199SXin Li not because the label already existed. 540*9c5db199SXin Li @returns True is a label was created, False otherwise. 541*9c5db199SXin Li """ 542*9c5db199SXin Li # Make sure this function is not called on shards but only on main. 543*9c5db199SXin Li assert not server_utils.is_shard() 544*9c5db199SXin Li try: 545*9c5db199SXin Li models.Label.objects.get(name=name) 546*9c5db199SXin Li except models.Label.DoesNotExist: 547*9c5db199SXin Li try: 548*9c5db199SXin Li new_label = models.Label.objects.create(name=name) 549*9c5db199SXin Li new_label.save() 550*9c5db199SXin Li return True 551*9c5db199SXin Li except django.db.utils.IntegrityError as e: 552*9c5db199SXin Li # It is possible that another suite/test already 553*9c5db199SXin Li # created the label between the check and save. 554*9c5db199SXin Li if DUPLICATE_KEY_MSG in str(e): 555*9c5db199SXin Li return False 556*9c5db199SXin Li else: 557*9c5db199SXin Li raise 558*9c5db199SXin Li return False 559*9c5db199SXin Li 560*9c5db199SXin Li 561*9c5db199SXin Lidef find_platform(hostname, label_list): 562*9c5db199SXin Li """ 563*9c5db199SXin Li Figure out the platform name for the given host 564*9c5db199SXin Li object. If none, the return value for either will be None. 565*9c5db199SXin Li 566*9c5db199SXin Li @param hostname: The hostname to find platform. 567*9c5db199SXin Li @param label_list: The label list to find platform. 568*9c5db199SXin Li 569*9c5db199SXin Li @returns platform name for the given host. 570*9c5db199SXin Li """ 571*9c5db199SXin Li platforms = [label.name for label in label_list if label.platform] 572*9c5db199SXin Li if not platforms: 573*9c5db199SXin Li platform = None 574*9c5db199SXin Li else: 575*9c5db199SXin Li platform = platforms[0] 576*9c5db199SXin Li 577*9c5db199SXin Li if len(platforms) > 1: 578*9c5db199SXin Li raise ValueError('Host %s has more than one platform: %s' % 579*9c5db199SXin Li (hostname, ', '.join(platforms))) 580*9c5db199SXin Li 581*9c5db199SXin Li return platform 582*9c5db199SXin Li 583*9c5db199SXin Li 584*9c5db199SXin Li# support for get_host_queue_entries_and_special_tasks() 585*9c5db199SXin Li 586*9c5db199SXin Lidef _common_entry_to_dict(entry, type, job_dict, exec_path, status, started_on): 587*9c5db199SXin Li return dict(type=type, 588*9c5db199SXin Li host=entry['host'], 589*9c5db199SXin Li job=job_dict, 590*9c5db199SXin Li execution_path=exec_path, 591*9c5db199SXin Li status=status, 592*9c5db199SXin Li started_on=started_on, 593*9c5db199SXin Li id=str(entry['id']) + type, 594*9c5db199SXin Li oid=entry['id']) 595*9c5db199SXin Li 596*9c5db199SXin Li 597*9c5db199SXin Lidef _special_task_to_dict(task, queue_entries): 598*9c5db199SXin Li """Transforms a special task dictionary to another form of dictionary. 599*9c5db199SXin Li 600*9c5db199SXin Li @param task Special task as a dictionary type 601*9c5db199SXin Li @param queue_entries Host queue entries as a list of dictionaries. 602*9c5db199SXin Li 603*9c5db199SXin Li @return Transformed dictionary for a special task. 604*9c5db199SXin Li """ 605*9c5db199SXin Li job_dict = None 606*9c5db199SXin Li if task['queue_entry']: 607*9c5db199SXin Li # Scan queue_entries to get the job detail info. 608*9c5db199SXin Li for qentry in queue_entries: 609*9c5db199SXin Li if task['queue_entry']['id'] == qentry['id']: 610*9c5db199SXin Li job_dict = qentry['job'] 611*9c5db199SXin Li break 612*9c5db199SXin Li # If not found, get it from DB. 613*9c5db199SXin Li if job_dict is None: 614*9c5db199SXin Li job = models.Job.objects.get(id=task['queue_entry']['job']) 615*9c5db199SXin Li job_dict = job.get_object_dict() 616*9c5db199SXin Li 617*9c5db199SXin Li exec_path = server_utils.get_special_task_exec_path( 618*9c5db199SXin Li task['host']['hostname'], task['id'], task['task'], 619*9c5db199SXin Li time_utils.time_string_to_datetime(task['time_requested'])) 620*9c5db199SXin Li status = server_utils.get_special_task_status( 621*9c5db199SXin Li task['is_complete'], task['success'], task['is_active']) 622*9c5db199SXin Li return _common_entry_to_dict(task, task['task'], job_dict, 623*9c5db199SXin Li exec_path, status, task['time_started']) 624*9c5db199SXin Li 625*9c5db199SXin Li 626*9c5db199SXin Lidef _queue_entry_to_dict(queue_entry): 627*9c5db199SXin Li job_dict = queue_entry['job'] 628*9c5db199SXin Li tag = server_utils.get_job_tag(job_dict['id'], job_dict['owner']) 629*9c5db199SXin Li exec_path = server_utils.get_hqe_exec_path(tag, 630*9c5db199SXin Li queue_entry['execution_subdir']) 631*9c5db199SXin Li return _common_entry_to_dict(queue_entry, 'Job', job_dict, exec_path, 632*9c5db199SXin Li queue_entry['status'], queue_entry['started_on']) 633*9c5db199SXin Li 634*9c5db199SXin Li 635*9c5db199SXin Lidef prepare_host_queue_entries_and_special_tasks(interleaved_entries, 636*9c5db199SXin Li queue_entries): 637*9c5db199SXin Li """ 638*9c5db199SXin Li Prepare for serialization the interleaved entries of host queue entries 639*9c5db199SXin Li and special tasks. 640*9c5db199SXin Li Each element in the entries is a dictionary type. 641*9c5db199SXin Li The special task dictionary has only a job id for a job and lacks 642*9c5db199SXin Li the detail of the job while the host queue entry dictionary has. 643*9c5db199SXin Li queue_entries is used to look up the job detail info. 644*9c5db199SXin Li 645*9c5db199SXin Li @param interleaved_entries Host queue entries and special tasks as a list 646*9c5db199SXin Li of dictionaries. 647*9c5db199SXin Li @param queue_entries Host queue entries as a list of dictionaries. 648*9c5db199SXin Li 649*9c5db199SXin Li @return A post-processed list of dictionaries that is to be serialized. 650*9c5db199SXin Li """ 651*9c5db199SXin Li dict_list = [] 652*9c5db199SXin Li for e in interleaved_entries: 653*9c5db199SXin Li # Distinguish the two mixed entries based on the existence of 654*9c5db199SXin Li # the key "task". If an entry has the key, the entry is for 655*9c5db199SXin Li # special task. Otherwise, host queue entry. 656*9c5db199SXin Li if 'task' in e: 657*9c5db199SXin Li dict_list.append(_special_task_to_dict(e, queue_entries)) 658*9c5db199SXin Li else: 659*9c5db199SXin Li dict_list.append(_queue_entry_to_dict(e)) 660*9c5db199SXin Li return prepare_for_serialization(dict_list) 661*9c5db199SXin Li 662*9c5db199SXin Li 663*9c5db199SXin Lidef _compute_next_job_for_tasks(queue_entries, special_tasks): 664*9c5db199SXin Li """ 665*9c5db199SXin Li For each task, try to figure out the next job that ran after that task. 666*9c5db199SXin Li This is done using two pieces of information: 667*9c5db199SXin Li * if the task has a queue entry, we can use that entry's job ID. 668*9c5db199SXin Li * if the task has a time_started, we can try to compare that against the 669*9c5db199SXin Li started_on field of queue_entries. this isn't guaranteed to work perfectly 670*9c5db199SXin Li since queue_entries may also have null started_on values. 671*9c5db199SXin Li * if the task has neither, or if use of time_started fails, just use the 672*9c5db199SXin Li last computed job ID. 673*9c5db199SXin Li 674*9c5db199SXin Li @param queue_entries Host queue entries as a list of dictionaries. 675*9c5db199SXin Li @param special_tasks Special tasks as a list of dictionaries. 676*9c5db199SXin Li """ 677*9c5db199SXin Li next_job_id = None # most recently computed next job 678*9c5db199SXin Li hqe_index = 0 # index for scanning by started_on times 679*9c5db199SXin Li for task in special_tasks: 680*9c5db199SXin Li if task['queue_entry']: 681*9c5db199SXin Li next_job_id = task['queue_entry']['job'] 682*9c5db199SXin Li elif task['time_started'] is not None: 683*9c5db199SXin Li for queue_entry in queue_entries[hqe_index:]: 684*9c5db199SXin Li if queue_entry['started_on'] is None: 685*9c5db199SXin Li continue 686*9c5db199SXin Li t1 = time_utils.time_string_to_datetime( 687*9c5db199SXin Li queue_entry['started_on']) 688*9c5db199SXin Li t2 = time_utils.time_string_to_datetime(task['time_started']) 689*9c5db199SXin Li if t1 < t2: 690*9c5db199SXin Li break 691*9c5db199SXin Li next_job_id = queue_entry['job']['id'] 692*9c5db199SXin Li 693*9c5db199SXin Li task['next_job_id'] = next_job_id 694*9c5db199SXin Li 695*9c5db199SXin Li # advance hqe_index to just after next_job_id 696*9c5db199SXin Li if next_job_id is not None: 697*9c5db199SXin Li for queue_entry in queue_entries[hqe_index:]: 698*9c5db199SXin Li if queue_entry['job']['id'] < next_job_id: 699*9c5db199SXin Li break 700*9c5db199SXin Li hqe_index += 1 701*9c5db199SXin Li 702*9c5db199SXin Li 703*9c5db199SXin Lidef interleave_entries(queue_entries, special_tasks): 704*9c5db199SXin Li """ 705*9c5db199SXin Li Both lists should be ordered by descending ID. 706*9c5db199SXin Li """ 707*9c5db199SXin Li _compute_next_job_for_tasks(queue_entries, special_tasks) 708*9c5db199SXin Li 709*9c5db199SXin Li # start with all special tasks that've run since the last job 710*9c5db199SXin Li interleaved_entries = [] 711*9c5db199SXin Li for task in special_tasks: 712*9c5db199SXin Li if task['next_job_id'] is not None: 713*9c5db199SXin Li break 714*9c5db199SXin Li interleaved_entries.append(task) 715*9c5db199SXin Li 716*9c5db199SXin Li # now interleave queue entries with the remaining special tasks 717*9c5db199SXin Li special_task_index = len(interleaved_entries) 718*9c5db199SXin Li for queue_entry in queue_entries: 719*9c5db199SXin Li interleaved_entries.append(queue_entry) 720*9c5db199SXin Li # add all tasks that ran between this job and the previous one 721*9c5db199SXin Li for task in special_tasks[special_task_index:]: 722*9c5db199SXin Li if task['next_job_id'] < queue_entry['job']['id']: 723*9c5db199SXin Li break 724*9c5db199SXin Li interleaved_entries.append(task) 725*9c5db199SXin Li special_task_index += 1 726*9c5db199SXin Li 727*9c5db199SXin Li return interleaved_entries 728*9c5db199SXin Li 729*9c5db199SXin Li 730*9c5db199SXin Lidef bucket_hosts_by_shard(host_objs): 731*9c5db199SXin Li """Figure out which hosts are on which shards. 732*9c5db199SXin Li 733*9c5db199SXin Li @param host_objs: A list of host objects. 734*9c5db199SXin Li 735*9c5db199SXin Li @return: A map of shard hostname: list of hosts on the shard. 736*9c5db199SXin Li """ 737*9c5db199SXin Li shard_host_map = collections.defaultdict(list) 738*9c5db199SXin Li for host in host_objs: 739*9c5db199SXin Li if host.shard: 740*9c5db199SXin Li shard_host_map[host.shard.hostname].append(host.hostname) 741*9c5db199SXin Li return shard_host_map 742*9c5db199SXin Li 743*9c5db199SXin Li 744*9c5db199SXin Lidef create_job_common( 745*9c5db199SXin Li name, 746*9c5db199SXin Li priority, 747*9c5db199SXin Li control_type, 748*9c5db199SXin Li control_file=None, 749*9c5db199SXin Li hosts=(), 750*9c5db199SXin Li meta_hosts=(), 751*9c5db199SXin Li one_time_hosts=(), 752*9c5db199SXin Li synch_count=None, 753*9c5db199SXin Li is_template=False, 754*9c5db199SXin Li timeout=None, 755*9c5db199SXin Li timeout_mins=None, 756*9c5db199SXin Li max_runtime_mins=None, 757*9c5db199SXin Li run_verify=True, 758*9c5db199SXin Li email_list='', 759*9c5db199SXin Li dependencies=(), 760*9c5db199SXin Li reboot_before=None, 761*9c5db199SXin Li reboot_after=None, 762*9c5db199SXin Li parse_failed_repair=None, 763*9c5db199SXin Li hostless=False, 764*9c5db199SXin Li keyvals=None, 765*9c5db199SXin Li drone_set=None, 766*9c5db199SXin Li parent_job_id=None, 767*9c5db199SXin Li run_reset=True, 768*9c5db199SXin Li require_ssp=None): 769*9c5db199SXin Li #pylint: disable-msg=C0111 770*9c5db199SXin Li """ 771*9c5db199SXin Li Common code between creating "standard" jobs and creating parameterized jobs 772*9c5db199SXin Li """ 773*9c5db199SXin Li # input validation 774*9c5db199SXin Li host_args_passed = any((hosts, meta_hosts, one_time_hosts)) 775*9c5db199SXin Li if hostless: 776*9c5db199SXin Li if host_args_passed: 777*9c5db199SXin Li raise model_logic.ValidationError({ 778*9c5db199SXin Li 'hostless': 'Hostless jobs cannot include any hosts!'}) 779*9c5db199SXin Li if control_type != control_data.CONTROL_TYPE_NAMES.SERVER: 780*9c5db199SXin Li raise model_logic.ValidationError({ 781*9c5db199SXin Li 'control_type': 'Hostless jobs cannot use client-side ' 782*9c5db199SXin Li 'control files'}) 783*9c5db199SXin Li elif not host_args_passed: 784*9c5db199SXin Li raise model_logic.ValidationError({ 785*9c5db199SXin Li 'arguments' : "For host jobs, you must pass at least one of" 786*9c5db199SXin Li " 'hosts', 'meta_hosts', 'one_time_hosts'." 787*9c5db199SXin Li }) 788*9c5db199SXin Li label_objects = list(models.Label.objects.filter(name__in=meta_hosts)) 789*9c5db199SXin Li 790*9c5db199SXin Li # convert hostnames & meta hosts to host/label objects 791*9c5db199SXin Li host_objects = models.Host.smart_get_bulk(hosts) 792*9c5db199SXin Li _validate_host_job_sharding(host_objects) 793*9c5db199SXin Li for host in one_time_hosts: 794*9c5db199SXin Li this_host = models.Host.create_one_time_host(host) 795*9c5db199SXin Li host_objects.append(this_host) 796*9c5db199SXin Li 797*9c5db199SXin Li metahost_objects = [] 798*9c5db199SXin Li meta_host_labels_by_name = {label.name: label for label in label_objects} 799*9c5db199SXin Li for label_name in meta_hosts: 800*9c5db199SXin Li if label_name in meta_host_labels_by_name: 801*9c5db199SXin Li metahost_objects.append(meta_host_labels_by_name[label_name]) 802*9c5db199SXin Li else: 803*9c5db199SXin Li raise model_logic.ValidationError( 804*9c5db199SXin Li {'meta_hosts' : 'Label "%s" not found' % label_name}) 805*9c5db199SXin Li 806*9c5db199SXin Li options = dict(name=name, 807*9c5db199SXin Li priority=priority, 808*9c5db199SXin Li control_file=control_file, 809*9c5db199SXin Li control_type=control_type, 810*9c5db199SXin Li is_template=is_template, 811*9c5db199SXin Li timeout=timeout, 812*9c5db199SXin Li timeout_mins=timeout_mins, 813*9c5db199SXin Li max_runtime_mins=max_runtime_mins, 814*9c5db199SXin Li synch_count=synch_count, 815*9c5db199SXin Li run_verify=run_verify, 816*9c5db199SXin Li email_list=email_list, 817*9c5db199SXin Li dependencies=dependencies, 818*9c5db199SXin Li reboot_before=reboot_before, 819*9c5db199SXin Li reboot_after=reboot_after, 820*9c5db199SXin Li parse_failed_repair=parse_failed_repair, 821*9c5db199SXin Li keyvals=keyvals, 822*9c5db199SXin Li drone_set=drone_set, 823*9c5db199SXin Li parent_job_id=parent_job_id, 824*9c5db199SXin Li # TODO(crbug.com/873716) DEPRECATED. Remove entirely. 825*9c5db199SXin Li test_retry=0, 826*9c5db199SXin Li run_reset=run_reset, 827*9c5db199SXin Li require_ssp=require_ssp) 828*9c5db199SXin Li 829*9c5db199SXin Li return create_new_job(owner=models.User.current_user().login, 830*9c5db199SXin Li options=options, 831*9c5db199SXin Li host_objects=host_objects, 832*9c5db199SXin Li metahost_objects=metahost_objects) 833*9c5db199SXin Li 834*9c5db199SXin Li 835*9c5db199SXin Lidef _validate_host_job_sharding(host_objects): 836*9c5db199SXin Li """Check that the hosts obey job sharding rules.""" 837*9c5db199SXin Li if not (server_utils.is_shard() 838*9c5db199SXin Li or _allowed_hosts_for_main_job(host_objects)): 839*9c5db199SXin Li shard_host_map = bucket_hosts_by_shard(host_objects) 840*9c5db199SXin Li raise ValueError( 841*9c5db199SXin Li 'The following hosts are on shard(s), please create ' 842*9c5db199SXin Li 'seperate jobs for hosts on each shard: %s ' % 843*9c5db199SXin Li shard_host_map) 844*9c5db199SXin Li 845*9c5db199SXin Li 846*9c5db199SXin Lidef _allowed_hosts_for_main_job(host_objects): 847*9c5db199SXin Li """Check that the hosts are allowed for a job on main.""" 848*9c5db199SXin Li # We disallow the following jobs on main: 849*9c5db199SXin Li # num_shards > 1: this is a job spanning across multiple shards. 850*9c5db199SXin Li # num_shards == 1 but number of hosts on shard is less 851*9c5db199SXin Li # than total number of hosts: this is a job that spans across 852*9c5db199SXin Li # one shard and the main. 853*9c5db199SXin Li shard_host_map = bucket_hosts_by_shard(host_objects) 854*9c5db199SXin Li num_shards = len(shard_host_map) 855*9c5db199SXin Li if num_shards > 1: 856*9c5db199SXin Li return False 857*9c5db199SXin Li if num_shards == 1: 858*9c5db199SXin Li hosts_on_shard = list(shard_host_map.values())[0] 859*9c5db199SXin Li assert len(hosts_on_shard) <= len(host_objects) 860*9c5db199SXin Li return len(hosts_on_shard) == len(host_objects) 861*9c5db199SXin Li else: 862*9c5db199SXin Li return True 863*9c5db199SXin Li 864*9c5db199SXin Li 865*9c5db199SXin Lidef encode_ascii(control_file): 866*9c5db199SXin Li """Force a control file to only contain ascii characters. 867*9c5db199SXin Li 868*9c5db199SXin Li @param control_file: Control file to encode. 869*9c5db199SXin Li 870*9c5db199SXin Li @returns the control file in an ascii encoding. 871*9c5db199SXin Li 872*9c5db199SXin Li @raises error.ControlFileMalformed: if encoding fails. 873*9c5db199SXin Li """ 874*9c5db199SXin Li try: 875*9c5db199SXin Li return control_file.encode('ascii') 876*9c5db199SXin Li except UnicodeDecodeError as e: 877*9c5db199SXin Li raise error.ControlFileMalformed(str(e)) 878*9c5db199SXin Li 879*9c5db199SXin Li 880*9c5db199SXin Lidef get_wmatrix_url(): 881*9c5db199SXin Li """Get wmatrix url from config file. 882*9c5db199SXin Li 883*9c5db199SXin Li @returns the wmatrix url or an empty string. 884*9c5db199SXin Li """ 885*9c5db199SXin Li return global_config.global_config.get_config_value('AUTOTEST_WEB', 886*9c5db199SXin Li 'wmatrix_url', 887*9c5db199SXin Li default='') 888*9c5db199SXin Li 889*9c5db199SXin Li 890*9c5db199SXin Lidef get_stainless_url(): 891*9c5db199SXin Li """Get stainless url from config file. 892*9c5db199SXin Li 893*9c5db199SXin Li @returns the stainless url or an empty string. 894*9c5db199SXin Li """ 895*9c5db199SXin Li return global_config.global_config.get_config_value('AUTOTEST_WEB', 896*9c5db199SXin Li 'stainless_url', 897*9c5db199SXin Li default='') 898*9c5db199SXin Li 899*9c5db199SXin Li 900*9c5db199SXin Lidef inject_times_to_filter(start_time_key=None, end_time_key=None, 901*9c5db199SXin Li start_time_value=None, end_time_value=None, 902*9c5db199SXin Li **filter_data): 903*9c5db199SXin Li """Inject the key value pairs of start and end time if provided. 904*9c5db199SXin Li 905*9c5db199SXin Li @param start_time_key: A string represents the filter key of start_time. 906*9c5db199SXin Li @param end_time_key: A string represents the filter key of end_time. 907*9c5db199SXin Li @param start_time_value: Start_time value. 908*9c5db199SXin Li @param end_time_value: End_time value. 909*9c5db199SXin Li 910*9c5db199SXin Li @returns the injected filter_data. 911*9c5db199SXin Li """ 912*9c5db199SXin Li if start_time_value: 913*9c5db199SXin Li filter_data[start_time_key] = start_time_value 914*9c5db199SXin Li if end_time_value: 915*9c5db199SXin Li filter_data[end_time_key] = end_time_value 916*9c5db199SXin Li return filter_data 917*9c5db199SXin Li 918*9c5db199SXin Li 919*9c5db199SXin Lidef inject_times_to_hqe_special_tasks_filters(filter_data_common, 920*9c5db199SXin Li start_time, end_time): 921*9c5db199SXin Li """Inject start and end time to hqe and special tasks filters. 922*9c5db199SXin Li 923*9c5db199SXin Li @param filter_data_common: Common filter for hqe and special tasks. 924*9c5db199SXin Li @param start_time_key: A string represents the filter key of start_time. 925*9c5db199SXin Li @param end_time_key: A string represents the filter key of end_time. 926*9c5db199SXin Li 927*9c5db199SXin Li @returns a pair of hqe and special tasks filters. 928*9c5db199SXin Li """ 929*9c5db199SXin Li filter_data_special_tasks = filter_data_common.copy() 930*9c5db199SXin Li return (inject_times_to_filter('started_on__gte', 'started_on__lte', 931*9c5db199SXin Li start_time, end_time, **filter_data_common), 932*9c5db199SXin Li inject_times_to_filter('time_started__gte', 'time_started__lte', 933*9c5db199SXin Li start_time, end_time, 934*9c5db199SXin Li **filter_data_special_tasks)) 935*9c5db199SXin Li 936*9c5db199SXin Li 937*9c5db199SXin Lidef retrieve_shard(shard_hostname): 938*9c5db199SXin Li """ 939*9c5db199SXin Li Retrieves the shard with the given hostname from the database. 940*9c5db199SXin Li 941*9c5db199SXin Li @param shard_hostname: Hostname of the shard to retrieve 942*9c5db199SXin Li 943*9c5db199SXin Li @raises models.Shard.DoesNotExist, if no shard with this hostname was found. 944*9c5db199SXin Li 945*9c5db199SXin Li @returns: Shard object 946*9c5db199SXin Li """ 947*9c5db199SXin Li return models.Shard.smart_get(shard_hostname) 948*9c5db199SXin Li 949*9c5db199SXin Li 950*9c5db199SXin Lidef find_records_for_shard(shard, known_job_ids, known_host_ids): 951*9c5db199SXin Li """Find records that should be sent to a shard. 952*9c5db199SXin Li 953*9c5db199SXin Li @param shard: Shard to find records for. 954*9c5db199SXin Li @param known_job_ids: List of ids of jobs the shard already has. 955*9c5db199SXin Li @param known_host_ids: List of ids of hosts the shard already has. 956*9c5db199SXin Li 957*9c5db199SXin Li @returns: Tuple of lists: 958*9c5db199SXin Li (hosts, jobs, suite_job_keyvals, invalid_host_ids) 959*9c5db199SXin Li """ 960*9c5db199SXin Li hosts, invalid_host_ids = models.Host.assign_to_shard( 961*9c5db199SXin Li shard, known_host_ids) 962*9c5db199SXin Li jobs = models.Job.assign_to_shard(shard, known_job_ids) 963*9c5db199SXin Li parent_job_ids = [job.parent_job_id for job in jobs] 964*9c5db199SXin Li suite_job_keyvals = models.JobKeyval.objects.filter( 965*9c5db199SXin Li job_id__in=parent_job_ids) 966*9c5db199SXin Li return hosts, jobs, suite_job_keyvals, invalid_host_ids 967*9c5db199SXin Li 968*9c5db199SXin Li 969*9c5db199SXin Lidef _persist_records_with_type_sent_from_shard( 970*9c5db199SXin Li shard, records, record_type, *args, **kwargs): 971*9c5db199SXin Li """ 972*9c5db199SXin Li Handle records of a specified type that were sent to the shard main. 973*9c5db199SXin Li 974*9c5db199SXin Li @param shard: The shard the records were sent from. 975*9c5db199SXin Li @param records: The records sent in their serialized format. 976*9c5db199SXin Li @param record_type: Type of the objects represented by records. 977*9c5db199SXin Li @param args: Additional arguments that will be passed on to the checks. 978*9c5db199SXin Li @param kwargs: Additional arguments that will be passed on to the checks. 979*9c5db199SXin Li 980*9c5db199SXin Li @raises error.UnallowedRecordsSentToMain if any of the checks fail. 981*9c5db199SXin Li 982*9c5db199SXin Li @returns: List of primary keys of the processed records. 983*9c5db199SXin Li """ 984*9c5db199SXin Li pks = [] 985*9c5db199SXin Li for serialized_record in records: 986*9c5db199SXin Li pk = serialized_record['id'] 987*9c5db199SXin Li try: 988*9c5db199SXin Li current_record = record_type.objects.get(pk=pk) 989*9c5db199SXin Li except record_type.DoesNotExist: 990*9c5db199SXin Li raise error.UnallowedRecordsSentToMain( 991*9c5db199SXin Li 'Object with pk %s of type %s does not exist on main.' % ( 992*9c5db199SXin Li pk, record_type)) 993*9c5db199SXin Li 994*9c5db199SXin Li try: 995*9c5db199SXin Li current_record._check_update_from_shard( 996*9c5db199SXin Li shard, serialized_record, *args, **kwargs) 997*9c5db199SXin Li except error.IgnorableUnallowedRecordsSentToMain: 998*9c5db199SXin Li # An illegal record change was attempted, but it was of a non-fatal 999*9c5db199SXin Li # variety. Silently skip this record. 1000*9c5db199SXin Li pass 1001*9c5db199SXin Li else: 1002*9c5db199SXin Li current_record.update_from_serialized(serialized_record) 1003*9c5db199SXin Li pks.append(pk) 1004*9c5db199SXin Li 1005*9c5db199SXin Li return pks 1006*9c5db199SXin Li 1007*9c5db199SXin Li 1008*9c5db199SXin Lidef persist_records_sent_from_shard(shard, jobs, hqes): 1009*9c5db199SXin Li """ 1010*9c5db199SXin Li Checking then saving serialized records sent to main from shard. 1011*9c5db199SXin Li 1012*9c5db199SXin Li During heartbeats shards upload jobs and hostqueuentries. This performs 1013*9c5db199SXin Li some checks on these and then updates the existing records for those 1014*9c5db199SXin Li entries with the updated ones from the heartbeat. 1015*9c5db199SXin Li 1016*9c5db199SXin Li The checks include: 1017*9c5db199SXin Li - Checking if the objects sent already exist on the main. 1018*9c5db199SXin Li - Checking if the objects sent were assigned to this shard. 1019*9c5db199SXin Li - hostqueueentries must be sent together with their jobs. 1020*9c5db199SXin Li 1021*9c5db199SXin Li @param shard: The shard the records were sent from. 1022*9c5db199SXin Li @param jobs: The jobs the shard sent. 1023*9c5db199SXin Li @param hqes: The hostqueuentries the shart sent. 1024*9c5db199SXin Li 1025*9c5db199SXin Li @raises error.UnallowedRecordsSentToMain if any of the checks fail. 1026*9c5db199SXin Li """ 1027*9c5db199SXin Li job_ids_persisted = _persist_records_with_type_sent_from_shard( 1028*9c5db199SXin Li shard, jobs, models.Job) 1029*9c5db199SXin Li _persist_records_with_type_sent_from_shard( 1030*9c5db199SXin Li shard, hqes, models.HostQueueEntry, 1031*9c5db199SXin Li job_ids_sent=job_ids_persisted) 1032*9c5db199SXin Li 1033*9c5db199SXin Li 1034*9c5db199SXin Lidef forward_single_host_rpc_to_shard(func): 1035*9c5db199SXin Li """This decorator forwards rpc calls that modify a host to a shard. 1036*9c5db199SXin Li 1037*9c5db199SXin Li If a host is assigned to a shard, rpcs that change the host attributes should be 1038*9c5db199SXin Li forwarded to the shard. 1039*9c5db199SXin Li 1040*9c5db199SXin Li This assumes the first argument of the function represents a host id. 1041*9c5db199SXin Li 1042*9c5db199SXin Li @param func: The function to decorate 1043*9c5db199SXin Li 1044*9c5db199SXin Li @returns: The function to replace func with. 1045*9c5db199SXin Li """ 1046*9c5db199SXin Li def replacement(**kwargs): 1047*9c5db199SXin Li # Only keyword arguments can be accepted here, as we need the argument 1048*9c5db199SXin Li # names to send the rpc. serviceHandler always provides arguments with 1049*9c5db199SXin Li # their keywords, so this is not a problem. 1050*9c5db199SXin Li 1051*9c5db199SXin Li # A host record (identified by kwargs['id']) can be deleted in 1052*9c5db199SXin Li # func(). Therefore, we should save the data that can be needed later 1053*9c5db199SXin Li # before func() is called. 1054*9c5db199SXin Li shard_hostname = None 1055*9c5db199SXin Li host = models.Host.smart_get(kwargs['id']) 1056*9c5db199SXin Li if host and host.shard: 1057*9c5db199SXin Li shard_hostname = host.shard.hostname 1058*9c5db199SXin Li ret = func(**kwargs) 1059*9c5db199SXin Li if shard_hostname and not server_utils.is_shard(): 1060*9c5db199SXin Li run_rpc_on_multiple_hostnames(func.__name__, [shard_hostname], 1061*9c5db199SXin Li **kwargs) 1062*9c5db199SXin Li return ret 1063*9c5db199SXin Li 1064*9c5db199SXin Li return replacement 1065*9c5db199SXin Li 1066*9c5db199SXin Li 1067*9c5db199SXin Lidef fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs): 1068*9c5db199SXin Li """Fanout the given rpc to shards of given hosts. 1069*9c5db199SXin Li 1070*9c5db199SXin Li @param host_objs: Host objects for the rpc. 1071*9c5db199SXin Li @param rpc_name: The name of the rpc. 1072*9c5db199SXin Li @param include_hostnames: If True, include the hostnames in the kwargs. 1073*9c5db199SXin Li Hostnames are not always necessary, this functions is designed to 1074*9c5db199SXin Li send rpcs to the shard a host is on, the rpcs themselves could be 1075*9c5db199SXin Li related to labels, acls etc. 1076*9c5db199SXin Li @param kwargs: The kwargs for the rpc. 1077*9c5db199SXin Li """ 1078*9c5db199SXin Li # Figure out which hosts are on which shards. 1079*9c5db199SXin Li shard_host_map = bucket_hosts_by_shard(host_objs) 1080*9c5db199SXin Li 1081*9c5db199SXin Li # Execute the rpc against the appropriate shards. 1082*9c5db199SXin Li for shard, hostnames in six.iteritems(shard_host_map): 1083*9c5db199SXin Li if include_hostnames: 1084*9c5db199SXin Li kwargs['hosts'] = hostnames 1085*9c5db199SXin Li try: 1086*9c5db199SXin Li run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs) 1087*9c5db199SXin Li except Exception as e: 1088*9c5db199SXin Li raise error.RPCException('RPC %s failed on shard %s due to %s' % 1089*9c5db199SXin Li (rpc_name, shard, e)) 1090*9c5db199SXin Li 1091*9c5db199SXin Li 1092*9c5db199SXin Lidef run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs): 1093*9c5db199SXin Li """Runs an rpc to multiple AFEs 1094*9c5db199SXin Li 1095*9c5db199SXin Li This is i.e. used to propagate changes made to hosts after they are assigned 1096*9c5db199SXin Li to a shard. 1097*9c5db199SXin Li 1098*9c5db199SXin Li @param rpc_call: Name of the rpc endpoint to call. 1099*9c5db199SXin Li @param shard_hostnames: List of hostnames to run the rpcs on. 1100*9c5db199SXin Li @param **kwargs: Keyword arguments to pass in the rpcs. 1101*9c5db199SXin Li """ 1102*9c5db199SXin Li # Make sure this function is not called on shards but only on main. 1103*9c5db199SXin Li assert not server_utils.is_shard() 1104*9c5db199SXin Li for shard_hostname in shard_hostnames: 1105*9c5db199SXin Li afe = frontend_wrappers.RetryingAFE(server=shard_hostname, 1106*9c5db199SXin Li user=thread_local.get_user()) 1107*9c5db199SXin Li afe.run(rpc_call, **kwargs) 1108*9c5db199SXin Li 1109*9c5db199SXin Li 1110*9c5db199SXin Lidef get_label(name): 1111*9c5db199SXin Li """Gets a label object using a given name. 1112*9c5db199SXin Li 1113*9c5db199SXin Li @param name: Label name. 1114*9c5db199SXin Li @raises model.Label.DoesNotExist: when there is no label matching 1115*9c5db199SXin Li the given name. 1116*9c5db199SXin Li @return: a label object matching the given name. 1117*9c5db199SXin Li """ 1118*9c5db199SXin Li try: 1119*9c5db199SXin Li label = models.Label.smart_get(name) 1120*9c5db199SXin Li except models.Label.DoesNotExist: 1121*9c5db199SXin Li return None 1122*9c5db199SXin Li return label 1123*9c5db199SXin Li 1124*9c5db199SXin Li 1125*9c5db199SXin Li# TODO: hide the following rpcs under is_moblab 1126*9c5db199SXin Lidef moblab_only(func): 1127*9c5db199SXin Li """Ensure moblab specific functions only run on Moblab devices.""" 1128*9c5db199SXin Li def verify(*args, **kwargs): 1129*9c5db199SXin Li if not server_utils.is_moblab(): 1130*9c5db199SXin Li raise error.RPCException('RPC: %s can only run on Moblab Systems!', 1131*9c5db199SXin Li func.__name__) 1132*9c5db199SXin Li return func(*args, **kwargs) 1133*9c5db199SXin Li return verify 1134*9c5db199SXin Li 1135*9c5db199SXin Li 1136*9c5db199SXin Lidef route_rpc_to_main(func): 1137*9c5db199SXin Li """Route RPC to main AFE. 1138*9c5db199SXin Li 1139*9c5db199SXin Li When a shard receives an RPC decorated by this, the RPC is just 1140*9c5db199SXin Li forwarded to the main. 1141*9c5db199SXin Li When the main gets the RPC, the RPC function is executed. 1142*9c5db199SXin Li 1143*9c5db199SXin Li @param func: An RPC function to decorate 1144*9c5db199SXin Li 1145*9c5db199SXin Li @returns: A function replacing the RPC func. 1146*9c5db199SXin Li """ 1147*9c5db199SXin Li argspec = inspect.getargspec(func) 1148*9c5db199SXin Li if argspec.varargs is not None: 1149*9c5db199SXin Li raise Exception('RPC function must not have *args.') 1150*9c5db199SXin Li 1151*9c5db199SXin Li @wraps(func) 1152*9c5db199SXin Li def replacement(*args, **kwargs): 1153*9c5db199SXin Li """We need special handling when decorating an RPC that can be called 1154*9c5db199SXin Li directly using positional arguments. 1155*9c5db199SXin Li 1156*9c5db199SXin Li One example is rpc_interface.create_job(). 1157*9c5db199SXin Li rpc_interface.create_job_page_handler() calls the function using both 1158*9c5db199SXin Li positional and keyword arguments. Since frontend.RpcClient.run() 1159*9c5db199SXin Li takes only keyword arguments for an RPC, positional arguments of the 1160*9c5db199SXin Li RPC function need to be transformed into keyword arguments. 1161*9c5db199SXin Li """ 1162*9c5db199SXin Li kwargs = _convert_to_kwargs_only(func, args, kwargs) 1163*9c5db199SXin Li if server_utils.is_shard(): 1164*9c5db199SXin Li afe = frontend_wrappers.RetryingAFE( 1165*9c5db199SXin Li server=server_utils.get_global_afe_hostname(), 1166*9c5db199SXin Li user=thread_local.get_user()) 1167*9c5db199SXin Li return afe.run(func.__name__, **kwargs) 1168*9c5db199SXin Li return func(**kwargs) 1169*9c5db199SXin Li 1170*9c5db199SXin Li return replacement 1171*9c5db199SXin Li 1172*9c5db199SXin Li 1173*9c5db199SXin Lidef _convert_to_kwargs_only(func, args, kwargs): 1174*9c5db199SXin Li """Convert a function call's arguments to a kwargs dict. 1175*9c5db199SXin Li 1176*9c5db199SXin Li This is best illustrated with an example. Given: 1177*9c5db199SXin Li 1178*9c5db199SXin Li def foo(a, b, **kwargs): 1179*9c5db199SXin Li pass 1180*9c5db199SXin Li _to_kwargs(foo, (1, 2), {'c': 3}) # corresponding to foo(1, 2, c=3) 1181*9c5db199SXin Li 1182*9c5db199SXin Li foo(**kwargs) 1183*9c5db199SXin Li 1184*9c5db199SXin Li @param func: function whose signature to use 1185*9c5db199SXin Li @param args: positional arguments of call 1186*9c5db199SXin Li @param kwargs: keyword arguments of call 1187*9c5db199SXin Li 1188*9c5db199SXin Li @returns: kwargs dict 1189*9c5db199SXin Li """ 1190*9c5db199SXin Li argspec = inspect.getargspec(func) 1191*9c5db199SXin Li # callargs looks like {'a': 1, 'b': 2, 'kwargs': {'c': 3}} 1192*9c5db199SXin Li callargs = inspect.getcallargs(func, *args, **kwargs) 1193*9c5db199SXin Li if argspec.keywords is None: 1194*9c5db199SXin Li kwargs = {} 1195*9c5db199SXin Li else: 1196*9c5db199SXin Li kwargs = callargs.pop(argspec.keywords) 1197*9c5db199SXin Li kwargs.update(callargs) 1198*9c5db199SXin Li return kwargs 1199*9c5db199SXin Li 1200*9c5db199SXin Li 1201*9c5db199SXin Lidef get_sample_dut(board, pool): 1202*9c5db199SXin Li """Get a dut with the given board and pool. 1203*9c5db199SXin Li 1204*9c5db199SXin Li This method is used to help to locate a dut with the given board and pool. 1205*9c5db199SXin Li The dut then can be used to identify a devserver in the same subnet. 1206*9c5db199SXin Li 1207*9c5db199SXin Li @param board: Name of the board. 1208*9c5db199SXin Li @param pool: Name of the pool. 1209*9c5db199SXin Li 1210*9c5db199SXin Li @return: Name of a dut with the given board and pool. 1211*9c5db199SXin Li """ 1212*9c5db199SXin Li if not (dev_server.PREFER_LOCAL_DEVSERVER and pool and board): 1213*9c5db199SXin Li return None 1214*9c5db199SXin Li 1215*9c5db199SXin Li hosts = list(get_host_query( 1216*9c5db199SXin Li multiple_labels=('pool:%s' % pool, 'board:%s' % board), 1217*9c5db199SXin Li exclude_only_if_needed_labels=False, 1218*9c5db199SXin Li valid_only=True, 1219*9c5db199SXin Li filter_data={}, 1220*9c5db199SXin Li )) 1221*9c5db199SXin Li if not hosts: 1222*9c5db199SXin Li return None 1223*9c5db199SXin Li else: 1224*9c5db199SXin Li return hosts[0].hostname 1225