xref: /aosp_15_r20/external/toolchain-utils/bestflags/pipeline_worker.py (revision 760c253c1ed00ce9abd48f8546f08516e57485fe)
1# Copyright 2013 The ChromiumOS Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""The pipeline_worker functions of the build and test stage of the framework.
5
6Part of the Chrome build flags optimization.
7
8This module defines the helper and the worker. If there are duplicate tasks, for
9example t1 and t2, needs to be built/tested, one of them, for example t1, will
10be built/tested and the helper waits for the result of t1 and set the results of
11the other task, t2 here, to be the same as that of t1. Setting the result of t2
12to be the same as t1 is referred to as resolving the result of t2.
13The worker invokes the work method of the tasks that are not duplicate.
14"""
15
16__author__ = "[email protected] (Yuheng Long)"
17
18import pipeline_process
19
20
21def Helper(stage, done_dict, helper_queue, completed_queue, result_queue):
22    """Helper that filters duplicate tasks.
23
24    This method Continuously pulls duplicate tasks from the helper_queue. The
25    duplicate tasks need not be compiled/tested. This method also pulls completed
26    tasks from the worker queue and let the results of the duplicate tasks be the
27    same as their corresponding finished task.
28
29    Args:
30      stage: The current stage of the pipeline, for example, build stage or test
31        stage.
32      done_dict: A dictionary of tasks that are done. The key of the dictionary is
33        the identifier of the task. The value of the dictionary is the results of
34        performing the corresponding task.
35      helper_queue: A queue of duplicate tasks whose results need to be resolved.
36        This is a communication channel between the pipeline_process and this
37        helper process.
38      completed_queue: A queue of tasks that have been built/tested. The results
39        of these tasks are needed to resolve the results of the duplicate tasks.
40        This is the communication channel between the workers and this helper
41        process.
42      result_queue: After the results of the duplicate tasks have been resolved,
43        the duplicate tasks will be sent to the next stage via this queue.
44    """
45
46    # The list of duplicate tasks, the results of which need to be resolved.
47    waiting_list = []
48
49    while True:
50        # Pull duplicate task from the helper queue.
51        if not helper_queue.empty():
52            task = helper_queue.get()
53
54            if task == pipeline_process.POISONPILL:
55                # Poison pill means no more duplicate task from the helper queue.
56                break
57
58            # The task has not been performed before.
59            assert not task.Done(stage)
60
61            # The identifier of this task.
62            identifier = task.GetIdentifier(stage)
63
64            # If a duplicate task comes before the corresponding resolved results from
65            # the completed_queue, it will be put in the waiting list. If the result
66            # arrives before the duplicate task, the duplicate task will be resolved
67            # right away.
68            if identifier in done_dict:
69                # This task has been encountered before and the result is available. The
70                # result can be resolved right away.
71                task.SetResult(stage, done_dict[identifier])
72                result_queue.put(task)
73            else:
74                waiting_list.append(task)
75
76        # Check and get completed tasks from completed_queue.
77        GetResultFromCompletedQueue(
78            stage, completed_queue, done_dict, waiting_list, result_queue
79        )
80
81    # Wait to resolve the results of the remaining duplicate tasks.
82    while waiting_list:
83        GetResultFromCompletedQueue(
84            stage, completed_queue, done_dict, waiting_list, result_queue
85        )
86
87
88def GetResultFromCompletedQueue(
89    stage, completed_queue, done_dict, waiting_list, result_queue
90):
91    """Pull results from the completed queue and resolves duplicate tasks.
92
93    Args:
94      stage: The current stage of the pipeline, for example, build stage or test
95        stage.
96      completed_queue: A queue of tasks that have been performed. The results of
97        these tasks are needed to resolve the results of the duplicate tasks. This
98        is the communication channel between the workers and this method.
99      done_dict: A dictionary of tasks that are done. The key of the dictionary is
100        the optimization flags of the task. The value of the dictionary is the
101        compilation results of the corresponding task.
102      waiting_list: The list of duplicate tasks, the results of which need to be
103        resolved.
104      result_queue: After the results of the duplicate tasks have been resolved,
105        the duplicate tasks will be sent to the next stage via this queue.
106
107    This helper method tries to pull a completed task from the completed queue.
108    If it gets a task from the queue, it resolves the results of all the relevant
109    duplicate tasks in the waiting list. Relevant tasks are the tasks that have
110    the same flags as the currently received results from the completed_queue.
111    """
112    # Pull completed task from the worker queue.
113    if not completed_queue.empty():
114        (identifier, result) = completed_queue.get()
115        done_dict[identifier] = result
116
117        tasks = [
118            t for t in waiting_list if t.GetIdentifier(stage) == identifier
119        ]
120        for duplicate_task in tasks:
121            duplicate_task.SetResult(stage, result)
122            result_queue.put(duplicate_task)
123            waiting_list.remove(duplicate_task)
124
125
126def Worker(stage, task, helper_queue, result_queue):
127    """Worker that performs the task.
128
129    This method calls the work method of the input task and distribute the result
130    to the helper and the next stage.
131
132    Args:
133      stage: The current stage of the pipeline, for example, build stage or test
134        stage.
135      task: Input task that needs to be performed.
136      helper_queue: Queue that holds the completed tasks and the results. This is
137        the communication channel between the worker and the helper.
138      result_queue: Queue that holds the completed tasks and the results. This is
139        the communication channel between the worker and the next stage.
140    """
141
142    # The task has not been completed before.
143    assert not task.Done(stage)
144
145    task.Work(stage)
146    helper_queue.put((task.GetIdentifier(stage), task.GetResult(stage)))
147    result_queue.put(task)
148