xref: /aosp_15_r20/external/webrtc/rtc_base/operations_chain_unittest.cc (revision d9f758449e529ab9291ac668be2861e7a55c2422)
1 /*
2  *  Copyright 2019 The WebRTC Project Authors. All rights reserved.
3  *
4  *  Use of this source code is governed by a BSD-style license
5  *  that can be found in the LICENSE file in the root of the source
6  *  tree. An additional intellectual property rights grant can be found
7  *  in the file PATENTS.  All contributing project authors may
8  *  be found in the AUTHORS file in the root of the source tree.
9  */
10 
11 #include "rtc_base/operations_chain.h"
12 
13 #include <atomic>
14 #include <functional>
15 #include <memory>
16 #include <utility>
17 #include <vector>
18 
19 #include "rtc_base/event.h"
20 #include "rtc_base/gunit.h"
21 #include "rtc_base/thread.h"
22 #include "test/gmock.h"
23 #include "test/gtest.h"
24 
25 namespace rtc {
26 
27 using ::testing::ElementsAre;
28 
namespace {

// Upper bound passed to EXPECT_TRUE_WAIT when polling for the on-chain-empty
// callback counter to reach an expected value.
// NOTE(review): presumably milliseconds, as taken by the gunit.h wait macros -
// confirm.
constexpr int kDefaultTimeout = 3000;

}  // namespace
34 
35 class OperationTracker {
36  public:
OperationTracker()37   OperationTracker() : background_thread_(Thread::Create()) {
38     background_thread_->Start();
39   }
40   // The caller is responsible for ensuring that no operations are pending.
~OperationTracker()41   ~OperationTracker() {}
42 
43   // Creates a binding for the synchronous operation (see
44   // StartSynchronousOperation() below).
BindSynchronousOperation(Event * operation_complete_event)45   std::function<void(std::function<void()>)> BindSynchronousOperation(
46       Event* operation_complete_event) {
47     return [this, operation_complete_event](std::function<void()> callback) {
48       StartSynchronousOperation(operation_complete_event, std::move(callback));
49     };
50   }
51 
52   // Creates a binding for the asynchronous operation (see
53   // StartAsynchronousOperation() below).
BindAsynchronousOperation(Event * unblock_operation_event,Event * operation_complete_event)54   std::function<void(std::function<void()>)> BindAsynchronousOperation(
55       Event* unblock_operation_event,
56       Event* operation_complete_event) {
57     return [this, unblock_operation_event,
58             operation_complete_event](std::function<void()> callback) {
59       StartAsynchronousOperation(unblock_operation_event,
60                                  operation_complete_event, std::move(callback));
61     };
62   }
63 
64   // When an operation is completed, its associated Event* is added to this
65   // list, in chronological order. This allows you to verify the order that
66   // operations are executed.
completed_operation_events() const67   const std::vector<Event*>& completed_operation_events() const {
68     return completed_operation_events_;
69   }
70 
71  private:
72   // This operation is completed synchronously; the callback is invoked before
73   // the function returns.
StartSynchronousOperation(Event * operation_complete_event,std::function<void ()> callback)74   void StartSynchronousOperation(Event* operation_complete_event,
75                                  std::function<void()> callback) {
76     completed_operation_events_.push_back(operation_complete_event);
77     operation_complete_event->Set();
78     callback();
79   }
80 
81   // This operation is completed asynchronously; it pings `background_thread_`,
82   // blocking that thread until `unblock_operation_event` is signaled and then
83   // completes upon posting back to the thread that the operation started on.
84   // Note that this requires the starting thread to be executing tasks (handle
85   // messages), i.e. must not be blocked.
StartAsynchronousOperation(Event * unblock_operation_event,Event * operation_complete_event,std::function<void ()> callback)86   void StartAsynchronousOperation(Event* unblock_operation_event,
87                                   Event* operation_complete_event,
88                                   std::function<void()> callback) {
89     Thread* current_thread = Thread::Current();
90     background_thread_->PostTask([this, current_thread, unblock_operation_event,
91                                   operation_complete_event, callback]() {
92       unblock_operation_event->Wait(Event::kForever);
93       current_thread->PostTask([this, operation_complete_event, callback]() {
94         completed_operation_events_.push_back(operation_complete_event);
95         operation_complete_event->Set();
96         callback();
97       });
98     });
99   }
100 
101   std::unique_ptr<Thread> background_thread_;
102   std::vector<Event*> completed_operation_events_;
103 };
104 
105 // The OperationTrackerProxy ensures all operations are chained on a separate
106 // thread. This allows tests to block while chained operations are posting
107 // between threads.
108 class OperationTrackerProxy {
109  public:
OperationTrackerProxy()110   OperationTrackerProxy()
111       : operations_chain_thread_(Thread::Create()),
112         operation_tracker_(nullptr),
113         operations_chain_(nullptr) {
114     operations_chain_thread_->Start();
115   }
116 
Initialize()117   std::unique_ptr<Event> Initialize() {
118     std::unique_ptr<Event> event = std::make_unique<Event>();
119     operations_chain_thread_->PostTask([this, event_ptr = event.get()]() {
120       operation_tracker_ = std::make_unique<OperationTracker>();
121       operations_chain_ = OperationsChain::Create();
122       event_ptr->Set();
123     });
124     return event;
125   }
126 
SetOnChainEmptyCallback(std::function<void ()> on_chain_empty_callback)127   void SetOnChainEmptyCallback(std::function<void()> on_chain_empty_callback) {
128     Event event;
129     operations_chain_thread_->PostTask(
130         [this, &event,
131          on_chain_empty_callback = std::move(on_chain_empty_callback)]() {
132           operations_chain_->SetOnChainEmptyCallback(
133               std::move(on_chain_empty_callback));
134           event.Set();
135         });
136     event.Wait(Event::kForever);
137   }
138 
IsEmpty()139   bool IsEmpty() {
140     Event event;
141     bool is_empty = false;
142     operations_chain_thread_->PostTask([this, &event, &is_empty]() {
143       is_empty = operations_chain_->IsEmpty();
144       event.Set();
145     });
146     event.Wait(Event::kForever);
147     return is_empty;
148   }
149 
ReleaseOperationChain()150   std::unique_ptr<Event> ReleaseOperationChain() {
151     std::unique_ptr<Event> event = std::make_unique<Event>();
152     operations_chain_thread_->PostTask([this, event_ptr = event.get()]() {
153       operations_chain_ = nullptr;
154       event_ptr->Set();
155     });
156     return event;
157   }
158 
159   // Chains a synchronous operation on the operation chain's thread.
PostSynchronousOperation()160   std::unique_ptr<Event> PostSynchronousOperation() {
161     std::unique_ptr<Event> operation_complete_event = std::make_unique<Event>();
162     operations_chain_thread_->PostTask(
163         [this,
164          operation_complete_event_ptr = operation_complete_event.get()]() {
165           operations_chain_->ChainOperation(
166               operation_tracker_->BindSynchronousOperation(
167                   operation_complete_event_ptr));
168         });
169     return operation_complete_event;
170   }
171 
172   // Chains an asynchronous operation on the operation chain's thread. This
173   // involves the operation chain thread and an additional background thread.
PostAsynchronousOperation(Event * unblock_operation_event)174   std::unique_ptr<Event> PostAsynchronousOperation(
175       Event* unblock_operation_event) {
176     std::unique_ptr<Event> operation_complete_event = std::make_unique<Event>();
177     operations_chain_thread_->PostTask(
178         [this, unblock_operation_event,
179          operation_complete_event_ptr = operation_complete_event.get()]() {
180           operations_chain_->ChainOperation(
181               operation_tracker_->BindAsynchronousOperation(
182                   unblock_operation_event, operation_complete_event_ptr));
183         });
184     return operation_complete_event;
185   }
186 
187   // The order of completed events. Touches the `operation_tracker_` on the
188   // calling thread, this is only thread safe if all chained operations have
189   // completed.
completed_operation_events() const190   const std::vector<Event*>& completed_operation_events() const {
191     return operation_tracker_->completed_operation_events();
192   }
193 
194  private:
195   std::unique_ptr<Thread> operations_chain_thread_;
196   std::unique_ptr<OperationTracker> operation_tracker_;
197   scoped_refptr<OperationsChain> operations_chain_;
198 };
199 
200 // On destruction, sets a boolean flag to true.
201 class SignalOnDestruction final {
202  public:
SignalOnDestruction(bool * destructor_called)203   SignalOnDestruction(bool* destructor_called)
204       : destructor_called_(destructor_called) {
205     RTC_DCHECK(destructor_called_);
206   }
~SignalOnDestruction()207   ~SignalOnDestruction() {
208     // Moved objects will have `destructor_called_` set to null. Destroying a
209     // moved SignalOnDestruction should not signal.
210     if (destructor_called_) {
211       *destructor_called_ = true;
212     }
213   }
214 
215   SignalOnDestruction(const SignalOnDestruction&) = delete;
216   SignalOnDestruction& operator=(const SignalOnDestruction&) = delete;
217 
218   // Move operators.
SignalOnDestruction(SignalOnDestruction && other)219   SignalOnDestruction(SignalOnDestruction&& other)
220       : SignalOnDestruction(other.destructor_called_) {
221     other.destructor_called_ = nullptr;
222   }
operator =(SignalOnDestruction && other)223   SignalOnDestruction& operator=(SignalOnDestruction&& other) {
224     destructor_called_ = other.destructor_called_;
225     other.destructor_called_ = nullptr;
226     return *this;
227   }
228 
229  private:
230   bool* destructor_called_;
231 };
232 
// A single synchronous operation chained on an empty chain runs to completion.
TEST(OperationsChainTest, SynchronousOperation) {
  OperationTrackerProxy operation_tracker_proxy;
  operation_tracker_proxy.Initialize()->Wait(Event::kForever);

  // Blocks until the operation has signaled its completion event.
  operation_tracker_proxy.PostSynchronousOperation()->Wait(Event::kForever);
}
239 
// A single asynchronous operation stays pending until it is unblocked, and
// then runs to completion.
TEST(OperationsChainTest, AsynchronousOperation) {
  OperationTrackerProxy operation_tracker_proxy;
  operation_tracker_proxy.Initialize()->Wait(Event::kForever);

  Event unblock_async_operation_event;
  auto async_operation_completed_event =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event);
  // This should not be signaled until we unblock the operation.
  EXPECT_FALSE(
      async_operation_completed_event->Wait(webrtc::TimeDelta::Zero()));
  // Unblock the operation and wait for it to complete.
  unblock_async_operation_event.Set();
  async_operation_completed_event->Wait(Event::kForever);
}
255 
TEST(OperationsChainTest,
     SynchronousOperationsAreExecutedImmediatelyWhenChainIsEmpty) {
  // Testing synchronicity must be done without the OperationTrackerProxy to
  // ensure messages are not processed in parallel. No operation in this test
  // is posted to a background thread.
  scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
  OperationTracker operation_tracker;
  Event event0;
  operations_chain->ChainOperation(
      operation_tracker.BindSynchronousOperation(&event0));
  // This should already be signaled. (If it wasn't, waiting wouldn't help,
  // because we'd be blocking the only thread that exists.)
  EXPECT_TRUE(event0.Wait(webrtc::TimeDelta::Zero()));
  // Chaining another operation should also execute immediately because the
  // chain should already be empty.
  Event event1;
  operations_chain->ChainOperation(
      operation_tracker.BindSynchronousOperation(&event1));
  EXPECT_TRUE(event1.Wait(webrtc::TimeDelta::Zero()));
}
276 
// A synchronous operation chained behind a pending asynchronous operation must
// not complete before the asynchronous one does.
TEST(OperationsChainTest, AsynchronousOperationBlocksSynchronousOperation) {
  OperationTrackerProxy operation_tracker_proxy;
  operation_tracker_proxy.Initialize()->Wait(Event::kForever);

  Event unblock_async_operation_event;
  auto async_operation_completed_event =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event);

  auto sync_operation_completed_event =
      operation_tracker_proxy.PostSynchronousOperation();

  unblock_async_operation_event.Set();

  sync_operation_completed_event->Wait(Event::kForever);
  // The asynchronous event should have blocked the synchronous event, meaning
  // this should already be signaled.
  EXPECT_TRUE(async_operation_completed_event->Wait(webrtc::TimeDelta::Zero()));
}
296 
TEST(OperationsChainTest,OperationsAreExecutedInOrder)297 TEST(OperationsChainTest, OperationsAreExecutedInOrder) {
298   OperationTrackerProxy operation_tracker_proxy;
299   operation_tracker_proxy.Initialize()->Wait(Event::kForever);
300 
301   // Chain a mix of asynchronous and synchronous operations.
302   Event operation0_unblock_event;
303   auto operation0_completed_event =
304       operation_tracker_proxy.PostAsynchronousOperation(
305           &operation0_unblock_event);
306 
307   Event operation1_unblock_event;
308   auto operation1_completed_event =
309       operation_tracker_proxy.PostAsynchronousOperation(
310           &operation1_unblock_event);
311 
312   auto operation2_completed_event =
313       operation_tracker_proxy.PostSynchronousOperation();
314 
315   auto operation3_completed_event =
316       operation_tracker_proxy.PostSynchronousOperation();
317 
318   Event operation4_unblock_event;
319   auto operation4_completed_event =
320       operation_tracker_proxy.PostAsynchronousOperation(
321           &operation4_unblock_event);
322 
323   auto operation5_completed_event =
324       operation_tracker_proxy.PostSynchronousOperation();
325 
326   Event operation6_unblock_event;
327   auto operation6_completed_event =
328       operation_tracker_proxy.PostAsynchronousOperation(
329           &operation6_unblock_event);
330 
331   // Unblock events in reverse order. Operations 5, 3 and 2 are synchronous and
332   // don't need to be unblocked.
333   operation6_unblock_event.Set();
334   operation4_unblock_event.Set();
335   operation1_unblock_event.Set();
336   operation0_unblock_event.Set();
337   // Await all operations. The await-order shouldn't matter since they all get
338   // executed eventually.
339   operation0_completed_event->Wait(Event::kForever);
340   operation1_completed_event->Wait(Event::kForever);
341   operation2_completed_event->Wait(Event::kForever);
342   operation3_completed_event->Wait(Event::kForever);
343   operation4_completed_event->Wait(Event::kForever);
344   operation5_completed_event->Wait(Event::kForever);
345   operation6_completed_event->Wait(Event::kForever);
346 
347   EXPECT_THAT(
348       operation_tracker_proxy.completed_operation_events(),
349       ElementsAre(
350           operation0_completed_event.get(), operation1_completed_event.get(),
351           operation2_completed_event.get(), operation3_completed_event.get(),
352           operation4_completed_event.get(), operation5_completed_event.get(),
353           operation6_completed_event.get()));
354 }
355 
// IsEmpty() reports true only while no chained operation is pending.
TEST(OperationsChainTest, IsEmpty) {
  OperationTrackerProxy operation_tracker_proxy;
  operation_tracker_proxy.Initialize()->Wait(Event::kForever);

  // The chain is initially empty.
  EXPECT_TRUE(operation_tracker_proxy.IsEmpty());
  // Chain a single event.
  Event unblock_async_operation_event0;
  auto async_operation_completed_event0 =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event0);
  // The chain is not empty while an event is pending.
  EXPECT_FALSE(operation_tracker_proxy.IsEmpty());
  // Completing the operation empties the chain.
  unblock_async_operation_event0.Set();
  async_operation_completed_event0->Wait(Event::kForever);
  EXPECT_TRUE(operation_tracker_proxy.IsEmpty());

  // Chain multiple events.
  Event unblock_async_operation_event1;
  auto async_operation_completed_event1 =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event1);
  Event unblock_async_operation_event2;
  auto async_operation_completed_event2 =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event2);
  // Again, the chain is not empty while an event is pending.
  EXPECT_FALSE(operation_tracker_proxy.IsEmpty());
  // Upon completing the first event, the chain is still not empty.
  unblock_async_operation_event1.Set();
  async_operation_completed_event1->Wait(Event::kForever);
  EXPECT_FALSE(operation_tracker_proxy.IsEmpty());
  // Completing the last event empties the chain.
  unblock_async_operation_event2.Set();
  async_operation_completed_event2->Wait(Event::kForever);
  EXPECT_TRUE(operation_tracker_proxy.IsEmpty());
}
394 
// The on-chain-empty callback fires once each time the chain transitions from
// non-empty to empty, and not while operations are still pending.
TEST(OperationsChainTest, OnChainEmptyCallback) {
  rtc::AutoThread main_thread;
  OperationTrackerProxy operation_tracker_proxy;
  operation_tracker_proxy.Initialize()->Wait(Event::kForever);

  // The counter is incremented by the callback and read by this test, hence
  // atomic.
  std::atomic<size_t> on_empty_callback_counter(0u);
  operation_tracker_proxy.SetOnChainEmptyCallback(
      [&on_empty_callback_counter] { ++on_empty_callback_counter; });

  // Chain a single event.
  Event unblock_async_operation_event0;
  auto async_operation_completed_event0 =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event0);
  // The callback is not invoked until the operation has completed.
  EXPECT_EQ(0u, on_empty_callback_counter);
  // Completing the operation empties the chain, invoking the callback.
  unblock_async_operation_event0.Set();
  async_operation_completed_event0->Wait(Event::kForever);
  EXPECT_TRUE_WAIT(1u == on_empty_callback_counter, kDefaultTimeout);

  // Chain multiple events.
  Event unblock_async_operation_event1;
  auto async_operation_completed_event1 =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event1);
  Event unblock_async_operation_event2;
  auto async_operation_completed_event2 =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event2);
  // Again, the callback is not invoked until the operation has completed: the
  // counter still holds the value from the first round.
  EXPECT_TRUE_WAIT(1u == on_empty_callback_counter, kDefaultTimeout);
  // Upon completing the first event, the chain is still not empty, so the
  // callback must not be invoked yet.
  unblock_async_operation_event1.Set();
  async_operation_completed_event1->Wait(Event::kForever);
  EXPECT_TRUE_WAIT(1u == on_empty_callback_counter, kDefaultTimeout);
  // Completing the last event empties the chain, invoking the callback.
  unblock_async_operation_event2.Set();
  async_operation_completed_event2->Wait(Event::kForever);
  EXPECT_TRUE_WAIT(2u == on_empty_callback_counter, kDefaultTimeout);
}
437 
TEST(OperationsChainTest,
     SafeToReleaseReferenceToOperationChainWhileOperationIsPending) {
  OperationTrackerProxy operation_tracker_proxy;
  operation_tracker_proxy.Initialize()->Wait(Event::kForever);

  Event unblock_async_operation_event;
  auto async_operation_completed_event =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event);

  // Pending operations keep the OperationChain alive, making it safe for the
  // test to release any references before unblocking the async operation.
  operation_tracker_proxy.ReleaseOperationChain()->Wait(Event::kForever);

  // The operation must still be able to complete after the proxy has dropped
  // its reference.
  unblock_async_operation_event.Set();
  async_operation_completed_event->Wait(Event::kForever);
}
455 
// The functor passed to ChainOperation() must stay alive for the whole
// duration of its own invocation, even after it has invoked the completion
// callback.
TEST(OperationsChainTest, FunctorIsNotDestroyedWhileExecuting) {
  scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();

  bool destructor_called = false;
  SignalOnDestruction signal_on_destruction(&destructor_called);

  // `signal_on_destruction` is moved into the lambda, so `destructor_called`
  // flips to true exactly when the lambda object itself is destroyed.
  operations_chain->ChainOperation(
      [signal_on_destruction = std::move(signal_on_destruction),
       &destructor_called](std::function<void()> callback) {
        EXPECT_FALSE(destructor_called);
        // Invoking the callback marks the operation as complete, popping the
        // Operation object from the OperationsChain internal queue.
        callback();
        // Even though the internal Operation object has been destroyed,
        // variables captured by this lambda expression must still be valid (the
        // associated functor must not be deleted while executing).
        EXPECT_FALSE(destructor_called);
      });
  // The lambda having executed synchronously and completed, its captured
  // variables should now have been deleted.
  EXPECT_TRUE(destructor_called);
}
478 
479 #if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
480 
// An operation that returns without ever invoking its completion callback is
// expected to crash (this test is only compiled when DCHECKs are on).
TEST(OperationsChainDeathTest, OperationNotInvokingCallbackShouldCrash) {
  scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
  // `callback` is intentionally left unused by the operation.
  EXPECT_DEATH(
      operations_chain->ChainOperation([](std::function<void()> callback) {}),
      "");
}
487 
// An operation that invokes its completion callback more than once is expected
// to crash (this test is only compiled when DCHECKs are on).
TEST(OperationsChainDeathTest,
     OperationInvokingCallbackMultipleTimesShouldCrash) {
  scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
  EXPECT_DEATH(
      operations_chain->ChainOperation([](std::function<void()> callback) {
        // Signal that the operation has completed multiple times.
        callback();
        callback();
      }),
      "");
}
499 
500 #endif  // RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
501 
502 }  // namespace rtc
503