# pylint: disable=missing-docstring
"""
Utility functions for rpc_interface.py.  We keep them in a separate file so that
only RPC interface functions go into that file.
"""

__author__ = '[email protected] (Steve Howard)'

import collections
import datetime
import inspect
import logging
import os
from functools import reduce, wraps

import django.db.utils
import django.http
import six
from autotest_lib.client.common_lib import (control_data, error, global_config,
                                            time_utils)
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.frontend import thread_local
from autotest_lib.frontend.afe import model_logic, models
from autotest_lib.server import utils as server_utils
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers

NULL_DATETIME = datetime.datetime.max
NULL_DATE = datetime.date.max
DUPLICATE_KEY_MSG = 'Duplicate entry'
RESPECT_STATIC_LABELS = global_config.global_config.get_config_value(
        'SKYLAB', 'respect_static_labels', type=bool, default=False)

def prepare_for_serialization(objects):
    """
    Prepare Python objects to be returned via RPC.
    @param objects: objects to be prepared.
    """
    if (isinstance(objects, list) and len(objects) and
        isinstance(objects[0], dict) and 'id' in objects[0]):
        objects = _gather_unique_dicts(objects)
    return _prepare_data(objects)

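# Illustrative sketch (hypothetical data, not from the original file): a list
# of id-keyed dicts is deduplicated and then made RPC-friendly, e.g.
#
#   rows = [{'id': 1, 'when': datetime.datetime(2020, 1, 1)},
#           {'id': 1, 'when': datetime.datetime(2020, 1, 1)}]
#   prepare_for_serialization(rows)
#   # -> [{'id': 1, 'when': '2020-01-01 00:00:00'}]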

def prepare_rows_as_nested_dicts(query, nested_dict_column_names):
    """
    Prepare a Django query to be returned via RPC as a sequence of nested
    dictionaries.

    @param query - A Django model query object with a select_related() method.
    @param nested_dict_column_names - A list of column/attribute names for the
            rows returned by query to expand into nested dictionaries using
            their get_object_dict() method when not None.

    @returns A list suitable to be returned in an RPC.
    """
    all_dicts = []
    for row in query.select_related():
        row_dict = row.get_object_dict()
        for column in nested_dict_column_names:
            if row_dict[column] is not None:
                row_dict[column] = getattr(row, column).get_object_dict()
        all_dicts.append(row_dict)
    return prepare_for_serialization(all_dicts)


def _prepare_data(data):
    """
    Recursively process data structures, performing necessary type
    conversions to values in data to allow for RPC serialization:
    -convert datetimes to strings
    -convert tuples and sets to lists
    """
    if isinstance(data, dict):
        new_data = {}
        for key, value in six.iteritems(data):
            new_data[key] = _prepare_data(value)
        return new_data
    elif (isinstance(data, list) or isinstance(data, tuple) or
          isinstance(data, set)):
        return [_prepare_data(item) for item in data]
    elif isinstance(data, datetime.date):
        if data is NULL_DATETIME or data is NULL_DATE:
            return None
        return str(data)
    else:
        return data

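# Behavior sketch for _prepare_data (values are hypothetical). Sets become
# lists (in no guaranteed order), the NULL date/datetime sentinels become
# None, and other dates are stringified:
#
#   _prepare_data({'ids': {3, 4}, 'date': NULL_DATE,
#                  'nested': (datetime.date(2020, 1, 2),)})
#   # -> {'ids': [3, 4], 'date': None, 'nested': ['2020-01-02']}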

def fetchall_as_list_of_dicts(cursor):
    """
    Converts each row in the cursor to a dictionary so that values can be read
    by using the column name.
    @param cursor: The database cursor to read from.
    @returns: A list of each row in the cursor as a dictionary.
    """
    desc = cursor.description
    return [ dict(zip([col[0] for col in desc], row))
             for row in cursor.fetchall() ]

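# Usage sketch (assumes an open Django DB connection; the query is
# hypothetical):
#
#   from django.db import connection
#   cursor = connection.cursor()
#   cursor.execute('SELECT id, hostname FROM afe_hosts')
#   fetchall_as_list_of_dicts(cursor)
#   # -> [{'id': 1, 'hostname': 'host1'}, ...]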

def raw_http_response(response_data, content_type=None):
    # Note: the 'mimetype' keyword was removed in Django 1.7; 'content_type'
    # is the supported spelling.
    response = django.http.HttpResponse(response_data,
                                        content_type=content_type)
    response['Content-length'] = str(len(response.content))
    return response


def _gather_unique_dicts(dict_iterable):
    """\
    Pick out unique objects (by ID) from an iterable of object dicts.
    """
    objects = collections.OrderedDict()
    for obj in dict_iterable:
        objects.setdefault(obj['id'], obj)
    return list(objects.values())

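# Sketch: duplicates (by 'id') are dropped, the first occurrence wins, and
# the original ordering is preserved via the OrderedDict:
#
#   _gather_unique_dicts([{'id': 2, 'v': 'a'}, {'id': 1}, {'id': 2, 'v': 'b'}])
#   # -> [{'id': 2, 'v': 'a'}, {'id': 1}]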

def extra_job_status_filters(not_yet_run=False, running=False, finished=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra().
    * not_yet_run: all HQEs are Queued
    * finished: all HQEs are complete
    * running: everything else
    """
    if not (not_yet_run or running or finished):
        return {}
    not_queued = ('(SELECT job_id FROM afe_host_queue_entries '
                  'WHERE status != "%s")'
                  % models.HostQueueEntry.Status.QUEUED)
    not_finished = ('(SELECT job_id FROM afe_host_queue_entries '
                    'WHERE not complete)')

    where = []
    if not_yet_run:
        where.append('id NOT IN ' + not_queued)
    if running:
        where.append('(id IN %s) AND (id IN %s)' % (not_queued, not_finished))
    if finished:
        where.append('id NOT IN ' + not_finished)
    return {'where': [' OR '.join(['(%s)' % x for x in where])]}

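# Usage sketch: the returned dict is splatted into QuerySet.extra(), e.g.
#
#   filters = extra_job_status_filters(not_yet_run=True, finished=True)
#   models.Job.objects.extra(**filters)
#
# selects jobs whose HQEs are either all Queued or all complete.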

def extra_job_type_filters(extra_args, suite=False,
                           sub=False, standalone=False):
    """\
    Generate a SQL WHERE clause for job type filtering, and return it in
    a dict of keyword args to pass to query.extra().

    @param extra_args: a dict of existing extra_args.

    No more than one of the parameters should be passed as True:
    * suite: job which is parent of other jobs
    * sub: job with a parent job
    * standalone: job with no child or parent jobs
    """
    assert not ((suite and sub) or
                (suite and standalone) or
                (sub and standalone)), ('Cannot specify more than one '
                                        'filter to this function')

    where = extra_args.get('where', [])
    parent_job_id = 'DISTINCT parent_job_id'
    child_job_id = 'id'
    filter_common = ('(SELECT %s FROM afe_jobs '
                     'WHERE parent_job_id IS NOT NULL)')

    if suite:
        where.append('id IN ' + filter_common % parent_job_id)
    elif sub:
        where.append('id IN ' + filter_common % child_job_id)
    elif standalone:
        where.append('NOT EXISTS (SELECT 1 from afe_jobs AS sub_query '
                     'WHERE parent_job_id IS NOT NULL'
                     ' AND (sub_query.parent_job_id=afe_jobs.id'
                     ' OR sub_query.id=afe_jobs.id))')
    else:
        return extra_args

    extra_args['where'] = where
    return extra_args

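# Usage sketch: these filters compose with the status filters above; Django
# ANDs the separate 'where' entries together.
#
#   extra_args = extra_job_status_filters(running=True)
#   extra_args = extra_job_type_filters(extra_args, suite=True)
#   models.Job.objects.extra(**extra_args)  # running parent (suite) jobs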

def get_host_query(multiple_labels, exclude_only_if_needed_labels,
                   valid_only, filter_data):
    """
    @param exclude_only_if_needed_labels: Deprecated and unused; defaults to
            False.
    """
    if valid_only:
        initial_query = models.Host.valid_objects.all()
    else:
        initial_query = models.Host.objects.all()

    try:
        hosts = models.Host.get_hosts_with_labels(
                multiple_labels, initial_query)
        if not hosts:
            return hosts

        return models.Host.query_objects(filter_data, initial_query=hosts)
    except models.Label.DoesNotExist:
        return models.Host.objects.none()


class InconsistencyException(Exception):
    'Raised when a list of objects does not have a consistent value'


def get_consistent_value(objects, field):
    if not objects:
        # an empty list is trivially consistent
        return None

    value = getattr(objects[0], field)
    for obj in objects:
        this_value = getattr(obj, field)
        if this_value != value:
            raise InconsistencyException(objects[0], obj)
    return value

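# Behavior sketch (hypothetical objects):
#
#   Rec = collections.namedtuple('Rec', ['test_type'])
#   get_consistent_value([Rec(1), Rec(1)], 'test_type')  # -> 1
#   get_consistent_value([], 'test_type')                # -> None
#   get_consistent_value([Rec(1), Rec(2)], 'test_type')  # raises
#                                                        #   InconsistencyException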

def afe_test_dict_to_test_object(test_dict):
    if not isinstance(test_dict, dict):
        return test_dict

    numerized_dict = {}
    for key, value in six.iteritems(test_dict):
        try:
            numerized_dict[key] = int(value)
        except (ValueError, TypeError):
            numerized_dict[key] = value

    return type('TestObject', (object,), numerized_dict)

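# Sketch: numeric strings are coerced to int and the dict keys become
# attributes of an ad-hoc class (not an instance), so later code can use
# attribute access uniformly:
#
#   t = afe_test_dict_to_test_object({'name': 'dummy', 'sync_count': '2'})
#   t.sync_count  # -> 2 (an int)
#   t.name        # -> 'dummy'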

def _check_is_server_test(test_type):
    """Checks if the test type is a server test.

    @param test_type: The test type, as an enum integer or a string.

    @returns True if the test type is a server test, False otherwise.
    """
    if test_type is not None:
        if isinstance(test_type, six.string_types):
            try:
                test_type = control_data.CONTROL_TYPE.get_value(test_type)
            except AttributeError:
                return False
        return (test_type == control_data.CONTROL_TYPE.SERVER)
    return False


def prepare_generate_control_file(tests, profilers, db_tests=True):
    if db_tests:
        test_objects = [models.Test.smart_get(test) for test in tests]
    else:
        test_objects = [afe_test_dict_to_test_object(test) for test in tests]

    profiler_objects = [models.Profiler.smart_get(profiler)
                        for profiler in profilers]
    # ensure tests are all the same type
    try:
        test_type = get_consistent_value(test_objects, 'test_type')
    except InconsistencyException as exc:
        test1, test2 = exc.args
        raise model_logic.ValidationError(
            {'tests' : 'You cannot run both test_suites and server-side '
             'tests together (tests %s and %s differ).' % (
            test1.name, test2.name)})

    is_server = _check_is_server_test(test_type)
    if test_objects:
        synch_count = max(test.sync_count for test in test_objects)
    else:
        synch_count = 1

    if db_tests:
        dependencies = set(label.name for label
                           in models.Label.objects.filter(test__in=test_objects))
    else:
        dependencies = reduce(
                set.union, [set(test.dependencies) for test in test_objects])

    cf_info = dict(is_server=is_server, synch_count=synch_count,
                   dependencies=list(dependencies))
    return cf_info, test_objects, profiler_objects


def check_job_dependencies(host_objects, job_dependencies):
    """
    Check that a set of machines satisfies a job's dependencies.
    host_objects: list of models.Host objects
    job_dependencies: list of names of labels
    """
    # check that hosts satisfy dependencies
    host_ids = [host.id for host in host_objects]
    hosts_in_job = models.Host.objects.filter(id__in=host_ids)
    ok_hosts = hosts_in_job
    for dependency in job_dependencies:
        if not provision.is_for_special_action(dependency):
            try:
                label = models.Label.smart_get(dependency)
            except models.Label.DoesNotExist:
                logging.info(
                        'Label %r does not exist, so it cannot '
                        'be replaced by static label.', dependency)
                label = None

            if label is not None and label.is_replaced_by_static():
                ok_hosts = ok_hosts.filter(static_labels__name=dependency)
            else:
                ok_hosts = ok_hosts.filter(labels__name=dependency)

    failing_hosts = (set(host.hostname for host in host_objects) -
                     set(host.hostname for host in ok_hosts))
    if failing_hosts:
        raise model_logic.ValidationError(
            {'hosts' : 'Host(s) failed to meet job dependencies (' +
                       (', '.join(job_dependencies)) + '): ' +
                       (', '.join(failing_hosts))})


def check_job_metahost_dependencies(metahost_objects, job_dependencies):
    """
    Check that at least one machine within the metahost spec satisfies the job's
    dependencies.

    @param metahost_objects A list of label objects representing the metahosts.
    @param job_dependencies A list of strings of the required label names.
    @raises NoEligibleHostException If a metahost cannot run the job.
    """
    for metahost in metahost_objects:
        if metahost.is_replaced_by_static():
            static_metahost = models.StaticLabel.smart_get(metahost.name)
            hosts = models.Host.objects.filter(static_labels=static_metahost)
        else:
            hosts = models.Host.objects.filter(labels=metahost)

        for label_name in job_dependencies:
            if not provision.is_for_special_action(label_name):
                try:
                    label = models.Label.smart_get(label_name)
                except models.Label.DoesNotExist:
                    logging.info('Label %r does not exist, so it cannot '
                                 'be replaced by static label.', label_name)
                    label = None

                if label is not None and label.is_replaced_by_static():
                    hosts = hosts.filter(static_labels__name=label_name)
                else:
                    hosts = hosts.filter(labels__name=label_name)

        if not any(hosts):
            raise error.NoEligibleHostException("No hosts within %s satisfy %s."
                    % (metahost.name, ', '.join(job_dependencies)))


def _execution_key_for(host_queue_entry):
    return (host_queue_entry.job.id, host_queue_entry.execution_subdir)


def check_abort_synchronous_jobs(host_queue_entries):
    # ensure user isn't aborting part of a synchronous autoserv execution
    count_per_execution = {}
    for queue_entry in host_queue_entries:
        key = _execution_key_for(queue_entry)
        count_per_execution.setdefault(key, 0)
        count_per_execution[key] += 1

    for queue_entry in host_queue_entries:
        if not queue_entry.execution_subdir:
            continue
        execution_count = count_per_execution[_execution_key_for(queue_entry)]
        if execution_count < queue_entry.job.synch_count:
            raise model_logic.ValidationError(
                {'' : 'You cannot abort part of a synchronous job execution '
                      '(%d/%s), %d included, %d expected'
                      % (queue_entry.job.id, queue_entry.execution_subdir,
                         execution_count, queue_entry.job.synch_count)})


def check_modify_host(update_data):
    """
    Check modify_host* requests.

    @param update_data: A dictionary with the changes to make to a host
            or hosts.
    """
    # Only the scheduler (monitor_db) is allowed to modify Host status.
    # Otherwise race conditions happen as a host's state is changed out from
    # beneath tasks being run on the host.
    if 'status' in update_data:
        raise model_logic.ValidationError({
                'status': 'Host status can not be modified by the frontend.'})


def check_modify_host_locking(host, update_data):
    """
    When locking or unlocking has been requested, check that the host is not
    already in the requested state.

    @param host: models.Host object to be modified
    @param update_data: A dictionary with the changes to make to the host.
    """
    locked = update_data.get('locked', None)
    lock_reason = update_data.get('lock_reason', None)
    if locked is not None:
        if locked and host.locked:
            raise model_logic.ValidationError({
                    'locked': 'Host %s already locked by %s on %s.' %
                    (host.hostname, host.locked_by, host.lock_time)})
        if not locked and not host.locked:
            raise model_logic.ValidationError({
                    'locked': 'Host %s already unlocked.' % host.hostname})
        if locked and not lock_reason and not host.locked:
            raise model_logic.ValidationError({
                    'locked': 'Please provide a reason for locking Host %s' %
                    host.hostname})


def get_motd():
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "..", "..", "motd.txt")
    text = ''
    try:
        with open(filename, "r") as fp:
            text = fp.read()
    except IOError:
        # A missing or unreadable motd.txt simply means there is no message
        # of the day.
        pass

    return text


def _get_metahost_counts(metahost_objects):
    metahost_counts = {}
    for metahost in metahost_objects:
        metahost_counts.setdefault(metahost, 0)
        metahost_counts[metahost] += 1
    return metahost_counts


def get_job_info(job, preserve_metahosts=False, queue_entry_filter_data=None):
    hosts = []
    one_time_hosts = []
    meta_hosts = []
    hostless = False

    queue_entries = job.hostqueueentry_set.all()
    if queue_entry_filter_data:
        queue_entries = models.HostQueueEntry.query_objects(
            queue_entry_filter_data, initial_query=queue_entries)

    for queue_entry in queue_entries:
        if (queue_entry.host and (preserve_metahosts or
                                  not queue_entry.meta_host)):
            if queue_entry.deleted:
                continue
            if queue_entry.host.invalid:
                one_time_hosts.append(queue_entry.host)
            else:
                hosts.append(queue_entry.host)
        elif queue_entry.meta_host:
            meta_hosts.append(queue_entry.meta_host)
        else:
            hostless = True

    meta_host_counts = _get_metahost_counts(meta_hosts)

    info = dict(dependencies=[label.name for label
                              in job.dependency_labels.all()],
                hosts=hosts,
                meta_hosts=meta_hosts,
                meta_host_counts=meta_host_counts,
                one_time_hosts=one_time_hosts,
                hostless=hostless)
    return info


def check_for_duplicate_hosts(host_objects):
    host_counts = collections.Counter(host_objects)
    duplicate_hostnames = {
            host.hostname
            for host, count in six.iteritems(host_counts) if count > 1
    }
    if duplicate_hostnames:
        raise model_logic.ValidationError(
                {'hosts' : 'Duplicate hosts: %s'
                 % ', '.join(duplicate_hostnames)})


def create_new_job(owner, options, host_objects, metahost_objects):
    all_host_objects = host_objects + metahost_objects
    dependencies = options.get('dependencies', [])
    synch_count = options.get('synch_count')

    if synch_count is not None and synch_count > len(all_host_objects):
        raise model_logic.ValidationError(
                {'hosts':
                 'only %d hosts provided for job with synch_count = %d' %
                 (len(all_host_objects), synch_count)})

    check_for_duplicate_hosts(host_objects)

    for label_name in dependencies:
        if provision.is_for_special_action(label_name):
            # TODO: We could save a few queries
            # if we had a bulk ensure-label-exists function, which used
            # a bulk .get() call. The win is probably very small.
            _ensure_label_exists(label_name)

    # This only checks targeted hosts, not hosts eligible due to the metahost
    check_job_dependencies(host_objects, dependencies)
    check_job_metahost_dependencies(metahost_objects, dependencies)

    options['dependencies'] = list(
            models.Label.objects.filter(name__in=dependencies))

    job = models.Job.create(owner=owner, options=options,
                            hosts=all_host_objects)
    job.queue(all_host_objects,
              is_template=options.get('is_template', False))
    return job.id


def _ensure_label_exists(name):
    """
    Ensure that a label called |name| exists in the Django models.

    This function is to be called from within afe rpcs only, as an
    alternative to server.cros.provision.ensure_label_exists(...). It works
    by Django model manipulation, rather than by making another create_label
    rpc call.

    @param name: the label to check for/create.
    @raises ValidationError: There was an error in the response that was
                             not because the label already existed.
    @returns True if a label was created, False otherwise.
    """
    # Make sure this function is not called on shards but only on main.
    assert not server_utils.is_shard()
    try:
        models.Label.objects.get(name=name)
    except models.Label.DoesNotExist:
        try:
            # objects.create() saves the new label; no separate save() needed.
            models.Label.objects.create(name=name)
            return True
        except django.db.utils.IntegrityError as e:
            # It is possible that another suite/test already
            # created the label between the check and save.
            if DUPLICATE_KEY_MSG in str(e):
                return False
            else:
                raise
    return False


def find_platform(hostname, label_list):
    """
    Figure out the platform name for the given host from its labels.  If the
    host has no platform label, return None.

    @param hostname: The hostname to find the platform for.
    @param label_list: The host's labels to search for a platform.

    @returns platform name for the given host.
    """
    platforms = [label.name for label in label_list if label.platform]
    if not platforms:
        platform = None
    else:
        platform = platforms[0]

    if len(platforms) > 1:
        raise ValueError('Host %s has more than one platform: %s' %
                         (hostname, ', '.join(platforms)))

    return platform

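# Behavior sketch (hypothetical labels):
#
#   Label = collections.namedtuple('Label', ['name', 'platform'])
#   find_platform('host1', [Label('board:x', True), Label('pool:a', False)])
#   # -> 'board:x'
#   find_platform('host1', [])  # -> None
#   # Two or more platform labels raise ValueError.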

# support for get_host_queue_entries_and_special_tasks()

def _common_entry_to_dict(entry, entry_type, job_dict, exec_path, status,
                          started_on):
    return dict(type=entry_type,
                host=entry['host'],
                job=job_dict,
                execution_path=exec_path,
                status=status,
                started_on=started_on,
                id=str(entry['id']) + entry_type,
                oid=entry['id'])


def _special_task_to_dict(task, queue_entries):
    """Transforms a special task dictionary into the common entry dictionary
    shape shared with host queue entries.

    @param task           Special task as a dictionary type
    @param queue_entries  Host queue entries as a list of dictionaries.

    @return Transformed dictionary for a special task.
    """
    job_dict = None
    if task['queue_entry']:
        # Scan queue_entries to get the job detail info.
        for qentry in queue_entries:
            if task['queue_entry']['id'] == qentry['id']:
                job_dict = qentry['job']
                break
        # If not found, get it from DB.
        if job_dict is None:
            job = models.Job.objects.get(id=task['queue_entry']['job'])
            job_dict = job.get_object_dict()

    exec_path = server_utils.get_special_task_exec_path(
            task['host']['hostname'], task['id'], task['task'],
            time_utils.time_string_to_datetime(task['time_requested']))
    status = server_utils.get_special_task_status(
            task['is_complete'], task['success'], task['is_active'])
    return _common_entry_to_dict(task, task['task'], job_dict,
            exec_path, status, task['time_started'])


def _queue_entry_to_dict(queue_entry):
    job_dict = queue_entry['job']
    tag = server_utils.get_job_tag(job_dict['id'], job_dict['owner'])
    exec_path = server_utils.get_hqe_exec_path(tag,
                                               queue_entry['execution_subdir'])
    return _common_entry_to_dict(queue_entry, 'Job', job_dict, exec_path,
            queue_entry['status'], queue_entry['started_on'])


def prepare_host_queue_entries_and_special_tasks(interleaved_entries,
                                                 queue_entries):
    """
    Prepare for serialization the interleaved entries of host queue entries
    and special tasks.
    Each element in the entries is a dictionary type.
    The special task dictionary has only a job id and lacks the job's
    details, while the host queue entry dictionary carries them;
    queue_entries is used to look up the job detail info.

    @param interleaved_entries  Host queue entries and special tasks as a list
                                of dictionaries.
    @param queue_entries        Host queue entries as a list of dictionaries.

    @return A post-processed list of dictionaries that is to be serialized.
    """
    dict_list = []
    for e in interleaved_entries:
        # Distinguish the two mixed entries based on the existence of
        # the key "task". If an entry has the key, the entry is for a
        # special task. Otherwise, it is a host queue entry.
        if 'task' in e:
            dict_list.append(_special_task_to_dict(e, queue_entries))
        else:
            dict_list.append(_queue_entry_to_dict(e))
    return prepare_for_serialization(dict_list)


def _compute_next_job_for_tasks(queue_entries, special_tasks):
    """
    For each task, try to figure out the next job that ran after that task.
    This is done using the following pieces of information:
    * if the task has a queue entry, we can use that entry's job ID.
    * if the task has a time_started, we can try to compare that against the
      started_on field of queue_entries. this isn't guaranteed to work perfectly
      since queue_entries may also have null started_on values.
    * if the task has neither, or if use of time_started fails, just use the
      last computed job ID.

    @param queue_entries    Host queue entries as a list of dictionaries.
    @param special_tasks    Special tasks as a list of dictionaries.
    """
    next_job_id = None # most recently computed next job
    hqe_index = 0 # index for scanning by started_on times
    for task in special_tasks:
        if task['queue_entry']:
            next_job_id = task['queue_entry']['job']
        elif task['time_started'] is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['started_on'] is None:
                    continue
                t1 = time_utils.time_string_to_datetime(
                        queue_entry['started_on'])
                t2 = time_utils.time_string_to_datetime(task['time_started'])
                if t1 < t2:
                    break
                next_job_id = queue_entry['job']['id']

        task['next_job_id'] = next_job_id

        # advance hqe_index to just after next_job_id
        if next_job_id is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['job']['id'] < next_job_id:
                    break
                hqe_index += 1


def interleave_entries(queue_entries, special_tasks):
    """
    Both lists should be ordered by descending ID.
    """
    _compute_next_job_for_tasks(queue_entries, special_tasks)

    # start with all special tasks that've run since the last job
    interleaved_entries = []
    for task in special_tasks:
        if task['next_job_id'] is not None:
            break
        interleaved_entries.append(task)

    # now interleave queue entries with the remaining special tasks
    special_task_index = len(interleaved_entries)
    for queue_entry in queue_entries:
        interleaved_entries.append(queue_entry)
        # add all tasks that ran between this job and the previous one
        for task in special_tasks[special_task_index:]:
            if task['next_job_id'] < queue_entry['job']['id']:
                break
            interleaved_entries.append(task)
            special_task_index += 1

    return interleaved_entries

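# Ordering sketch: with queue entries for jobs [3, 1] (descending) and
# special tasks whose computed next_job_id values are [None, 3, 1], the
# result is [task(None), hqe(job 3), task(3), hqe(job 1), task(1)]; i.e.
# each job is followed by the tasks that ran between it and the previous
# (older) job, and tasks newer than any job come first.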

def bucket_hosts_by_shard(host_objs):
    """Figure out which hosts are on which shards.

    @param host_objs: A list of host objects.

    @return: A map of shard hostname: list of hosts on the shard.
    """
    shard_host_map = collections.defaultdict(list)
    for host in host_objs:
        if host.shard:
            shard_host_map[host.shard.hostname].append(host.hostname)
    return shard_host_map

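# Behavior sketch: hosts without a shard are omitted entirely (hostnames
# hypothetical):
#
#   bucket_hosts_by_shard(host_objs)
#   # -> {'shard1.example.com': ['host1', 'host2'],
#   #     'shard2.example.com': ['host3']}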

def create_job_common(
        name,
        priority,
        control_type,
        control_file=None,
        hosts=(),
        meta_hosts=(),
        one_time_hosts=(),
        synch_count=None,
        is_template=False,
        timeout=None,
        timeout_mins=None,
        max_runtime_mins=None,
        run_verify=True,
        email_list='',
        dependencies=(),
        reboot_before=None,
        reboot_after=None,
        parse_failed_repair=None,
        hostless=False,
        keyvals=None,
        drone_set=None,
        parent_job_id=None,
        run_reset=True,
        require_ssp=None):
    #pylint: disable-msg=C0111
    """
    Common code between creating "standard" jobs and creating parameterized jobs
    """
    # input validation
    host_args_passed = any((hosts, meta_hosts, one_time_hosts))
    if hostless:
        if host_args_passed:
            raise model_logic.ValidationError({
                    'hostless': 'Hostless jobs cannot include any hosts!'})
        if control_type != control_data.CONTROL_TYPE_NAMES.SERVER:
            raise model_logic.ValidationError({
                    'control_type': 'Hostless jobs cannot use client-side '
                                    'control files'})
    elif not host_args_passed:
        raise model_logic.ValidationError({
            'arguments' : "For host jobs, you must pass at least one of"
                          " 'hosts', 'meta_hosts', 'one_time_hosts'."
            })
    label_objects = list(models.Label.objects.filter(name__in=meta_hosts))

    # convert hostnames & meta hosts to host/label objects
    host_objects = models.Host.smart_get_bulk(hosts)
    _validate_host_job_sharding(host_objects)
    for host in one_time_hosts:
        this_host = models.Host.create_one_time_host(host)
        host_objects.append(this_host)

    metahost_objects = []
    meta_host_labels_by_name = {label.name: label for label in label_objects}
    for label_name in meta_hosts:
        if label_name in meta_host_labels_by_name:
            metahost_objects.append(meta_host_labels_by_name[label_name])
        else:
            raise model_logic.ValidationError(
                {'meta_hosts' : 'Label "%s" not found' % label_name})

    options = dict(name=name,
                   priority=priority,
                   control_file=control_file,
                   control_type=control_type,
                   is_template=is_template,
                   timeout=timeout,
                   timeout_mins=timeout_mins,
                   max_runtime_mins=max_runtime_mins,
                   synch_count=synch_count,
                   run_verify=run_verify,
                   email_list=email_list,
                   dependencies=dependencies,
                   reboot_before=reboot_before,
                   reboot_after=reboot_after,
                   parse_failed_repair=parse_failed_repair,
                   keyvals=keyvals,
                   drone_set=drone_set,
                   parent_job_id=parent_job_id,
                   # TODO(crbug.com/873716) DEPRECATED. Remove entirely.
                   test_retry=0,
                   run_reset=run_reset,
                   require_ssp=require_ssp)

    return create_new_job(owner=models.User.current_user().login,
                          options=options,
                          host_objects=host_objects,
                          metahost_objects=metahost_objects)


def _validate_host_job_sharding(host_objects):
    """Check that the hosts obey job sharding rules."""
    if not (server_utils.is_shard()
            or _allowed_hosts_for_main_job(host_objects)):
        shard_host_map = bucket_hosts_by_shard(host_objects)
        raise ValueError(
                'The following hosts are on shard(s), please create '
                'separate jobs for hosts on each shard: %s ' %
                shard_host_map)


def _allowed_hosts_for_main_job(host_objects):
    """Check that the hosts are allowed for a job on main."""
    # We disallow the following jobs on main:
    #   num_shards > 1: this is a job spanning across multiple shards.
    #   num_shards == 1 but number of hosts on shard is less
    #   than total number of hosts: this is a job that spans across
    #   one shard and the main.
    shard_host_map = bucket_hosts_by_shard(host_objects)
    num_shards = len(shard_host_map)
    if num_shards > 1:
        return False
    if num_shards == 1:
        hosts_on_shard = list(shard_host_map.values())[0]
        assert len(hosts_on_shard) <= len(host_objects)
        return len(hosts_on_shard) == len(host_objects)
    else:
        return True

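# Decision sketch for _allowed_hosts_for_main_job, given the shard/host
# bucketing (names hypothetical):
#
#   {}                                  -> True   (no shard hosts: main only)
#   {'shard1': ['h1', 'h2']}, 2 hosts   -> True   (all hosts on one shard)
#   {'shard1': ['h1']},       2 hosts   -> False  (spans shard1 and main)
#   {'shard1': [...], 'shard2': [...]}  -> False  (spans two shards)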

def encode_ascii(control_file):
    """Force a control file to only contain ascii characters.

    @param control_file: Control file to encode.

    @returns the control file in an ascii encoding.

    @raises error.ControlFileMalformed: if encoding fails.
    """
    try:
        return control_file.encode('ascii')
    except UnicodeError as e:
        # Catches both UnicodeEncodeError (unicode input) and
        # UnicodeDecodeError (non-ascii byte-string input on Python 2).
        raise error.ControlFileMalformed(str(e))


def get_wmatrix_url():
    """Get wmatrix url from config file.

    @returns the wmatrix url or an empty string.
    """
    return global_config.global_config.get_config_value('AUTOTEST_WEB',
                                                        'wmatrix_url',
                                                        default='')


def get_stainless_url():
    """Get stainless url from config file.

    @returns the stainless url or an empty string.
    """
    return global_config.global_config.get_config_value('AUTOTEST_WEB',
                                                        'stainless_url',
                                                        default='')


def inject_times_to_filter(start_time_key=None, end_time_key=None,
                           start_time_value=None, end_time_value=None,
                           **filter_data):
    """Inject the key value pairs of start and end time if provided.

    @param start_time_key: A string representing the filter key of start_time.
    @param end_time_key: A string representing the filter key of end_time.
    @param start_time_value: Start_time value.
    @param end_time_value: End_time value.

    @returns the injected filter_data.
    """
    if start_time_value:
        filter_data[start_time_key] = start_time_value
    if end_time_value:
        filter_data[end_time_key] = end_time_value
    return filter_data

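# Usage sketch (keys and values hypothetical): falsy time values are skipped,
# so callers can pass optional bounds through unconditionally.
#
#   inject_times_to_filter('started_on__gte', 'started_on__lte',
#                          '2020-01-01', None, job__id=42)
#   # -> {'job__id': 42, 'started_on__gte': '2020-01-01'}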

def inject_times_to_hqe_special_tasks_filters(filter_data_common,
                                              start_time, end_time):
    """Inject start and end time to hqe and special tasks filters.

    @param filter_data_common: Common filter for hqe and special tasks.
    @param start_time: Start time value.
    @param end_time: End time value.

    @returns a pair of hqe and special tasks filters.
    """
    filter_data_special_tasks = filter_data_common.copy()
    return (inject_times_to_filter('started_on__gte', 'started_on__lte',
                                   start_time, end_time, **filter_data_common),
            inject_times_to_filter('time_started__gte', 'time_started__lte',
                                   start_time, end_time,
                                   **filter_data_special_tasks))


def retrieve_shard(shard_hostname):
    """
    Retrieves the shard with the given hostname from the database.

    @param shard_hostname: Hostname of the shard to retrieve

    @raises models.Shard.DoesNotExist, if no shard with this hostname was found.

    @returns: Shard object
    """
    return models.Shard.smart_get(shard_hostname)


def find_records_for_shard(shard, known_job_ids, known_host_ids):
    """Find records that should be sent to a shard.

    @param shard: Shard to find records for.
    @param known_job_ids: List of ids of jobs the shard already has.
    @param known_host_ids: List of ids of hosts the shard already has.

    @returns: Tuple of lists:
              (hosts, jobs, suite_job_keyvals, invalid_host_ids)
    """
    hosts, invalid_host_ids = models.Host.assign_to_shard(
            shard, known_host_ids)
    jobs = models.Job.assign_to_shard(shard, known_job_ids)
    parent_job_ids = [job.parent_job_id for job in jobs]
    suite_job_keyvals = models.JobKeyval.objects.filter(
            job_id__in=parent_job_ids)
    return hosts, jobs, suite_job_keyvals, invalid_host_ids


def _persist_records_with_type_sent_from_shard(
    shard, records, record_type, *args, **kwargs):
    """
    Handle records of a specified type that were sent from a shard to the main.

    @param shard: The shard the records were sent from.
    @param records: The records sent in their serialized format.
    @param record_type: Type of the objects represented by records.
    @param args: Additional arguments that will be passed on to the checks.
    @param kwargs: Additional arguments that will be passed on to the checks.

    @raises error.UnallowedRecordsSentToMain if any of the checks fail.

    @returns: List of primary keys of the processed records.
    """
    pks = []
    for serialized_record in records:
        pk = serialized_record['id']
        try:
            current_record = record_type.objects.get(pk=pk)
        except record_type.DoesNotExist:
            raise error.UnallowedRecordsSentToMain(
                'Object with pk %s of type %s does not exist on main.' % (
                    pk, record_type))

        try:
            current_record._check_update_from_shard(
                shard, serialized_record, *args, **kwargs)
        except error.IgnorableUnallowedRecordsSentToMain:
            # An illegal record change was attempted, but it was of a non-fatal
            # variety. Silently skip this record.
            pass
        else:
            current_record.update_from_serialized(serialized_record)
            pks.append(pk)

    return pks

1007*9c5db199SXin Li
1008*9c5db199SXin Lidef persist_records_sent_from_shard(shard, jobs, hqes):
1009*9c5db199SXin Li    """
1010*9c5db199SXin Li    Checking then saving serialized records sent to main from shard.
1011*9c5db199SXin Li
1012*9c5db199SXin Li    During heartbeats shards upload jobs and hostqueuentries. This performs
1013*9c5db199SXin Li    some checks on these and then updates the existing records for those
1014*9c5db199SXin Li    entries with the updated ones from the heartbeat.
1015*9c5db199SXin Li
1016*9c5db199SXin Li    The checks include:
1017*9c5db199SXin Li    - Checking if the objects sent already exist on the main.
1018*9c5db199SXin Li    - Checking if the objects sent were assigned to this shard.
1019*9c5db199SXin Li    - Checking that HostQueueEntries are sent together with their jobs.
1020*9c5db199SXin Li
1021*9c5db199SXin Li    @param shard: The shard the records were sent from.
1022*9c5db199SXin Li    @param jobs: The jobs the shard sent.
1023*9c5db199SXin Li    @param hqes: The HostQueueEntries the shard sent.
1024*9c5db199SXin Li
1025*9c5db199SXin Li    @raises error.UnallowedRecordsSentToMain if any of the checks fail.
1026*9c5db199SXin Li    """
1027*9c5db199SXin Li    job_ids_persisted = _persist_records_with_type_sent_from_shard(
1028*9c5db199SXin Li            shard, jobs, models.Job)
1029*9c5db199SXin Li    _persist_records_with_type_sent_from_shard(
1030*9c5db199SXin Li            shard, hqes, models.HostQueueEntry,
1031*9c5db199SXin Li            job_ids_sent=job_ids_persisted)
1032*9c5db199SXin Li
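# Hedged usage sketch for the heartbeat path: the main receives serialized
# jobs and hqes from a shard and persists them together, so each hqe's job
# is checked first. The payloads are illustrative; their exact layout is
# defined by the models' serialization.
#
#     persist_records_sent_from_shard(
#             shard, jobs=[{'id': 10}], hqes=[{'id': 100}])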
1033*9c5db199SXin Li
1034*9c5db199SXin Lidef forward_single_host_rpc_to_shard(func):
1035*9c5db199SXin Li    """This decorator forwards rpc calls that modify a host to a shard.
1036*9c5db199SXin Li
1037*9c5db199SXin Li    If a host is assigned to a shard, rpcs that change host attributes
1038*9c5db199SXin Li    should be forwarded to that shard.
1039*9c5db199SXin Li
1040*9c5db199SXin Li    This assumes the first argument of the function represents a host id.
1041*9c5db199SXin Li
1042*9c5db199SXin Li    @param func: The function to decorate
1043*9c5db199SXin Li
1044*9c5db199SXin Li    @returns: The function to replace func with.
1045*9c5db199SXin Li    """
1046*9c5db199SXin Li    def replacement(**kwargs):
1047*9c5db199SXin Li        # Only keyword arguments can be accepted here, as we need the argument
1048*9c5db199SXin Li        # names to send the rpc. serviceHandler always provides arguments with
1049*9c5db199SXin Li        # their keywords, so this is not a problem.
1050*9c5db199SXin Li
1051*9c5db199SXin Li        # A host record (identified by kwargs['id']) can be deleted in
1052*9c5db199SXin Li        # func(). Therefore, we save any data that may be needed later
1053*9c5db199SXin Li        # before func() is called.
1054*9c5db199SXin Li        shard_hostname = None
1055*9c5db199SXin Li        host = models.Host.smart_get(kwargs['id'])
1056*9c5db199SXin Li        if host and host.shard:
1057*9c5db199SXin Li            shard_hostname = host.shard.hostname
1058*9c5db199SXin Li        ret = func(**kwargs)
1059*9c5db199SXin Li        if shard_hostname and not server_utils.is_shard():
1060*9c5db199SXin Li            run_rpc_on_multiple_hostnames(func.__name__, [shard_hostname],
1061*9c5db199SXin Li                                          **kwargs)
1062*9c5db199SXin Li        return ret
1063*9c5db199SXin Li
1064*9c5db199SXin Li    return replacement
1065*9c5db199SXin Li
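# Hypothetical usage sketch: a host-mutating rpc decorated so the change is
# replayed on the host's shard. The decorator reads kwargs['id'], so the
# host id must be passed as a keyword; modify_host below is illustrative.
#
#     @forward_single_host_rpc_to_shard
#     def modify_host(id=None, **update_data):
#         models.Host.smart_get(id).update_object(update_data)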
1066*9c5db199SXin Li
1067*9c5db199SXin Lidef fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
1068*9c5db199SXin Li    """Fan out the given rpc to the shards of the given hosts.
1069*9c5db199SXin Li
1070*9c5db199SXin Li    @param host_objs: Host objects for the rpc.
1071*9c5db199SXin Li    @param rpc_name: The name of the rpc.
1072*9c5db199SXin Li    @param include_hostnames: If True, include the hostnames in the kwargs.
1073*9c5db199SXin Li        Hostnames are not always necessary; this function is designed to
1074*9c5db199SXin Li        send rpcs to the shard a host is on, while the rpcs themselves
1075*9c5db199SXin Li        could be related to labels, acls, etc.
1076*9c5db199SXin Li    @param kwargs: The kwargs for the rpc.
1077*9c5db199SXin Li    """
1078*9c5db199SXin Li    # Figure out which hosts are on which shards.
1079*9c5db199SXin Li    shard_host_map = bucket_hosts_by_shard(host_objs)
1080*9c5db199SXin Li
1081*9c5db199SXin Li    # Execute the rpc against the appropriate shards.
1082*9c5db199SXin Li    for shard, hostnames in six.iteritems(shard_host_map):
1083*9c5db199SXin Li        if include_hostnames:
1084*9c5db199SXin Li            kwargs['hosts'] = hostnames
1085*9c5db199SXin Li        try:
1086*9c5db199SXin Li            run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
1087*9c5db199SXin Li        except Exception as e:
1088*9c5db199SXin Li            raise error.RPCException('RPC %s failed on shard %s due to %s' %
1089*9c5db199SXin Li                                     (rpc_name, shard, e))
1090*9c5db199SXin Li
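# Hypothetical usage sketch: fan an acl-related rpc out to whichever shards
# own the given hosts. The rpc name and kwargs are illustrative; any rpc
# accepting a 'hosts' kwarg works when include_hostnames is True.
#
#     host_objs = list(models.Host.objects.filter(hostname__in=['h1', 'h2']))
#     fanout_rpc(host_objs, 'acl_group_remove_hosts', id='my_acl')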
1091*9c5db199SXin Li
1092*9c5db199SXin Lidef run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
1093*9c5db199SXin Li    """Runs an rpc against multiple AFEs.
1094*9c5db199SXin Li
1095*9c5db199SXin Li    This is used, e.g., to propagate changes made to hosts after they are
1096*9c5db199SXin Li    assigned to a shard.
1097*9c5db199SXin Li
1098*9c5db199SXin Li    @param rpc_call: Name of the rpc endpoint to call.
1099*9c5db199SXin Li    @param shard_hostnames: List of hostnames to run the rpcs on.
1100*9c5db199SXin Li    @param **kwargs: Keyword arguments to pass in the rpcs.
1101*9c5db199SXin Li    """
1102*9c5db199SXin Li    # Make sure this function is never called on shards, only on the main.
1103*9c5db199SXin Li    assert not server_utils.is_shard()
1104*9c5db199SXin Li    for shard_hostname in shard_hostnames:
1105*9c5db199SXin Li        afe = frontend_wrappers.RetryingAFE(server=shard_hostname,
1106*9c5db199SXin Li                                            user=thread_local.get_user())
1107*9c5db199SXin Li        afe.run(rpc_call, **kwargs)
1108*9c5db199SXin Li
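# Hypothetical usage sketch: replay one rpc on two shard AFEs. The hostnames
# and kwargs are placeholders; kwargs are forwarded verbatim to each shard.
#
#     run_rpc_on_multiple_hostnames(
#             'set_host_attribute',
#             ['shard1.example.com', 'shard2.example.com'],
#             attribute='lab', value='1')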
1109*9c5db199SXin Li
1110*9c5db199SXin Lidef get_label(name):
1111*9c5db199SXin Li    """Gets a label object using a given name.
1112*9c5db199SXin Li
1113*9c5db199SXin Li    @param name: Label name.
1114*9c5db199SXin Li    @return: a label object matching the given name, or None when there is
1115*9c5db199SXin Li             no label matching the given name (models.Label.DoesNotExist is
1116*9c5db199SXin Li             caught and swallowed internally).
1117*9c5db199SXin Li    """
1118*9c5db199SXin Li    try:
1119*9c5db199SXin Li        label = models.Label.smart_get(name)
1120*9c5db199SXin Li    except models.Label.DoesNotExist:
1121*9c5db199SXin Li        return None
1122*9c5db199SXin Li    return label
1123*9c5db199SXin Li
1124*9c5db199SXin Li
1125*9c5db199SXin Li# TODO: hide the following rpcs under is_moblab
1126*9c5db199SXin Lidef moblab_only(func):
1127*9c5db199SXin Li    """Ensure moblab specific functions only run on Moblab devices."""
1128*9c5db199SXin Li    def verify(*args, **kwargs):
1129*9c5db199SXin Li        if not server_utils.is_moblab():
1130*9c5db199SXin Li            raise error.RPCException('RPC: %s can only run on Moblab Systems!'
1131*9c5db199SXin Li                                     % func.__name__)
1132*9c5db199SXin Li        return func(*args, **kwargs)
1133*9c5db199SXin Li    return verify
1134*9c5db199SXin Li
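# Illustrative usage sketch: any rpc guarded this way raises RPCException
# unless server_utils.is_moblab() is true on the serving system.
#
#     @moblab_only
#     def update_config_handler(config_values):
#         ...  # Moblab-specific work; unreachable elsewhere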
1135*9c5db199SXin Li
1136*9c5db199SXin Lidef route_rpc_to_main(func):
1137*9c5db199SXin Li    """Route RPC to main AFE.
1138*9c5db199SXin Li
1139*9c5db199SXin Li    When a shard receives an RPC decorated by this, the RPC is just
1140*9c5db199SXin Li    forwarded to the main.
1141*9c5db199SXin Li    When the main gets the RPC, the RPC function is executed.
1142*9c5db199SXin Li
1143*9c5db199SXin Li    @param func: An RPC function to decorate
1144*9c5db199SXin Li
1145*9c5db199SXin Li    @returns: A function replacing the RPC func.
1146*9c5db199SXin Li    """
1147*9c5db199SXin Li    argspec = inspect.getargspec(func)
1148*9c5db199SXin Li    if argspec.varargs is not None:
1149*9c5db199SXin Li        raise Exception('RPC function must not have *args.')
1150*9c5db199SXin Li
1151*9c5db199SXin Li    @wraps(func)
1152*9c5db199SXin Li    def replacement(*args, **kwargs):
1153*9c5db199SXin Li        """We need special handling when decorating an RPC that can be called
1154*9c5db199SXin Li        directly using positional arguments.
1155*9c5db199SXin Li
1156*9c5db199SXin Li        One example is rpc_interface.create_job().
1157*9c5db199SXin Li        rpc_interface.create_job_page_handler() calls the function using both
1158*9c5db199SXin Li        positional and keyword arguments.  Since frontend.RpcClient.run()
1159*9c5db199SXin Li        takes only keyword arguments for an RPC, positional arguments of the
1160*9c5db199SXin Li        RPC function need to be transformed into keyword arguments.
1161*9c5db199SXin Li        """
1162*9c5db199SXin Li        kwargs = _convert_to_kwargs_only(func, args, kwargs)
1163*9c5db199SXin Li        if server_utils.is_shard():
1164*9c5db199SXin Li            afe = frontend_wrappers.RetryingAFE(
1165*9c5db199SXin Li                    server=server_utils.get_global_afe_hostname(),
1166*9c5db199SXin Li                    user=thread_local.get_user())
1167*9c5db199SXin Li            return afe.run(func.__name__, **kwargs)
1168*9c5db199SXin Li        return func(**kwargs)
1169*9c5db199SXin Li
1170*9c5db199SXin Li    return replacement
1171*9c5db199SXin Li
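# Usage sketch (create_job stands in for rpcs decorated this way in
# rpc_interface.py): on a shard the call is forwarded via RetryingAFE.run();
# on the main the wrapped body executes directly, with all positional
# arguments first normalized into keywords.
#
#     @route_rpc_to_main
#     def create_job(name, priority, control_file, control_type, **kwargs):
#         ...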
1172*9c5db199SXin Li
1173*9c5db199SXin Lidef _convert_to_kwargs_only(func, args, kwargs):
1174*9c5db199SXin Li    """Convert a function call's arguments to a kwargs dict.
1175*9c5db199SXin Li
1176*9c5db199SXin Li    This is best illustrated with an example.  Given:
1177*9c5db199SXin Li
1178*9c5db199SXin Li    def foo(a, b, **kwargs):
1179*9c5db199SXin Li        pass
1180*9c5db199SXin Li    _convert_to_kwargs_only(foo, (1, 2), {'c': 3})
1181*9c5db199SXin Li    # returns {'a': 1, 'b': 2, 'c': 3}, so calling foo(1, 2, c=3) is
1182*9c5db199SXin Li    # equivalent to calling foo(**kwargs) with the returned dict.
1183*9c5db199SXin Li
1184*9c5db199SXin Li    @param func: function whose signature to use
1185*9c5db199SXin Li    @param args: positional arguments of call
1186*9c5db199SXin Li    @param kwargs: keyword arguments of call
1187*9c5db199SXin Li
1188*9c5db199SXin Li    @returns: kwargs dict
1189*9c5db199SXin Li    """
1190*9c5db199SXin Li    argspec = inspect.getargspec(func)
1191*9c5db199SXin Li    # callargs looks like {'a': 1, 'b': 2, 'kwargs': {'c': 3}}
1192*9c5db199SXin Li    callargs = inspect.getcallargs(func, *args, **kwargs)
1193*9c5db199SXin Li    if argspec.keywords is None:
1194*9c5db199SXin Li        kwargs = {}
1195*9c5db199SXin Li    else:
1196*9c5db199SXin Li        kwargs = callargs.pop(argspec.keywords)
1197*9c5db199SXin Li    kwargs.update(callargs)
1198*9c5db199SXin Li    return kwargs
1199*9c5db199SXin Li
1200*9c5db199SXin Li
1201*9c5db199SXin Lidef get_sample_dut(board, pool):
1202*9c5db199SXin Li    """Get a dut with the given board and pool.
1203*9c5db199SXin Li
1204*9c5db199SXin Li    This method is used to help to locate a dut with the given board and pool.
1205*9c5db199SXin Li    The dut then can be used to identify a devserver in the same subnet.
1206*9c5db199SXin Li
1207*9c5db199SXin Li    @param board: Name of the board.
1208*9c5db199SXin Li    @param pool: Name of the pool.
1209*9c5db199SXin Li
1210*9c5db199SXin Li    @return: Name of a dut with the given board and pool.
1211*9c5db199SXin Li    """
1212*9c5db199SXin Li    if not (dev_server.PREFER_LOCAL_DEVSERVER and pool and board):
1213*9c5db199SXin Li        return None
1214*9c5db199SXin Li
1215*9c5db199SXin Li    hosts = list(get_host_query(
1216*9c5db199SXin Li        multiple_labels=('pool:%s' % pool, 'board:%s' % board),
1217*9c5db199SXin Li        exclude_only_if_needed_labels=False,
1218*9c5db199SXin Li        valid_only=True,
1219*9c5db199SXin Li        filter_data={},
1220*9c5db199SXin Li    ))
1221*9c5db199SXin Li    if not hosts:
1222*9c5db199SXin Li        return None
1223*9c5db199SXin Li    else:
1224*9c5db199SXin Li        return hosts[0].hostname
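

# Illustrative call (board and pool names are placeholders): returns the
# hostname of one matching dut, or None when PREFER_LOCAL_DEVSERVER is off
# or no valid host carries both labels.
#
#     sample = get_sample_dut('eve', 'bvt')  # hostname string or None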
1225