1*d9f75844SAndroid Build Coastguard Worker /*
2*d9f75844SAndroid Build Coastguard Worker * Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
3*d9f75844SAndroid Build Coastguard Worker *
4*d9f75844SAndroid Build Coastguard Worker * Use of this source code is governed by a BSD-style license
5*d9f75844SAndroid Build Coastguard Worker * that can be found in the LICENSE file in the root of the source
6*d9f75844SAndroid Build Coastguard Worker * tree. An additional intellectual property rights grant can be found
7*d9f75844SAndroid Build Coastguard Worker * in the file PATENTS. All contributing project authors may
8*d9f75844SAndroid Build Coastguard Worker * be found in the AUTHORS file in the root of the source tree.
9*d9f75844SAndroid Build Coastguard Worker */
10*d9f75844SAndroid Build Coastguard Worker
11*d9f75844SAndroid Build Coastguard Worker #ifndef RTC_BASE_SWAP_QUEUE_H_
12*d9f75844SAndroid Build Coastguard Worker #define RTC_BASE_SWAP_QUEUE_H_
13*d9f75844SAndroid Build Coastguard Worker
14*d9f75844SAndroid Build Coastguard Worker #include <stddef.h>
15*d9f75844SAndroid Build Coastguard Worker
16*d9f75844SAndroid Build Coastguard Worker #include <atomic>
17*d9f75844SAndroid Build Coastguard Worker #include <utility>
18*d9f75844SAndroid Build Coastguard Worker #include <vector>
19*d9f75844SAndroid Build Coastguard Worker
20*d9f75844SAndroid Build Coastguard Worker #include "absl/base/attributes.h"
21*d9f75844SAndroid Build Coastguard Worker #include "rtc_base/checks.h"
22*d9f75844SAndroid Build Coastguard Worker
23*d9f75844SAndroid Build Coastguard Worker namespace webrtc {
24*d9f75844SAndroid Build Coastguard Worker
25*d9f75844SAndroid Build Coastguard Worker namespace internal {
26*d9f75844SAndroid Build Coastguard Worker
// (Internal; please don't use outside this file.)
//
// Default item verifier: accepts every item unconditionally. It serves as the
// default `QueueItemVerifierFunction` argument of SwapQueueItemVerifier, so a
// queue declared without a custom verifier performs no item validation.
template <typename T>
bool NoopSwapQueueItemVerifierFunction(const T& item) {
  // The item is deliberately not inspected.
  static_cast<void>(item);
  return true;
}
32*d9f75844SAndroid Build Coastguard Worker
33*d9f75844SAndroid Build Coastguard Worker } // namespace internal
34*d9f75844SAndroid Build Coastguard Worker
35*d9f75844SAndroid Build Coastguard Worker // Functor to use when supplying a verifier function for the queue.
36*d9f75844SAndroid Build Coastguard Worker template <typename T,
37*d9f75844SAndroid Build Coastguard Worker bool (*QueueItemVerifierFunction)(const T&) =
38*d9f75844SAndroid Build Coastguard Worker internal::NoopSwapQueueItemVerifierFunction>
39*d9f75844SAndroid Build Coastguard Worker class SwapQueueItemVerifier {
40*d9f75844SAndroid Build Coastguard Worker public:
operator()41*d9f75844SAndroid Build Coastguard Worker bool operator()(const T& t) const { return QueueItemVerifierFunction(t); }
42*d9f75844SAndroid Build Coastguard Worker };
43*d9f75844SAndroid Build Coastguard Worker
// This class is a fixed-size queue. A single producer calls Insert() to insert
// an element of type T at the back of the queue, and a single consumer calls
// Remove() to remove an element from the front of the queue. It's safe for the
// producer and the consumer to access the queue concurrently, from different
// threads.
//
// To avoid the construction, copying, and destruction of Ts that a naive
// queue implementation would require, for each "full" T passed from
// producer to consumer, SwapQueue<T> passes an "empty" T in the other
// direction (an "empty" T is one that contains nothing of value for the
// consumer). This bidirectional movement is implemented with swap().
//
//   // Create queue:
//   Bottle proto(568);  // Prepare an empty Bottle. Heap allocates space for
//                       // 568 ml.
//   SwapQueue<Bottle> q(N, proto);  // Init queue with N copies of proto.
//                                   // Each copy allocates on the heap.
//   // Producer pseudo-code:
//   Bottle b(568);  // Prepare an empty Bottle. Heap allocates space for 568 ml.
//   loop {
//     b.Fill(amount);  // Where amount <= 568 ml.
//     q.Insert(&b);    // Swap our full Bottle for an empty one from q.
//   }
//
//   // Consumer pseudo-code:
//   Bottle b(568);  // Prepare an empty Bottle. Heap allocates space for 568 ml.
//   loop {
//     q.Remove(&b);  // Swap our empty Bottle for the next-in-line full Bottle.
//     Drink(&b);
//   }
//
// For a well-behaved Bottle class, there are no allocations in the
// producer, since it just fills an empty Bottle that's already large
// enough; no deallocations in the consumer, since it returns each empty
// Bottle to the queue after having drunk it; and no copies along the
// way, since the queue uses swap() everywhere to move full Bottles in
// one direction and empty ones in the other.
81*d9f75844SAndroid Build Coastguard Worker template <typename T, typename QueueItemVerifier = SwapQueueItemVerifier<T>>
82*d9f75844SAndroid Build Coastguard Worker class SwapQueue {
83*d9f75844SAndroid Build Coastguard Worker public:
84*d9f75844SAndroid Build Coastguard Worker // Creates a queue of size size and fills it with default constructed Ts.
SwapQueue(size_t size)85*d9f75844SAndroid Build Coastguard Worker explicit SwapQueue(size_t size) : queue_(size) {
86*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(VerifyQueueSlots());
87*d9f75844SAndroid Build Coastguard Worker }
88*d9f75844SAndroid Build Coastguard Worker
89*d9f75844SAndroid Build Coastguard Worker // Same as above and accepts an item verification functor.
SwapQueue(size_t size,const QueueItemVerifier & queue_item_verifier)90*d9f75844SAndroid Build Coastguard Worker SwapQueue(size_t size, const QueueItemVerifier& queue_item_verifier)
91*d9f75844SAndroid Build Coastguard Worker : queue_item_verifier_(queue_item_verifier), queue_(size) {
92*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(VerifyQueueSlots());
93*d9f75844SAndroid Build Coastguard Worker }
94*d9f75844SAndroid Build Coastguard Worker
95*d9f75844SAndroid Build Coastguard Worker // Creates a queue of size size and fills it with copies of prototype.
SwapQueue(size_t size,const T & prototype)96*d9f75844SAndroid Build Coastguard Worker SwapQueue(size_t size, const T& prototype) : queue_(size, prototype) {
97*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(VerifyQueueSlots());
98*d9f75844SAndroid Build Coastguard Worker }
99*d9f75844SAndroid Build Coastguard Worker
100*d9f75844SAndroid Build Coastguard Worker // Same as above and accepts an item verification functor.
SwapQueue(size_t size,const T & prototype,const QueueItemVerifier & queue_item_verifier)101*d9f75844SAndroid Build Coastguard Worker SwapQueue(size_t size,
102*d9f75844SAndroid Build Coastguard Worker const T& prototype,
103*d9f75844SAndroid Build Coastguard Worker const QueueItemVerifier& queue_item_verifier)
104*d9f75844SAndroid Build Coastguard Worker : queue_item_verifier_(queue_item_verifier), queue_(size, prototype) {
105*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(VerifyQueueSlots());
106*d9f75844SAndroid Build Coastguard Worker }
107*d9f75844SAndroid Build Coastguard Worker
108*d9f75844SAndroid Build Coastguard Worker // Resets the queue to have zero content while maintaining the queue size.
109*d9f75844SAndroid Build Coastguard Worker // Just like Remove(), this can only be called (safely) from the
110*d9f75844SAndroid Build Coastguard Worker // consumer.
Clear()111*d9f75844SAndroid Build Coastguard Worker void Clear() {
112*d9f75844SAndroid Build Coastguard Worker // Drop all non-empty elements by resetting num_elements_ and incrementing
113*d9f75844SAndroid Build Coastguard Worker // next_read_index_ by the previous value of num_elements_. Relaxed memory
114*d9f75844SAndroid Build Coastguard Worker // ordering is sufficient since the dropped elements are not accessed.
115*d9f75844SAndroid Build Coastguard Worker next_read_index_ += std::atomic_exchange_explicit(
116*d9f75844SAndroid Build Coastguard Worker &num_elements_, size_t{0}, std::memory_order_relaxed);
117*d9f75844SAndroid Build Coastguard Worker if (next_read_index_ >= queue_.size()) {
118*d9f75844SAndroid Build Coastguard Worker next_read_index_ -= queue_.size();
119*d9f75844SAndroid Build Coastguard Worker }
120*d9f75844SAndroid Build Coastguard Worker
121*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK_LT(next_read_index_, queue_.size());
122*d9f75844SAndroid Build Coastguard Worker }
123*d9f75844SAndroid Build Coastguard Worker
124*d9f75844SAndroid Build Coastguard Worker // Inserts a "full" T at the back of the queue by swapping *input with an
125*d9f75844SAndroid Build Coastguard Worker // "empty" T from the queue.
126*d9f75844SAndroid Build Coastguard Worker // Returns true if the item was inserted or false if not (the queue was full).
127*d9f75844SAndroid Build Coastguard Worker // When specified, the T given in *input must pass the ItemVerifier() test.
128*d9f75844SAndroid Build Coastguard Worker // The contents of *input after the call are then also guaranteed to pass the
129*d9f75844SAndroid Build Coastguard Worker // ItemVerifier() test.
Insert(T * input)130*d9f75844SAndroid Build Coastguard Worker ABSL_MUST_USE_RESULT bool Insert(T* input) {
131*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(input);
132*d9f75844SAndroid Build Coastguard Worker
133*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(queue_item_verifier_(*input));
134*d9f75844SAndroid Build Coastguard Worker
135*d9f75844SAndroid Build Coastguard Worker // Load the value of num_elements_. Acquire memory ordering prevents reads
136*d9f75844SAndroid Build Coastguard Worker // and writes to queue_[next_write_index_] to be reordered to before the
137*d9f75844SAndroid Build Coastguard Worker // load. (That element might be accessed by a concurrent call to Remove()
138*d9f75844SAndroid Build Coastguard Worker // until the load finishes.)
139*d9f75844SAndroid Build Coastguard Worker if (std::atomic_load_explicit(&num_elements_, std::memory_order_acquire) ==
140*d9f75844SAndroid Build Coastguard Worker queue_.size()) {
141*d9f75844SAndroid Build Coastguard Worker return false;
142*d9f75844SAndroid Build Coastguard Worker }
143*d9f75844SAndroid Build Coastguard Worker
144*d9f75844SAndroid Build Coastguard Worker using std::swap;
145*d9f75844SAndroid Build Coastguard Worker swap(*input, queue_[next_write_index_]);
146*d9f75844SAndroid Build Coastguard Worker
147*d9f75844SAndroid Build Coastguard Worker // Increment the value of num_elements_ to account for the inserted element.
148*d9f75844SAndroid Build Coastguard Worker // Release memory ordering prevents the reads and writes to
149*d9f75844SAndroid Build Coastguard Worker // queue_[next_write_index_] to be reordered to after the increment. (Once
150*d9f75844SAndroid Build Coastguard Worker // the increment has finished, Remove() might start accessing that element.)
151*d9f75844SAndroid Build Coastguard Worker const size_t old_num_elements = std::atomic_fetch_add_explicit(
152*d9f75844SAndroid Build Coastguard Worker &num_elements_, size_t{1}, std::memory_order_release);
153*d9f75844SAndroid Build Coastguard Worker
154*d9f75844SAndroid Build Coastguard Worker ++next_write_index_;
155*d9f75844SAndroid Build Coastguard Worker if (next_write_index_ == queue_.size()) {
156*d9f75844SAndroid Build Coastguard Worker next_write_index_ = 0;
157*d9f75844SAndroid Build Coastguard Worker }
158*d9f75844SAndroid Build Coastguard Worker
159*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK_LT(next_write_index_, queue_.size());
160*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK_LT(old_num_elements, queue_.size());
161*d9f75844SAndroid Build Coastguard Worker
162*d9f75844SAndroid Build Coastguard Worker return true;
163*d9f75844SAndroid Build Coastguard Worker }
164*d9f75844SAndroid Build Coastguard Worker
165*d9f75844SAndroid Build Coastguard Worker // Removes the frontmost "full" T from the queue by swapping it with
166*d9f75844SAndroid Build Coastguard Worker // the "empty" T in *output.
167*d9f75844SAndroid Build Coastguard Worker // Returns true if an item could be removed or false if not (the queue was
168*d9f75844SAndroid Build Coastguard Worker // empty). When specified, The T given in *output must pass the ItemVerifier()
169*d9f75844SAndroid Build Coastguard Worker // test and the contents of *output after the call are then also guaranteed to
170*d9f75844SAndroid Build Coastguard Worker // pass the ItemVerifier() test.
Remove(T * output)171*d9f75844SAndroid Build Coastguard Worker ABSL_MUST_USE_RESULT bool Remove(T* output) {
172*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(output);
173*d9f75844SAndroid Build Coastguard Worker
174*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(queue_item_verifier_(*output));
175*d9f75844SAndroid Build Coastguard Worker
176*d9f75844SAndroid Build Coastguard Worker // Load the value of num_elements_. Acquire memory ordering prevents reads
177*d9f75844SAndroid Build Coastguard Worker // and writes to queue_[next_read_index_] to be reordered to before the
178*d9f75844SAndroid Build Coastguard Worker // load. (That element might be accessed by a concurrent call to Insert()
179*d9f75844SAndroid Build Coastguard Worker // until the load finishes.)
180*d9f75844SAndroid Build Coastguard Worker if (std::atomic_load_explicit(&num_elements_, std::memory_order_acquire) ==
181*d9f75844SAndroid Build Coastguard Worker 0) {
182*d9f75844SAndroid Build Coastguard Worker return false;
183*d9f75844SAndroid Build Coastguard Worker }
184*d9f75844SAndroid Build Coastguard Worker
185*d9f75844SAndroid Build Coastguard Worker using std::swap;
186*d9f75844SAndroid Build Coastguard Worker swap(*output, queue_[next_read_index_]);
187*d9f75844SAndroid Build Coastguard Worker
188*d9f75844SAndroid Build Coastguard Worker // Decrement the value of num_elements_ to account for the removed element.
189*d9f75844SAndroid Build Coastguard Worker // Release memory ordering prevents the reads and writes to
190*d9f75844SAndroid Build Coastguard Worker // queue_[next_write_index_] to be reordered to after the decrement. (Once
191*d9f75844SAndroid Build Coastguard Worker // the decrement has finished, Insert() might start accessing that element.)
192*d9f75844SAndroid Build Coastguard Worker std::atomic_fetch_sub_explicit(&num_elements_, size_t{1},
193*d9f75844SAndroid Build Coastguard Worker std::memory_order_release);
194*d9f75844SAndroid Build Coastguard Worker
195*d9f75844SAndroid Build Coastguard Worker ++next_read_index_;
196*d9f75844SAndroid Build Coastguard Worker if (next_read_index_ == queue_.size()) {
197*d9f75844SAndroid Build Coastguard Worker next_read_index_ = 0;
198*d9f75844SAndroid Build Coastguard Worker }
199*d9f75844SAndroid Build Coastguard Worker
200*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK_LT(next_read_index_, queue_.size());
201*d9f75844SAndroid Build Coastguard Worker
202*d9f75844SAndroid Build Coastguard Worker return true;
203*d9f75844SAndroid Build Coastguard Worker }
204*d9f75844SAndroid Build Coastguard Worker
205*d9f75844SAndroid Build Coastguard Worker // Returns the current number of elements in the queue. Since elements may be
206*d9f75844SAndroid Build Coastguard Worker // concurrently added to the queue, the caller must treat this as a lower
207*d9f75844SAndroid Build Coastguard Worker // bound, not an exact count.
208*d9f75844SAndroid Build Coastguard Worker // May only be called by the consumer.
SizeAtLeast()209*d9f75844SAndroid Build Coastguard Worker size_t SizeAtLeast() const {
210*d9f75844SAndroid Build Coastguard Worker // Acquire memory ordering ensures that we wait for the producer to finish
211*d9f75844SAndroid Build Coastguard Worker // inserting any element in progress.
212*d9f75844SAndroid Build Coastguard Worker return std::atomic_load_explicit(&num_elements_, std::memory_order_acquire);
213*d9f75844SAndroid Build Coastguard Worker }
214*d9f75844SAndroid Build Coastguard Worker
215*d9f75844SAndroid Build Coastguard Worker private:
216*d9f75844SAndroid Build Coastguard Worker // Verify that the queue slots complies with the ItemVerifier test. This
217*d9f75844SAndroid Build Coastguard Worker // function is not thread-safe and can only be used in the constructors.
VerifyQueueSlots()218*d9f75844SAndroid Build Coastguard Worker bool VerifyQueueSlots() {
219*d9f75844SAndroid Build Coastguard Worker for (const auto& v : queue_) {
220*d9f75844SAndroid Build Coastguard Worker RTC_DCHECK(queue_item_verifier_(v));
221*d9f75844SAndroid Build Coastguard Worker }
222*d9f75844SAndroid Build Coastguard Worker return true;
223*d9f75844SAndroid Build Coastguard Worker }
224*d9f75844SAndroid Build Coastguard Worker
225*d9f75844SAndroid Build Coastguard Worker // TODO(peah): Change this to use std::function() once we can use C++11 std
226*d9f75844SAndroid Build Coastguard Worker // lib.
227*d9f75844SAndroid Build Coastguard Worker QueueItemVerifier queue_item_verifier_;
228*d9f75844SAndroid Build Coastguard Worker
229*d9f75844SAndroid Build Coastguard Worker // Only accessed by the single producer.
230*d9f75844SAndroid Build Coastguard Worker size_t next_write_index_ = 0;
231*d9f75844SAndroid Build Coastguard Worker
232*d9f75844SAndroid Build Coastguard Worker // Only accessed by the single consumer.
233*d9f75844SAndroid Build Coastguard Worker size_t next_read_index_ = 0;
234*d9f75844SAndroid Build Coastguard Worker
235*d9f75844SAndroid Build Coastguard Worker // Accessed by both the producer and the consumer and used for synchronization
236*d9f75844SAndroid Build Coastguard Worker // between them.
237*d9f75844SAndroid Build Coastguard Worker std::atomic<size_t> num_elements_{0};
238*d9f75844SAndroid Build Coastguard Worker
239*d9f75844SAndroid Build Coastguard Worker // The elements of the queue are acced by both the producer and the consumer,
240*d9f75844SAndroid Build Coastguard Worker // mediated by num_elements_. queue_.size() is constant.
241*d9f75844SAndroid Build Coastguard Worker std::vector<T> queue_;
242*d9f75844SAndroid Build Coastguard Worker
243*d9f75844SAndroid Build Coastguard Worker SwapQueue(const SwapQueue&) = delete;
244*d9f75844SAndroid Build Coastguard Worker SwapQueue& operator=(const SwapQueue&) = delete;
245*d9f75844SAndroid Build Coastguard Worker };
246*d9f75844SAndroid Build Coastguard Worker
247*d9f75844SAndroid Build Coastguard Worker } // namespace webrtc
248*d9f75844SAndroid Build Coastguard Worker
249*d9f75844SAndroid Build Coastguard Worker #endif // RTC_BASE_SWAP_QUEUE_H_
250