xref: /aosp_15_r20/external/federated-compute/fcp/base/future.h (revision 14675a029014e728ec732f129a32e299b2da0601)
1 /*
2  * Copyright 2019 Google LLC
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /**
18  * This file provides a pair of types Future<T> (a value to wait for) and
19  * Promise<T> (allows providing the value for an associated future).
20  *
21  * These serve the same purpose as std::future and std::promise, but with a few
22  * differences:
23  *  - They do not represent exceptions (i.e. std::promise::set_exception).
24  *    Consider representing failure conditions with StatusOr or std::variant
25  *  - They do not throw future-related exceptions (e.g. std::future::get throws
26  *    if the promise was 'abandoned'; this one indicates that with a value).
27  *  - There is no integration with std::async etc.
28  *  - They use absl::Duration / absl::Time for waiting with a timeout.
29  *  - They are created as a pair (vs. std::promise::get_future(), which throws
30  *    an exception if called twice).
31  *  - Setting (promise) and taking (future) require rvalues (you might need to
32  *    use std::move). This is to indicate that these are 'consuming' operations
33  *    (to humans and static analysis tools).
34  */
35 
36 #ifndef FCP_BASE_FUTURE_H_
37 #define FCP_BASE_FUTURE_H_
38 
39 #include <memory>
40 #include <optional>
41 #include <tuple>
42 #include <variant>
43 
44 #include "absl/base/macros.h"
45 #include "absl/synchronization/notification.h"
46 #include "fcp/base/meta.h"
47 #include "fcp/base/monitoring.h"
48 #include "fcp/base/move_to_lambda.h"
49 #include "fcp/base/scheduler.h"
50 #include "fcp/base/unique_value.h"
51 
52 namespace fcp {
53 
54 // Since fcp::Promise is already defined by the reactive streams library
55 // (fcp/reactive/), we'll define fcp::thread::{Promise, Future}.
56 namespace thread {
57 
58 // Forward declarations; see doc comments below
59 template <typename T>
60 class Future;
61 template <typename T>
62 class Promise;
63 
64 template <typename T>
65 struct FuturePair {
66   Promise<T> promise;
67   Future<T> future;
68 };
69 
70 namespace future_internal {
71 // We want Promise and Future to be created only as a pair, with MakeFuture.
72 // This type is given permission to construct them.
73 struct Maker {
74   template <typename T>
75   static FuturePair<T> Make();
76 };
77 
78 // Common state of a Promise / Future pair. Destructed when *both* the promise
79 // and future are gone.
80 //
81 // States: NotSet, Set, Taken
82 // Transitions:
83 //   NotSet -> Set: When a value is provided (std::nullopt indicates an
84 //                  abandoned promise). *Before* ready_ is signalled.
85 //   Set -> Taken: When a future takes a value. *After* ready_ is signalled.
86 template <typename T>
87 class FutureState {
88  public:
89   bool Wait(absl::Duration timeout) const;
90   std::optional<T> Take();
91   void Set(std::optional<T> val);
92 
93  private:
94   enum class State { kNotSet, kSet, kTaken };
95 
96   absl::Notification ready_;
97   State state_ = State::kNotSet;
98   std::optional<T> value_;
99 };
100 
101 // A Future and Promise share a single FutureState. That is, FutureState
102 // is ref-counted, with two initial refs (no additional refs can be created,
103 // since Future and Promise are move-only). So, we define FutureStateRef as a
104 // move-only std::shared_ptr.
105 template <typename T>
106 using FutureStateRef = UniqueValue<std::shared_ptr<FutureState<T>>>;
107 }  // namespace future_internal
108 
109 /**
110  * Allows waiting for and retrieving a value (provided eventually by a paired
111  * Promise).
112  *
113  * If the paired Promise is 'abandoned' (destructed without having a value set),
114  * then the Future's value is std::nullopt.
115  */
116 template <typename T>
117 class Future {
118  public:
119   Future(Future&&) = default;
120   Future& operator=(Future&&) = default;
121 
122   /**
123    * Retrieves the future value, waiting until it is available.
124    * Taking from a future *consumes* it, and so requires an rvalue. To take
125    * from a Future<T> f:
126    *   std::move(f).Take()
127    *
128    * If the paired promise is 'abandoned' (destructed before a real value is
129    * provided), the value is std::nullopt.
130    */
131   ABSL_MUST_USE_RESULT
Take()132   std::optional<T> Take() && {
133     future_internal::FutureStateRef<T> state = std::move(state_);
134     FCP_CHECK(state.has_value());
135     return (*state)->Take();
136   }
137 
138   /**
139    * Waits for the value to become available, with a timeout. Unlike Take(),
140    * this does *not* consume the value.
141    *
142    * Returns a bool indicating if the value is available (if so, Take() will
143    * return immediately).
144    */
145   ABSL_MUST_USE_RESULT
Wait(absl::Duration timeout)146   bool Wait(absl::Duration timeout) const {
147     FCP_CHECK(state_.has_value());
148     return (*state_)->Wait(timeout);
149   }
150 
151  private:
152   friend struct future_internal::Maker;
153 
Future(future_internal::FutureStateRef<T> state)154   explicit Future(future_internal::FutureStateRef<T> state)
155       : state_(std::move(state)) {}
156 
157   future_internal::FutureStateRef<T> state_;
158 };
159 
160 /**
161  * Allows providing a value to satisfy a paired Future.
162  *
163  * If this Promise is 'abandoned' (destructed without having a value set),
164  * then the Future gets the value std::nullopt.
165  */
166 template <typename T>
167 class Promise {
168  public:
169   Promise(Promise&&) = default;
170   Promise& operator=(Promise&&) = default;
171 
~Promise()172   ~Promise() {
173     if (state_.has_value()) {
174       // Abandoned
175       (*state_)->Set(std::nullopt);
176     }
177   }
178 
179   /**
180    * Provides a value to the paired Future. Setting a promise *consumes* it,
181    * and so requires an rvalue. To set a Promise<T> p:
182    *   std::move(p).Set(...)
183    */
Set(T value)184   void Set(T value) && {
185     future_internal::FutureStateRef<T> state = std::move(state_);
186     FCP_CHECK(state.has_value());
187     (*state)->Set(std::move(value));
188   }
189 
190  private:
191   friend struct future_internal::Maker;
192 
Promise(future_internal::FutureStateRef<T> state)193   explicit Promise(future_internal::FutureStateRef<T> state)
194       : state_(std::move(state)) {}
195 
196   future_internal::FutureStateRef<T> state_;
197 };
198 
199 /** Creates a paired Future and Promise. */
200 template <typename T>
MakeFuture()201 FuturePair<T> MakeFuture() {
202   return future_internal::Maker::Make<T>();
203 }
204 
205 /**
206  * Schedules a task which calls a function computing a value. Returns a future
207  * to wait for and access the value once it is computed.
208  */
209 template <typename T>
ScheduleFuture(Scheduler * scheduler,std::function<T ()> func)210 Future<T> ScheduleFuture(Scheduler* scheduler, std::function<T()> func) {
211   thread::FuturePair<T> p = thread::MakeFuture<T>();
212   MoveToLambdaWrapper<thread::Promise<T>> promise_capture =
213       MoveToLambda(std::move(p.promise));
214   // Lambda is stateful (since the promise is consumed). This is okay, since
215   // it should only be called once.
216   scheduler->Schedule([promise_capture, func]() mutable {
217     std::move(*promise_capture).Set(func());
218   });
219 
220   return std::move(p.future);
221 }
222 
223 namespace future_internal {
224 
225 template <typename T>
Make()226 FuturePair<T> Maker::Make() {
227   std::shared_ptr<FutureState<T>> state = std::make_shared<FutureState<T>>();
228 
229   auto promise_ref = FutureStateRef<T>(state);
230   // Note that we use std::move this time, to avoid ref-count churn.
231   auto future_ref = FutureStateRef<T>(std::move(state));
232   return {Promise<T>(std::move(promise_ref)), Future<T>(std::move(future_ref))};
233 }
234 
235 template <typename T>
Wait(absl::Duration timeout)236 bool FutureState<T>::Wait(absl::Duration timeout) const {
237   return ready_.WaitForNotificationWithTimeout(timeout);
238 }
239 
240 template <typename T>
Set(std::optional<T> val)241 void FutureState<T>::Set(std::optional<T> val) {
242   FCP_CHECK(!ready_.HasBeenNotified())
243       << "Attempted to set a FutureState which has already been notified";
244   // Not notified => value_ has *not* been set, and the Promise has exclusive
245   // access (no atomics or locks needed below).
246   switch (state_) {
247     case State::kNotSet:
248       state_ = State::kSet;
249       value_ = std::move(val);
250       // This has release semantics; stores to state_ and value_ will be visible
251       // to whomever sees that the notification.
252       ready_.Notify();
253       return;
254     case State::kSet:
255       FCP_CHECK(false) << "FutureState has been notified, so state_ should be "
256                           "kTaken or kSet";
257       abort();  // Compiler thinks FCP_CHECK(false) might return
258     case State::kTaken:
259       FCP_CHECK(false) << "FutureState has already been taken from";
260       abort();  // Compiler thinks FCP_CHECK(false) might return
261   }
262 }
263 
264 template <typename T>
Take()265 std::optional<T> FutureState<T>::Take() {
266   ready_.WaitForNotification();
267   // Notified => value_ has been set, and exclusive access has been transferred
268   // from the Promise to the Future (no atomics or locks needed below).
269   switch (state_) {
270     case State::kSet:
271       state_ = State::kTaken;
272       // value_.has_value() will still be set, but we won't read it again
273       // in the kTaken state.
274       return std::move(value_);
275     case State::kNotSet:
276       FCP_CHECK(false) << "FutureState has been notified, so state_ should be "
277                           "kTaken or kSet";
278       abort();  // Compiler thinks FCP_CHECK(false) might return
279     case State::kTaken:
280       FCP_CHECK(false) << "FutureState has already been taken from";
281       abort();  // Compiler thinks FCP_CHECK(false) might return
282   }
283 }
284 
285 }  // namespace future_internal
286 
287 }  // namespace thread
288 }  // namespace fcp
289 
290 #endif  // FCP_BASE_FUTURE_H_
291