1*6777b538SAndroid Build Coastguard Worker // Copyright 2024 The Chromium Authors 2*6777b538SAndroid Build Coastguard Worker // Use of this source code is governed by a BSD-style license that can be 3*6777b538SAndroid Build Coastguard Worker // found in the LICENSE file. 4*6777b538SAndroid Build Coastguard Worker 5*6777b538SAndroid Build Coastguard Worker #ifndef BASE_FUNCTIONAL_CONCURRENT_CALLBACKS_H_ 6*6777b538SAndroid Build Coastguard Worker #define BASE_FUNCTIONAL_CONCURRENT_CALLBACKS_H_ 7*6777b538SAndroid Build Coastguard Worker 8*6777b538SAndroid Build Coastguard Worker #include <memory> 9*6777b538SAndroid Build Coastguard Worker #include <type_traits> 10*6777b538SAndroid Build Coastguard Worker #include <vector> 11*6777b538SAndroid Build Coastguard Worker 12*6777b538SAndroid Build Coastguard Worker #include "base/functional/bind.h" 13*6777b538SAndroid Build Coastguard Worker #include "base/functional/callback.h" 14*6777b538SAndroid Build Coastguard Worker #include "base/location.h" 15*6777b538SAndroid Build Coastguard Worker #include "base/memory/raw_ptr.h" 16*6777b538SAndroid Build Coastguard Worker #include "base/task/bind_post_task.h" 17*6777b538SAndroid Build Coastguard Worker #include "base/task/sequenced_task_runner.h" 18*6777b538SAndroid Build Coastguard Worker 19*6777b538SAndroid Build Coastguard Worker // OVERVIEW: 20*6777b538SAndroid Build Coastguard Worker // 21*6777b538SAndroid Build Coastguard Worker // ConcurrentCallbacks<T> is an alternative to BarrierCallback<T>, it dispenses 22*6777b538SAndroid Build Coastguard Worker // OnceCallbacks via CreateCallback() and invokes the callback passed to Done() 23*6777b538SAndroid Build Coastguard Worker // after all prior callbacks have been run. 24*6777b538SAndroid Build Coastguard Worker // 25*6777b538SAndroid Build Coastguard Worker // ConcurrentCallbacks<T> is intended to be used over BarrierCallback<T> in 26*6777b538SAndroid Build Coastguard Worker // cases where the count is unknown prior to requiring a callback to start a 27*6777b538SAndroid Build Coastguard Worker // task, and for cases where the count is manually derived from the code and 28*6777b538SAndroid Build Coastguard Worker // subject to human error. 29*6777b538SAndroid Build Coastguard Worker // 30*6777b538SAndroid Build Coastguard Worker // IMPORTANT NOTES: 31*6777b538SAndroid Build Coastguard Worker // 32*6777b538SAndroid Build Coastguard Worker // - ConcurrentCallbacks<T> is NOT thread safe. 33*6777b538SAndroid Build Coastguard Worker // - The done callback will NOT be run synchronously, it will be PostTask() to 34*6777b538SAndroid Build Coastguard Worker // the sequence that Done() was invoked on. 35*6777b538SAndroid Build Coastguard Worker // - ConcurrentCallbacks<T> cannot be used after Done() is called, a CHECK 36*6777b538SAndroid Build Coastguard Worker // verifies this. 37*6777b538SAndroid Build Coastguard Worker // 38*6777b538SAndroid Build Coastguard Worker // TYPICAL USAGE: 39*6777b538SAndroid Build Coastguard Worker // 40*6777b538SAndroid Build Coastguard Worker // class Example { 41*6777b538SAndroid Build Coastguard Worker // void OnRequestsReceived(std::vector<Request> requests) { 42*6777b538SAndroid Build Coastguard Worker // base::ConcurrentCallbacks<Result> concurrent; 43*6777b538SAndroid Build Coastguard Worker // 44*6777b538SAndroid Build Coastguard Worker // for (Request& request : requests) { 45*6777b538SAndroid Build Coastguard Worker // if (IsValidRequest(request)) { 46*6777b538SAndroid Build Coastguard Worker // StartRequest(std::move(request), concurrent.CreateCallback()); 47*6777b538SAndroid Build Coastguard Worker // } 48*6777b538SAndroid Build Coastguard Worker // } 49*6777b538SAndroid Build Coastguard Worker // 50*6777b538SAndroid Build Coastguard Worker // std::move(concurrent).Done( 51*6777b538SAndroid Build Coastguard Worker // base::BindOnce(&Example::OnRequestsComplete, GetWeakPtr())); 52*6777b538SAndroid Build Coastguard Worker // } 53*6777b538SAndroid Build Coastguard Worker // 54*6777b538SAndroid Build Coastguard Worker // void StartRequest(Request request, 55*6777b538SAndroid Build Coastguard Worker // base::OnceCallback<void(Result)> callback) { 56*6777b538SAndroid Build Coastguard Worker // // Process the request asynchronously and call callback with a Result. 57*6777b538SAndroid Build Coastguard Worker // } 58*6777b538SAndroid Build Coastguard Worker // 59*6777b538SAndroid Build Coastguard Worker // void OnRequestsComplete(std::vector<Result> results) { 60*6777b538SAndroid Build Coastguard Worker // // Invoked after all requests are completed and receives the results of 61*6777b538SAndroid Build Coastguard Worker // // all of them. 62*6777b538SAndroid Build Coastguard Worker // } 63*6777b538SAndroid Build Coastguard Worker // }; 64*6777b538SAndroid Build Coastguard Worker 65*6777b538SAndroid Build Coastguard Worker namespace base { 66*6777b538SAndroid Build Coastguard Worker 67*6777b538SAndroid Build Coastguard Worker template <typename T> 68*6777b538SAndroid Build Coastguard Worker class ConcurrentCallbacks { 69*6777b538SAndroid Build Coastguard Worker public: 70*6777b538SAndroid Build Coastguard Worker using Results = std::vector<std::remove_cvref_t<T>>; 71*6777b538SAndroid Build Coastguard Worker ConcurrentCallbacks()72*6777b538SAndroid Build Coastguard Worker ConcurrentCallbacks() { 73*6777b538SAndroid Build Coastguard Worker auto info_owner = std::make_unique<Info>(); 74*6777b538SAndroid Build Coastguard Worker info_ = info_owner.get(); 75*6777b538SAndroid Build Coastguard Worker info_run_callback_ = BindRepeating(&Info::Run, std::move(info_owner)); 76*6777b538SAndroid Build Coastguard Worker } 77*6777b538SAndroid Build Coastguard Worker 78*6777b538SAndroid Build Coastguard Worker // Create a callback for the done callback to wait for. CreateCallback()79*6777b538SAndroid Build Coastguard Worker [[nodiscard]] OnceCallback<void(T)> CreateCallback() { 80*6777b538SAndroid Build Coastguard Worker CHECK(info_); 81*6777b538SAndroid Build Coastguard Worker ++info_->pending_; 82*6777b538SAndroid Build Coastguard Worker return info_run_callback_; 83*6777b538SAndroid Build Coastguard Worker } 84*6777b538SAndroid Build Coastguard Worker 85*6777b538SAndroid Build Coastguard Worker // Finish creating concurrent callbacks and provide done callback to run once 86*6777b538SAndroid Build Coastguard Worker // all prior callbacks have executed. 87*6777b538SAndroid Build Coastguard Worker // `this` is no longer usable after calling Done(), must be called with 88*6777b538SAndroid Build Coastguard Worker // std::move(). 89*6777b538SAndroid Build Coastguard Worker void Done(OnceCallback<void(Results)> done_callback, 90*6777b538SAndroid Build Coastguard Worker const Location& location = FROM_HERE) && { 91*6777b538SAndroid Build Coastguard Worker CHECK(info_); 92*6777b538SAndroid Build Coastguard Worker info_->done_callback_ = 93*6777b538SAndroid Build Coastguard Worker BindPostTask(SequencedTaskRunner::GetCurrentDefault(), 94*6777b538SAndroid Build Coastguard Worker std::move(done_callback), location); 95*6777b538SAndroid Build Coastguard Worker if (info_->pending_ == 0u) { 96*6777b538SAndroid Build Coastguard Worker std::move(info_->done_callback_).Run(std::move(info_->results_)); 97*6777b538SAndroid Build Coastguard Worker } 98*6777b538SAndroid Build Coastguard Worker info_ = nullptr; 99*6777b538SAndroid Build Coastguard Worker } 100*6777b538SAndroid Build Coastguard Worker 101*6777b538SAndroid Build Coastguard Worker private: 102*6777b538SAndroid Build Coastguard Worker class Info { 103*6777b538SAndroid Build Coastguard Worker public: 104*6777b538SAndroid Build Coastguard Worker Info() = default; 105*6777b538SAndroid Build Coastguard Worker Run(T value)106*6777b538SAndroid Build Coastguard Worker void Run(T value) { 107*6777b538SAndroid Build Coastguard Worker CHECK_GT(pending_, 0u); 108*6777b538SAndroid Build Coastguard Worker --pending_; 109*6777b538SAndroid Build Coastguard Worker results_.push_back(std::move(value)); 110*6777b538SAndroid Build Coastguard Worker if (done_callback_ && pending_ == 0u) { 111*6777b538SAndroid Build Coastguard Worker std::move(done_callback_).Run(std::move(results_)); 112*6777b538SAndroid Build Coastguard Worker } 113*6777b538SAndroid Build Coastguard Worker } 114*6777b538SAndroid Build Coastguard Worker 115*6777b538SAndroid Build Coastguard Worker size_t pending_ = 0u; 116*6777b538SAndroid Build Coastguard Worker Results results_; 117*6777b538SAndroid Build Coastguard Worker OnceCallback<void(Results)> done_callback_; 118*6777b538SAndroid Build Coastguard Worker }; 119*6777b538SAndroid Build Coastguard Worker 120*6777b538SAndroid Build Coastguard Worker RepeatingCallback<void(T)> info_run_callback_; 121*6777b538SAndroid Build Coastguard Worker // info_ is owned by info_run_callback_. 122*6777b538SAndroid Build Coastguard Worker raw_ptr<Info> info_; 123*6777b538SAndroid Build Coastguard Worker }; 124*6777b538SAndroid Build Coastguard Worker 125*6777b538SAndroid Build Coastguard Worker } // namespace base 126*6777b538SAndroid Build Coastguard Worker 127*6777b538SAndroid Build Coastguard Worker #endif // BASE_FUNCTIONAL_CONCURRENT_CALLBACKS_H_ 128