xref: /aosp_15_r20/external/pigweed/pw_target_runner/go/worker_pool.go (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1// Copyright 2019 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15package pw_target_runner
16
17import (
18	"errors"
19	"fmt"
20	"log"
21	"os"
22	"sync"
23	"sync/atomic"
24	"time"
25
26	pb "pigweed/proto/pw_target_runner/target_runner_pb"
27)
28
// RunRequest represents a client request to run a single executable on-device.
// Requests are submitted via WorkerPool.QueueExecutable and passed to a
// DeviceRunner's HandleRunRequest by one of the pool's worker goroutines.
type RunRequest struct {
	// Filesystem path to the executable.
	Path string

	// Channel to which the response is sent back. The pool performs a plain
	// (blocking) send on this channel, so the requester must be receiving
	// from it, or give it buffer space, or the sending goroutine stalls.
	ResponseChannel chan<- *RunResponse

	// Time when the request was queued. Internal to the worker pool: set by
	// QueueExecutable and used by the worker to compute
	// RunResponse.QueueTime.
	queueStart time.Time
}
40
// RunResponse is the response sent after a run request is processed.
//
// Normally it is returned by DeviceRunner.HandleRunRequest, after which the
// worker pool fills in the timing fields and delivers it on the request's
// ResponseChannel. When no workers are registered, QueueExecutable instead
// sends a response with only Err set.
type RunResponse struct {
	// Length of time that the run request was queued before being handled
	// by a worker. Set by the worker pool.
	QueueTime time.Duration

	// Length of time the runner command took to run the executable.
	// Set by the worker pool as the wall-clock duration of the
	// HandleRunRequest call.
	RunTime time.Duration

	// Raw output of the execution.
	Output []byte

	// Result of the run.
	Status pb.RunStatus

	// Error that occurred during the run, if any. If this is not nil, none
	// of the other fields in this struct are guaranteed to be valid.
	Err error
}
61
62// DeviceRunner represents a worker which handles run requests.
63type DeviceRunner interface {
64	// WorkerStart is the lifecycle hook called when the worker routine is
65	// started. Any resources required by the worker should be initialized
66	// here.
67	WorkerStart() error
68
69	// HandleRunRequest is the method called when an executable is scheduled
70	// to run on the worker by the worker pool. It processes the request,
71	// runs the executable, and returns an appropriate response.
72	HandleRunRequest(*RunRequest) *RunResponse
73
74	// WorkerExit is the lifecycle hook called before the worker exits.
75	// Should be used to clean up any resources used by the worker.
76	WorkerExit()
77}
78
// WorkerPool represents a collection of device runners which run on-device
// binaries. The worker pool distributes requests to run binaries among its
// available workers.
//
// All workers read from one shared request queue, so whichever worker is free
// next picks up the next queued request.
type WorkerPool struct {
	// Number of worker goroutines currently running. Incremented in Start and
	// decremented by each worker goroutine on exit, via sync/atomic.
	activeWorkers uint32

	// Logger for pool events; its prefix carries the pool's name.
	logger *log.Logger

	// Runners added with RegisterWorker. Start launches one goroutine per
	// entry.
	workers []DeviceRunner

	// Tracks running worker goroutines so Stop can wait for them to exit.
	waitGroup sync.WaitGroup

	// Buffered queue of pending run requests shared by all workers.
	reqChannel chan *RunRequest

	// Each true value (or closing the channel) makes the single worker that
	// receives it exit.
	quitChannel chan bool
}
90
// errWorkerPoolActive is returned by operations that require the pool to be
// stopped while worker goroutines are still running.
var errWorkerPoolActive = errors.New("Worker pool is running")

// errNoRegisteredWorkers is sent back to requesters that try to queue an
// executable on a pool with no registered workers.
var errNoRegisteredWorkers = errors.New("No workers registered in pool")
95
96// newWorkerPool creates an empty worker pool.
97func newWorkerPool(name string) *WorkerPool {
98	logPrefix := fmt.Sprintf("[%s] ", name)
99	return &WorkerPool{
100		logger:      log.New(os.Stdout, logPrefix, log.LstdFlags),
101		workers:     make([]DeviceRunner, 0),
102		reqChannel:  make(chan *RunRequest, 1024),
103		quitChannel: make(chan bool, 64),
104	}
105}
106
107// RegisterWorker adds a new worker to the pool. This cannot be done when the
108// pool is processing requests; Stop() must be called first.
109func (p *WorkerPool) RegisterWorker(worker DeviceRunner) error {
110	if p.Active() {
111		return errWorkerPoolActive
112	}
113	p.workers = append(p.workers, worker)
114	return nil
115}
116
117// Start launches all registered workers in the pool.
118func (p *WorkerPool) Start() error {
119	if p.Active() {
120		return errWorkerPoolActive
121	}
122
123	p.logger.Printf("Starting %d workers\n", len(p.workers))
124	for _, worker := range p.workers {
125		p.waitGroup.Add(1)
126		atomic.AddUint32(&p.activeWorkers, 1)
127		go p.runWorker(worker)
128	}
129
130	return nil
131}
132
133// Stop terminates all running workers in the pool. The work queue is not
134// cleared; queued requests persist and can be processed by calling Start()
135// again.
136func (p *WorkerPool) Stop() {
137	if !p.Active() {
138		return
139	}
140
141	// Send N quit commands to the workers and wait for them to exit.
142	for i := uint32(0); i < p.activeWorkers; i++ {
143		p.quitChannel <- true
144	}
145	p.waitGroup.Wait()
146
147	p.logger.Println("All workers in pool stopped")
148}
149
150// Active returns true if any worker routines are currently running.
151func (p *WorkerPool) Active() bool {
152	return p.activeWorkers > 0
153}
154
155// QueueExecutable adds an executable to the worker pool's queue. If no workers
156// are registered in the pool, this operation fails and an immediate response is
157// sent back to the requester indicating the error.
158func (p *WorkerPool) QueueExecutable(req *RunRequest) {
159	if len(p.workers) == 0 {
160		p.logger.Printf("Attempt to queue executable %s with no active workers", req.Path)
161		req.ResponseChannel <- &RunResponse{
162			Err: errNoRegisteredWorkers,
163		}
164		return
165	}
166
167	p.logger.Printf("Queueing executable %s\n", req.Path)
168
169	// Start tracking how long the request is queued.
170	req.queueStart = time.Now()
171	p.reqChannel <- req
172}
173
174// runWorker is a function run by the worker pool in a separate goroutine for
175// each of its registered workers. The function is responsible for calling the
176// appropriate worker lifecycle hooks and processing requests as they come in
177// through the worker pool's queue.
178func (p *WorkerPool) runWorker(worker DeviceRunner) {
179	defer func() {
180		atomic.AddUint32(&p.activeWorkers, ^uint32(0))
181		p.waitGroup.Done()
182	}()
183
184	if err := worker.WorkerStart(); err != nil {
185		return
186	}
187
188processLoop:
189	for {
190		// Force the quit channel to be processed before the request
191		// channel by using a select statement with an empty default
192		// case to make the read non-blocking. If the quit channel is
193		// empty, the code will fall through to the main select below.
194		select {
195		case q, ok := <-p.quitChannel:
196			if q || !ok {
197				break processLoop
198			}
199		default:
200		}
201
202		select {
203		case q, ok := <-p.quitChannel:
204			if q || !ok {
205				break processLoop
206			}
207		case req, ok := <-p.reqChannel:
208			if !ok {
209				continue
210			}
211
212			queueTime := time.Since(req.queueStart)
213
214			runStart := time.Now()
215			res := worker.HandleRunRequest(req)
216			res.RunTime = time.Since(runStart)
217
218			res.QueueTime = queueTime
219			req.ResponseChannel <- res
220		}
221	}
222
223	worker.WorkerExit()
224}
225