xref: /aosp_15_r20/external/toolchain-utils/bestflags/pipeline_process.py (revision 760c253c1ed00ce9abd48f8546f08516e57485fe)
1*760c253cSXin Li# Copyright 2013 The ChromiumOS Authors
2*760c253cSXin Li# Use of this source code is governed by a BSD-style license that can be
3*760c253cSXin Li# found in the LICENSE file.
4*760c253cSXin Li"""Pipeline process that encapsulates the actual content.
5*760c253cSXin Li
6*760c253cSXin LiPart of the Chrome build flags optimization.
7*760c253cSXin Li
8*760c253cSXin LiThe actual stages include the builder and the executor.
9*760c253cSXin Li"""
10*760c253cSXin Li
11*760c253cSXin Li__author__ = "[email protected] (Yuheng Long)"
12*760c253cSXin Li
13*760c253cSXin Liimport multiprocessing
14*760c253cSXin Li
15*760c253cSXin Li
# Sentinel put on the pipeline queues to signal "no more tasks; shut down".
# The value is an arbitrary integer picked at random. It is compared with `==`
# against items pulled from the queues. A plain int (rather than e.g. a unique
# object()) presumably was chosen because it still compares equal after being
# pickled through the inter-process queues. NOTE(review): confirm intent.
POISONPILL = 975
18*760c253cSXin Li
19*760c253cSXin Li
class PipelineProcess(multiprocessing.Process):
    """A process that encapsulates the actual content pipeline stage.

    The actual pipeline stage can be the builder or the tester. This process
    continuously pulls tasks from the queue until a poison pill is received.
    Once a task is received, it is handed to the actual stage for processing.

    Each pipeline stage contains three modules.
    The first module continuously pulls tasks from the input queue. It checks
    the cache to see whether the task has been encountered before. If so,
    duplicate computation can be avoided.
    The second module consists of a pool of workers that do the actual work,
    e.g., the worker compiles the source code and gets the image in the builder
    pipeline stage.
    The third module is a helper that puts the result cost into the cost field
    of the duplicate tasks. For example, if two tasks are equivalent, only one
    task, say t1, will be executed and the other task, say t2, will not be
    executed. The helper gets the result of t1 when it is available and sets
    the cost of t2 to be the same as that of t1.
    """

    def __init__(
        self,
        num_processes,
        name,
        cache,
        stage,
        task_queue,
        helper,
        worker,
        result_queue,
    ):
        """Set up input/output queues and the actual methods to be called.

        Args:
          num_processes: Number of processes in this stage's worker pool.
          name: The name of this stage. It is also used as the process name.
          cache: The computed tasks encountered before. Only its keys are read,
            once, when run() starts. A plain dict or a Manager dict proxy both
            work.
          stage: An int value that specifies the stage for this pipeline stage,
            for example, build stage or test stage. This value will be used to
            retrieve the keys in different stages, i.e., the flag set is the
            key in the build stage and the checksum is the key in the test
            stage. The key is used to detect duplicates.
          task_queue: The input task queue for this pipeline stage.
          helper: The method hosted by the helper module to fill in the cost of
            the duplicate tasks. It runs in a separate process and is called as
            helper(stage, cache, helper_queue, work_queue, result_queue).
          worker: The method hosted by the worker pool to do the actual work,
            e.g., compile the image. It is called in the pool as
            worker(stage, task, work_queue, result_queue).
          result_queue: The output task queue for this pipeline stage.
        """

        super().__init__()

        # multiprocessing.Process keeps its name in _name, so this assignment
        # also names the process after the stage.
        self._name = name
        self._task_queue = task_queue
        self._result_queue = result_queue

        self._helper = helper
        self._worker = worker

        self._cache = cache
        self._stage = stage
        self._num_processes = num_processes

        # The queues used by the modules for communication.
        manager = multiprocessing.Manager()
        self._helper_queue = manager.Queue()
        self._work_queue = manager.Queue()

    def _ReportWorkerError(self, error):
        """Print an exception raised by a worker task to stderr.

        Installed as the error_callback of the worker pool. Without it, an
        exception raised by the worker would be stored in a discarded
        AsyncResult and never reported.

        Args:
          error: The exception raised by the worker.
        """
        import sys
        import traceback

        print(
            "PipelineProcess %s: worker raised an exception:" % self._name,
            file=sys.stderr,
        )
        traceback.print_exception(type(error), error, error.__traceback__)

    def run(self):
        """Pull tasks from the task queue and dispatch them until a poison pill.

        A task whose identifier for this stage has already been seen (either
        present in the cache when the stage starts, or dispatched earlier in
        this run) is sent to the helper module. Any other task is submitted to
        the worker pool. The workers and the helper put their output on the
        result queue themselves.

        On receiving the poison pill from the previous stage, the worker pool
        and the helper are shut down and joined first. Only then is the poison
        pill forwarded to the next stage, so it arrives after every result this
        stage produces.
        """

        # The worker pool.
        work_pool = multiprocessing.Pool(self._num_processes)

        # The helper process.
        helper_process = multiprocessing.Process(
            target=self._helper,
            args=(
                self._stage,
                self._cache,
                self._helper_queue,
                self._work_queue,
                self._result_queue,
            ),
        )
        helper_process.start()

        # Identifiers known so far: the cache keys at start-up plus every key
        # dispatched to the workers below. A set gives O(1) lookups. Unlike
        # the dict_keys view a plain dict's keys() returns, it also supports
        # adding new identifiers.
        seen_keys = set(self._cache.keys())

        while True:
            task = self._task_queue.get()
            if task == POISONPILL:
                # Poison pill means shutdown. It is forwarded only after this
                # stage has finished all of its work (see below).
                break

            task_key = task.GetIdentifier(self._stage)
            if task_key in seen_keys:
                # The task has been encountered before. It will be sent to the
                # helper module for further processing.
                self._helper_queue.put(task)
            else:
                # Let the workers do the actual work.
                work_pool.apply_async(
                    self._worker,
                    args=(
                        self._stage,
                        task,
                        self._work_queue,
                        self._result_queue,
                    ),
                    error_callback=self._ReportWorkerError,
                )
                seen_keys.add(task_key)

        # Shut down the worker pool and the helper process.
        work_pool.close()
        work_pool.join()

        self._helper_queue.put(POISONPILL)
        helper_process.join()

        # All results of this stage have now been handed to result_queue, so
        # the next stage can safely be told to shut down.
        self._result_queue.put(POISONPILL)
146