// Copyright 2024 The Chromium Authors // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #ifndef BASE_FUNCTIONAL_CONCURRENT_CALLBACKS_H_ #define BASE_FUNCTIONAL_CONCURRENT_CALLBACKS_H_ #include #include #include #include "base/functional/bind.h" #include "base/functional/callback.h" #include "base/location.h" #include "base/memory/raw_ptr.h" #include "base/task/bind_post_task.h" #include "base/task/sequenced_task_runner.h" // OVERVIEW: // // ConcurrentCallbacks is an alternative to BarrierCallback, it dispenses // OnceCallbacks via CreateCallback() and invokes the callback passed to Done() // after all prior callbacks have been run. // // ConcurrentCallbacks is intended to be used over BarrierCallback in // cases where the count is unknown prior to requiring a callback to start a // task, and for cases where the count is manually derived from the code and // subject to human error. // // IMPORTANT NOTES: // // - ConcurrentCallbacks is NOT thread safe. // - The done callback will NOT be run synchronously, it will be PostTask() to // the sequence that Done() was invoked on. // - ConcurrentCallbacks cannot be used after Done() is called, a CHECK // verifies this. // // TYPICAL USAGE: // // class Example { // void OnRequestsReceived(std::vector requests) { // base::ConcurrentCallbacks concurrent; // // for (Request& request : requests) { // if (IsValidRequest(request)) { // StartRequest(std::move(request), concurrent.CreateCallback()); // } // } // // std::move(concurrent).Done( // base::BindOnce(&Example::OnRequestsComplete, GetWeakPtr())); // } // // void StartRequest(Request request, // base::OnceCallback callback) { // // Process the request asynchronously and call callback with a Result. // } // // void OnRequestsComplete(std::vector results) { // // Invoked after all requests are completed and receives the results of // // all of them. // } // }; namespace base { template class ConcurrentCallbacks { public: using Results = std::vector>; ConcurrentCallbacks() { auto info_owner = std::make_unique(); info_ = info_owner.get(); info_run_callback_ = BindRepeating(&Info::Run, std::move(info_owner)); } // Create a callback for the done callback to wait for. [[nodiscard]] OnceCallback CreateCallback() { CHECK(info_); ++info_->pending_; return info_run_callback_; } // Finish creating concurrent callbacks and provide done callback to run once // all prior callbacks have executed. // `this` is no longer usable after calling Done(), must be called with // std::move(). void Done(OnceCallback done_callback, const Location& location = FROM_HERE) && { CHECK(info_); info_->done_callback_ = BindPostTask(SequencedTaskRunner::GetCurrentDefault(), std::move(done_callback), location); if (info_->pending_ == 0u) { std::move(info_->done_callback_).Run(std::move(info_->results_)); } info_ = nullptr; } private: class Info { public: Info() = default; void Run(T value) { CHECK_GT(pending_, 0u); --pending_; results_.push_back(std::move(value)); if (done_callback_ && pending_ == 0u) { std::move(done_callback_).Run(std::move(results_)); } } size_t pending_ = 0u; Results results_; OnceCallback done_callback_; }; RepeatingCallback info_run_callback_; // info_ is owned by info_run_callback_. raw_ptr info_; }; } // namespace base #endif // BASE_FUNCTIONAL_CONCURRENT_CALLBACKS_H_