# Copyright 2013 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""The pipeline_worker functions of the build and test stage of the framework.

Part of the Chrome build flags optimization.

This module defines the helper and the worker. If duplicate tasks, for example
t1 and t2, need to be built/tested, only one of them, say t1, is built/tested.
The helper waits for the result of t1 and sets the result of the other task, t2
here, to be the same as that of t1. Setting the result of t2 to be the same as
that of t1 is referred to as resolving the result of t2.
The worker invokes the work method of the tasks that are not duplicates.
"""

__author__ = "[email protected] (Yuheng Long)"

import pipeline_process


def Helper(stage, done_dict, helper_queue, completed_queue, result_queue):
    """Helper that filters duplicate tasks.

    This method continuously pulls duplicate tasks from the helper_queue. The
    duplicate tasks need not be compiled/tested. This method also pulls
    completed tasks from the completed_queue and sets the results of the
    duplicate tasks to be the same as those of their corresponding finished
    tasks.

    Args:
      stage: The current stage of the pipeline, for example, build stage or test
        stage.
      done_dict: A dictionary of tasks that are done. The key of the dictionary
        is the identifier of the task. The value of the dictionary is the result
        of performing the corresponding task.
      helper_queue: A queue of duplicate tasks whose results need to be
        resolved. This is the communication channel between the
        pipeline_process and this helper process.
      completed_queue: A queue of tasks that have been built/tested. The results
        of these tasks are needed to resolve the results of the duplicate tasks.
        This is the communication channel between the workers and this helper
        process.
      result_queue: After the results of the duplicate tasks have been resolved,
        the duplicate tasks will be sent to the next stage via this queue.
    """

    # The list of duplicate tasks, the results of which need to be resolved.
    waiting_list = []

    while True:
        # Pull a duplicate task from the helper queue.
        if not helper_queue.empty():
            task = helper_queue.get()

            if task == pipeline_process.POISONPILL:
                # Poison pill means no more duplicate tasks will arrive from
                # the helper queue.
                break

            # The task has not been performed before.
            assert not task.Done(stage)

            # The identifier of this task.
            identifier = task.GetIdentifier(stage)

            # If a duplicate task arrives before the corresponding result from
            # the completed_queue, it is put in the waiting list. If the result
            # arrives before the duplicate task, the duplicate task is resolved
            # right away.
            if identifier in done_dict:
                # This task has been encountered before and the result is
                # available. The result can be resolved right away.
                task.SetResult(stage, done_dict[identifier])
                result_queue.put(task)
            else:
                waiting_list.append(task)

        # Check and get completed tasks from completed_queue.
        GetResultFromCompletedQueue(
            stage, completed_queue, done_dict, waiting_list, result_queue
        )

    # Wait to resolve the results of the remaining duplicate tasks.
    while waiting_list:
        GetResultFromCompletedQueue(
            stage, completed_queue, done_dict, waiting_list, result_queue
        )


def GetResultFromCompletedQueue(
    stage, completed_queue, done_dict, waiting_list, result_queue
):
    """Pulls results from the completed queue and resolves duplicate tasks.

    This helper method tries to pull a completed task from the completed queue.
    If it gets a task from the queue, it resolves the results of all the
    relevant duplicate tasks in the waiting list. Relevant tasks are the tasks
    that have the same identifier (for example, the same flags) as the result
    just received from the completed_queue.

    Args:
      stage: The current stage of the pipeline, for example, build stage or test
        stage.
      completed_queue: A queue of tasks that have been performed. The results of
        these tasks are needed to resolve the results of the duplicate tasks.
        This is the communication channel between the workers and this method.
      done_dict: A dictionary of tasks that are done. The key of the dictionary
        is the identifier of the task, for example its optimization flags. The
        value of the dictionary is the result of the corresponding task, for
        example its compilation result.
      waiting_list: The list of duplicate tasks, the results of which need to be
        resolved.
      result_queue: After the results of the duplicate tasks have been resolved,
        the duplicate tasks will be sent to the next stage via this queue.
    """
    # Pull a completed task from the completed queue.
    if not completed_queue.empty():
        (identifier, result) = completed_queue.get()
        done_dict[identifier] = result

        tasks = [
            t for t in waiting_list if t.GetIdentifier(stage) == identifier
        ]
        for duplicate_task in tasks:
            duplicate_task.SetResult(stage, result)
            result_queue.put(duplicate_task)
            waiting_list.remove(duplicate_task)


def Worker(stage, task, helper_queue, result_queue):
    """Worker that performs the task.

    This method calls the work method of the input task and distributes the
    result to the helper and the next stage.

    Args:
      stage: The current stage of the pipeline, for example, build stage or test
        stage.
      task: Input task that needs to be performed.
      helper_queue: Queue that receives the (identifier, result) pair of the
        completed task. This is the communication channel between the worker
        and the helper.
      result_queue: Queue that holds the completed tasks and the results. This is
        the communication channel between the worker and the next stage.
    """

    # The task has not been completed before.
    assert not task.Done(stage)

    task.Work(stage)
    helper_queue.put((task.GetIdentifier(stage), task.GetResult(stage)))
    result_queue.put(task)
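

# Illustrative sketch only, not part of the original module. It replays the
# resolve step described in GetResultFromCompletedQueue on in-memory queues so
# the waiting-list behavior can be observed in isolation. The names
# _SketchTask, _SketchResolve and _SketchDemo are hypothetical. A real task is
# only assumed to offer GetIdentifier(stage), SetResult(stage, result) and
# GetResult(stage), as used by the functions in this file. The sketch uses the
# single-threaded queue.Queue; the real pipeline may use a different queue type.
import queue


class _SketchTask(object):
    """Hypothetical stand-in for a pipeline task (assumption of the sketch)."""

    def __init__(self, identifier):
        self._identifier = identifier
        self._results = {}

    def GetIdentifier(self, stage):
        return self._identifier

    def SetResult(self, stage, result):
        self._results[stage] = result

    def GetResult(self, stage):
        return self._results[stage]


def _SketchResolve(
    stage, completed_queue, done_dict, waiting_list, result_queue
):
    """Records one completed result and releases the matching duplicates."""
    if completed_queue.empty():
        return
    identifier, result = completed_queue.get()
    done_dict[identifier] = result
    matches = [t for t in waiting_list if t.GetIdentifier(stage) == identifier]
    for duplicate_task in matches:
        duplicate_task.SetResult(stage, result)
        result_queue.put(duplicate_task)
        waiting_list.remove(duplicate_task)


def _SketchDemo():
    """Resolves two duplicates of "flags-a" and leaves "flags-b" waiting."""
    stage = 0
    completed_queue = queue.Queue()
    result_queue = queue.Queue()
    done_dict = {}
    waiting_list = [
        _SketchTask("flags-a"),
        _SketchTask("flags-b"),
        _SketchTask("flags-a"),
    ]
    # A worker reports that the task identified by "flags-a" has finished.
    completed_queue.put(("flags-a", "build-ok"))
    _SketchResolve(
        stage, completed_queue, done_dict, waiting_list, result_queue
    )
    resolved = []
    while not result_queue.empty():
        resolved.append(result_queue.get().GetResult(stage))
    still_waiting = [t.GetIdentifier(stage) for t in waiting_list]
    return resolved, still_waiting, done_dict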