1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 #pragma once 16 17 #include <array> 18 #include <cstdint> 19 20 #include "pw_containers/inline_queue.h" 21 #include "pw_function/function.h" 22 #include "pw_metric/metric.h" 23 #include "pw_span/span.h" 24 #include "pw_status/status.h" 25 #include "pw_sync/interrupt_spin_lock.h" 26 #include "pw_sync/lock_annotations.h" 27 #include "pw_sync/thread_notification.h" 28 #include "pw_thread/thread_core.h" 29 30 namespace pw::work_queue { 31 32 using WorkItem = Function<void()>; 33 34 /// Enables threads and interrupts to enqueue work as a 35 /// `pw::work_queue::WorkItem` for execution by the work queue. 36 /// 37 /// **Queue sizing**: The number of outstanding work requests is limited 38 /// based on the internal queue size. The queue size is set through either 39 /// the size of the `queue_storage` buffer passed into the constructor or by 40 /// using the templated `pw::work_queue::WorkQueueWithBuffer` helper. When the 41 /// queue is full, the queue will not accept further work. 42 /// 43 /// **Cooperative thread cancellation**: The class is a 44 /// `pw::thread::ThreadCore`, meaning it should be executed as a single thread. 45 /// To facilitate clean shutdown, it provides a `RequestStop()` method for 46 /// cooperative cancellation which should be invoked before joining the thread. 47 /// Once a stop has been requested the queue will no longer accept further work. 48 /// 49 /// The entire API is thread-safe and interrupt-safe. 50 class WorkQueue : public thread::ThreadCore { 51 public: 52 /// @param[in] queue The work entries to enqueue. 53 /// 54 /// @param[in] queue_capacity The internal queue size which limits the number 55 /// of outstanding work requests. 56 /// 57 /// @note The `ThreadNotification` prevents this from being `constexpr`. WorkQueue(InlineQueue<WorkItem> & queue,size_t queue_capacity)58 WorkQueue(InlineQueue<WorkItem>& queue, size_t queue_capacity) 59 : stop_requested_(false), queue_(queue) { 60 min_queue_remaining_.Set(static_cast<uint32_t>(queue_capacity)); 61 } 62 63 /// Enqueues a `work_item` for execution by the work queue thread. 64 /// 65 /// @param[in] work_item The entry to enqueue. 66 /// 67 /// @returns @rst 68 /// 69 /// .. pw-status-codes:: 70 /// 71 /// OK: Success. Entry was enqueued for execution. 72 /// 73 /// FAILED_PRECONDITION: The work queue is shutting down. Entries are no 74 /// longer permitted. 75 /// 76 /// RESOURCE_EXHAUSTED: Internal work queue is full. Entry was not 77 /// enqueued. 78 /// 79 /// @endrst PushWork(WorkItem && work_item)80 Status PushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_) { 81 return InternalPushWork(std::move(work_item)); 82 } 83 84 /// Queues work for execution. Crashes if the work cannot be queued due to a 85 /// full queue or a stopped worker thread. 86 /// 87 /// This call is recommended where possible since it saves error handling code 88 /// at the callsite; and in many practical cases, it is a bug if the work 89 /// queue is full (and so a crash is useful to detect the problem). 90 /// 91 /// @param[in] work_item The entry to enqueue. 92 /// 93 /// @pre 94 /// * The queue must not overflow, i.e. be full. 95 /// * The queue must not have been requested to stop, i.e. it must 96 /// not be in the process of shutting down. 97 void CheckPushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_); 98 99 /// Locks the queue to prevent further work enqueing, finishes outstanding 100 /// work, then shuts down the worker thread. 101 /// 102 /// The `WorkQueue` cannot be resumed after stopping because the `ThreadCore` 103 /// thread returns and may be joined. The `WorkQueue` must be reconstructed 104 /// for re-use after the thread has been joined. 105 void RequestStop() PW_LOCKS_EXCLUDED(lock_); 106 107 private: 108 void Run() override PW_LOCKS_EXCLUDED(lock_); 109 Status InternalPushWork(WorkItem&& work_item) PW_LOCKS_EXCLUDED(lock_); 110 111 sync::InterruptSpinLock lock_; 112 bool stop_requested_ PW_GUARDED_BY(lock_); 113 InlineQueue<WorkItem>& queue_ PW_GUARDED_BY(lock_); 114 sync::ThreadNotification work_notification_; 115 116 // TODO(ewout): The group and/or its name token should be passed as a ctor 117 // arg instead. Depending on the approach here the group should be exposed 118 // While doing this evaluate whether perhaps we should instead construct 119 // TypedMetric<uint32_t>s directly, avoiding the macro usage given the 120 // min_queue_remaining_ initial value requires dependency injection. 121 // And lastly when the restructure is finalized add unit tests to ensure these 122 // metrics work as intended. 123 PW_METRIC_GROUP(metrics_, "pw::work_queue::WorkQueue"); 124 PW_METRIC(metrics_, max_queue_used_, "max_queue_used", 0u); 125 PW_METRIC(metrics_, min_queue_remaining_, "min_queue_remaining", 0u); 126 }; 127 128 template <size_t kWorkQueueEntries> 129 class WorkQueueWithBuffer : public WorkQueue { 130 public: WorkQueueWithBuffer()131 constexpr WorkQueueWithBuffer() : WorkQueue(queue_, kWorkQueueEntries) {} 132 133 private: 134 InlineQueue<WorkItem, kWorkQueueEntries> queue_; 135 }; 136 137 } // namespace pw::work_queue 138