xref: /aosp_15_r20/external/cronet/base/functional/concurrent_callbacks.h (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1*6777b538SAndroid Build Coastguard Worker // Copyright 2024 The Chromium Authors
2*6777b538SAndroid Build Coastguard Worker // Use of this source code is governed by a BSD-style license that can be
3*6777b538SAndroid Build Coastguard Worker // found in the LICENSE file.
4*6777b538SAndroid Build Coastguard Worker 
5*6777b538SAndroid Build Coastguard Worker #ifndef BASE_FUNCTIONAL_CONCURRENT_CALLBACKS_H_
6*6777b538SAndroid Build Coastguard Worker #define BASE_FUNCTIONAL_CONCURRENT_CALLBACKS_H_
7*6777b538SAndroid Build Coastguard Worker 
8*6777b538SAndroid Build Coastguard Worker #include <memory>
9*6777b538SAndroid Build Coastguard Worker #include <type_traits>
10*6777b538SAndroid Build Coastguard Worker #include <vector>
11*6777b538SAndroid Build Coastguard Worker 
12*6777b538SAndroid Build Coastguard Worker #include "base/functional/bind.h"
13*6777b538SAndroid Build Coastguard Worker #include "base/functional/callback.h"
14*6777b538SAndroid Build Coastguard Worker #include "base/location.h"
15*6777b538SAndroid Build Coastguard Worker #include "base/memory/raw_ptr.h"
16*6777b538SAndroid Build Coastguard Worker #include "base/task/bind_post_task.h"
17*6777b538SAndroid Build Coastguard Worker #include "base/task/sequenced_task_runner.h"
18*6777b538SAndroid Build Coastguard Worker 
19*6777b538SAndroid Build Coastguard Worker // OVERVIEW:
20*6777b538SAndroid Build Coastguard Worker //
21*6777b538SAndroid Build Coastguard Worker // ConcurrentCallbacks<T> is an alternative to BarrierCallback<T>, it dispenses
22*6777b538SAndroid Build Coastguard Worker // OnceCallbacks via CreateCallback() and invokes the callback passed to Done()
23*6777b538SAndroid Build Coastguard Worker // after all prior callbacks have been run.
24*6777b538SAndroid Build Coastguard Worker //
25*6777b538SAndroid Build Coastguard Worker // ConcurrentCallbacks<T> is intended to be used over BarrierCallback<T> in
26*6777b538SAndroid Build Coastguard Worker // cases where the count is unknown prior to requiring a callback to start a
27*6777b538SAndroid Build Coastguard Worker // task, and for cases where the count is manually derived from the code and
28*6777b538SAndroid Build Coastguard Worker // subject to human error.
29*6777b538SAndroid Build Coastguard Worker //
30*6777b538SAndroid Build Coastguard Worker // IMPORTANT NOTES:
31*6777b538SAndroid Build Coastguard Worker //
32*6777b538SAndroid Build Coastguard Worker // - ConcurrentCallbacks<T> is NOT thread safe.
33*6777b538SAndroid Build Coastguard Worker // - The done callback will NOT be run synchronously, it will be PostTask() to
34*6777b538SAndroid Build Coastguard Worker //   the sequence that Done() was invoked on.
35*6777b538SAndroid Build Coastguard Worker // - ConcurrentCallbacks<T> cannot be used after Done() is called, a CHECK
36*6777b538SAndroid Build Coastguard Worker //   verifies this.
37*6777b538SAndroid Build Coastguard Worker //
38*6777b538SAndroid Build Coastguard Worker // TYPICAL USAGE:
39*6777b538SAndroid Build Coastguard Worker //
40*6777b538SAndroid Build Coastguard Worker // class Example {
41*6777b538SAndroid Build Coastguard Worker //   void OnRequestsReceived(std::vector<Request> requests) {
42*6777b538SAndroid Build Coastguard Worker //     base::ConcurrentCallbacks<Result> concurrent;
43*6777b538SAndroid Build Coastguard Worker //
44*6777b538SAndroid Build Coastguard Worker //     for (Request& request : requests) {
45*6777b538SAndroid Build Coastguard Worker //       if (IsValidRequest(request)) {
46*6777b538SAndroid Build Coastguard Worker //         StartRequest(std::move(request), concurrent.CreateCallback());
47*6777b538SAndroid Build Coastguard Worker //       }
48*6777b538SAndroid Build Coastguard Worker //     }
49*6777b538SAndroid Build Coastguard Worker //
50*6777b538SAndroid Build Coastguard Worker //     std::move(concurrent).Done(
51*6777b538SAndroid Build Coastguard Worker //         base::BindOnce(&Example::OnRequestsComplete, GetWeakPtr()));
52*6777b538SAndroid Build Coastguard Worker //   }
53*6777b538SAndroid Build Coastguard Worker //
54*6777b538SAndroid Build Coastguard Worker //   void StartRequest(Request request,
55*6777b538SAndroid Build Coastguard Worker //                     base::OnceCallback<void(Result)> callback) {
56*6777b538SAndroid Build Coastguard Worker //     // Process the request asynchronously and call callback with a Result.
57*6777b538SAndroid Build Coastguard Worker //   }
58*6777b538SAndroid Build Coastguard Worker //
59*6777b538SAndroid Build Coastguard Worker //   void OnRequestsComplete(std::vector<Result> results) {
60*6777b538SAndroid Build Coastguard Worker //     // Invoked after all requests are completed and receives the results of
61*6777b538SAndroid Build Coastguard Worker //     // all of them.
62*6777b538SAndroid Build Coastguard Worker //   }
63*6777b538SAndroid Build Coastguard Worker // };
64*6777b538SAndroid Build Coastguard Worker 
65*6777b538SAndroid Build Coastguard Worker namespace base {
66*6777b538SAndroid Build Coastguard Worker 
67*6777b538SAndroid Build Coastguard Worker template <typename T>
68*6777b538SAndroid Build Coastguard Worker class ConcurrentCallbacks {
69*6777b538SAndroid Build Coastguard Worker  public:
70*6777b538SAndroid Build Coastguard Worker   using Results = std::vector<std::remove_cvref_t<T>>;
71*6777b538SAndroid Build Coastguard Worker 
ConcurrentCallbacks()72*6777b538SAndroid Build Coastguard Worker   ConcurrentCallbacks() {
73*6777b538SAndroid Build Coastguard Worker     auto info_owner = std::make_unique<Info>();
74*6777b538SAndroid Build Coastguard Worker     info_ = info_owner.get();
75*6777b538SAndroid Build Coastguard Worker     info_run_callback_ = BindRepeating(&Info::Run, std::move(info_owner));
76*6777b538SAndroid Build Coastguard Worker   }
77*6777b538SAndroid Build Coastguard Worker 
78*6777b538SAndroid Build Coastguard Worker   // Create a callback for the done callback to wait for.
CreateCallback()79*6777b538SAndroid Build Coastguard Worker   [[nodiscard]] OnceCallback<void(T)> CreateCallback() {
80*6777b538SAndroid Build Coastguard Worker     CHECK(info_);
81*6777b538SAndroid Build Coastguard Worker     ++info_->pending_;
82*6777b538SAndroid Build Coastguard Worker     return info_run_callback_;
83*6777b538SAndroid Build Coastguard Worker   }
84*6777b538SAndroid Build Coastguard Worker 
85*6777b538SAndroid Build Coastguard Worker   // Finish creating concurrent callbacks and provide done callback to run once
86*6777b538SAndroid Build Coastguard Worker   // all prior callbacks have executed.
87*6777b538SAndroid Build Coastguard Worker   // `this` is no longer usable after calling Done(), must be called with
88*6777b538SAndroid Build Coastguard Worker   // std::move().
89*6777b538SAndroid Build Coastguard Worker   void Done(OnceCallback<void(Results)> done_callback,
90*6777b538SAndroid Build Coastguard Worker             const Location& location = FROM_HERE) && {
91*6777b538SAndroid Build Coastguard Worker     CHECK(info_);
92*6777b538SAndroid Build Coastguard Worker     info_->done_callback_ =
93*6777b538SAndroid Build Coastguard Worker         BindPostTask(SequencedTaskRunner::GetCurrentDefault(),
94*6777b538SAndroid Build Coastguard Worker                      std::move(done_callback), location);
95*6777b538SAndroid Build Coastguard Worker     if (info_->pending_ == 0u) {
96*6777b538SAndroid Build Coastguard Worker       std::move(info_->done_callback_).Run(std::move(info_->results_));
97*6777b538SAndroid Build Coastguard Worker     }
98*6777b538SAndroid Build Coastguard Worker     info_ = nullptr;
99*6777b538SAndroid Build Coastguard Worker   }
100*6777b538SAndroid Build Coastguard Worker 
101*6777b538SAndroid Build Coastguard Worker  private:
102*6777b538SAndroid Build Coastguard Worker   class Info {
103*6777b538SAndroid Build Coastguard Worker    public:
104*6777b538SAndroid Build Coastguard Worker     Info() = default;
105*6777b538SAndroid Build Coastguard Worker 
Run(T value)106*6777b538SAndroid Build Coastguard Worker     void Run(T value) {
107*6777b538SAndroid Build Coastguard Worker       CHECK_GT(pending_, 0u);
108*6777b538SAndroid Build Coastguard Worker       --pending_;
109*6777b538SAndroid Build Coastguard Worker       results_.push_back(std::move(value));
110*6777b538SAndroid Build Coastguard Worker       if (done_callback_ && pending_ == 0u) {
111*6777b538SAndroid Build Coastguard Worker         std::move(done_callback_).Run(std::move(results_));
112*6777b538SAndroid Build Coastguard Worker       }
113*6777b538SAndroid Build Coastguard Worker     }
114*6777b538SAndroid Build Coastguard Worker 
115*6777b538SAndroid Build Coastguard Worker     size_t pending_ = 0u;
116*6777b538SAndroid Build Coastguard Worker     Results results_;
117*6777b538SAndroid Build Coastguard Worker     OnceCallback<void(Results)> done_callback_;
118*6777b538SAndroid Build Coastguard Worker   };
119*6777b538SAndroid Build Coastguard Worker 
120*6777b538SAndroid Build Coastguard Worker   RepeatingCallback<void(T)> info_run_callback_;
121*6777b538SAndroid Build Coastguard Worker   // info_ is owned by info_run_callback_.
122*6777b538SAndroid Build Coastguard Worker   raw_ptr<Info> info_;
123*6777b538SAndroid Build Coastguard Worker };
124*6777b538SAndroid Build Coastguard Worker 
125*6777b538SAndroid Build Coastguard Worker }  // namespace base
126*6777b538SAndroid Build Coastguard Worker 
127*6777b538SAndroid Build Coastguard Worker #endif  // BASE_FUNCTIONAL_CONCURRENT_CALLBACKS_H_
128