xref: /aosp_15_r20/external/pigweed/pw_work_queue/public/pw_work_queue/work_queue.h (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #pragma once
16 
17 #include <array>
18 #include <cstdint>
19 
20 #include "pw_containers/inline_queue.h"
21 #include "pw_function/function.h"
22 #include "pw_metric/metric.h"
23 #include "pw_span/span.h"
24 #include "pw_status/status.h"
25 #include "pw_sync/interrupt_spin_lock.h"
26 #include "pw_sync/lock_annotations.h"
27 #include "pw_sync/thread_notification.h"
28 #include "pw_thread/thread_core.h"
29 
30 namespace pw::work_queue {
31 
32 using WorkItem = Function<void()>;
33 
34 /// Enables threads and interrupts to enqueue work as a
35 /// `pw::work_queue::WorkItem` for execution by the work queue.
36 ///
37 /// **Queue sizing**: The number of outstanding work requests is limited
38 /// based on the internal queue size. The queue size is set through either
39 /// the size of the `queue_storage` buffer passed into the constructor or by
40 /// using the templated `pw::work_queue::WorkQueueWithBuffer` helper. When the
41 /// queue is full, the queue will not accept further work.
42 ///
43 /// **Cooperative thread cancellation**: The class is a
44 /// `pw::thread::ThreadCore`, meaning it should be executed as a single thread.
45 /// To facilitate clean shutdown, it provides a `RequestStop()` method for
46 /// cooperative cancellation which should be invoked before joining the thread.
47 /// Once a stop has been requested the queue will no longer accept further work.
48 ///
49 /// The entire API is thread-safe and interrupt-safe.
50 class WorkQueue : public thread::ThreadCore {
51  public:
52   /// @param[in] queue The work entries to enqueue.
53   ///
54   /// @param[in] queue_capacity The internal queue size which limits the number
55   /// of outstanding work requests.
56   ///
57   /// @note The `ThreadNotification` prevents this from being `constexpr`.
WorkQueue(InlineQueue<WorkItem> & queue,size_t queue_capacity)58   WorkQueue(InlineQueue<WorkItem>& queue, size_t queue_capacity)
59       : stop_requested_(false), queue_(queue) {
60     min_queue_remaining_.Set(static_cast<uint32_t>(queue_capacity));
61   }
62 
63   /// Enqueues a `work_item` for execution by the work queue thread.
64   ///
65   /// @param[in] work_item The entry to enqueue.
66   ///
67   /// @returns @rst
68   ///
69   /// .. pw-status-codes::
70   ///
71   ///    OK: Success. Entry was enqueued for execution.
72   ///
73   ///    FAILED_PRECONDITION: The work queue is shutting down. Entries are no
74   ///    longer permitted.
75   ///
76   ///    RESOURCE_EXHAUSTED: Internal work queue is full. Entry was not
77   ///    enqueued.
78   ///
79   /// @endrst
PushWork(WorkItem && work_item)80   Status PushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_) {
81     return InternalPushWork(std::move(work_item));
82   }
83 
84   /// Queues work for execution. Crashes if the work cannot be queued due to a
85   /// full queue or a stopped worker thread.
86   ///
87   /// This call is recommended where possible since it saves error handling code
88   /// at the callsite; and in many practical cases, it is a bug if the work
89   /// queue is full (and so a crash is useful to detect the problem).
90   ///
91   /// @param[in] work_item The entry to enqueue.
92   ///
93   /// @pre
94   /// * The queue must not overflow, i.e. be full.
95   /// * The queue must not have been requested to stop, i.e. it must
96   ///   not be in the process of shutting down.
97   void CheckPushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_);
98 
99   /// Locks the queue to prevent further work enqueing, finishes outstanding
100   /// work, then shuts down the worker thread.
101   ///
102   /// The `WorkQueue` cannot be resumed after stopping because the `ThreadCore`
103   /// thread returns and may be joined. The `WorkQueue` must be reconstructed
104   /// for re-use after the thread has been joined.
105   void RequestStop() PW_LOCKS_EXCLUDED(lock_);
106 
107  private:
108   void Run() override PW_LOCKS_EXCLUDED(lock_);
109   Status InternalPushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_);
110 
111   sync::InterruptSpinLock lock_;
112   bool stop_requested_ PW_GUARDED_BY(lock_);
113   InlineQueue<WorkItem>& queue_ PW_GUARDED_BY(lock_);
114   sync::ThreadNotification work_notification_;
115 
116   // TODO(ewout): The group and/or its name token should be passed as a ctor
117   // arg instead. Depending on the approach here the group should be exposed
118   // While doing this evaluate whether perhaps we should instead construct
119   // TypedMetric<uint32_t>s directly, avoiding the macro usage given the
120   // min_queue_remaining_ initial value requires dependency injection.
121   // And lastly when the restructure is finalized add unit tests to ensure these
122   // metrics work as intended.
123   PW_METRIC_GROUP(metrics_, "pw::work_queue::WorkQueue");
124   PW_METRIC(metrics_, max_queue_used_, "max_queue_used", 0u);
125   PW_METRIC(metrics_, min_queue_remaining_, "min_queue_remaining", 0u);
126 };
127 
128 template <size_t kWorkQueueEntries>
129 class WorkQueueWithBuffer : public WorkQueue {
130  public:
WorkQueueWithBuffer()131   constexpr WorkQueueWithBuffer() : WorkQueue(queue_, kWorkQueueEntries) {}
132 
133  private:
134   InlineQueue<WorkItem, kWorkQueueEntries> queue_;
135 };
136 
137 }  // namespace pw::work_queue
138