1 /*
2 * Copyright 2019 Google LLC
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 /**
18 * This file provides a pair of types Future<T> (a value to wait for) and
19 * Promise<T> (allows providing the value for an associated future).
20 *
21 * These serve the same purpose as std::future and std::promise, but with a few
22 * differences:
23 * - They do not represent exceptions (i.e. std::promise::set_exception).
24 * Consider representing failure conditions with StatusOr or std::variant
25 * - They do not throw future-related exceptions (e.g. std::future::get throws
26 * if the promise was 'abandoned'; this one indicates that with a value).
27 * - There is no integration with std::async etc.
28 * - They use absl::Duration / absl::Time for waiting with a timeout.
29 * - They are created as a pair (vs. std::promise::get_future(), which throws
30 * an exception if called twice).
31 * - Setting (promise) and taking (future) require rvalues (you might need to
32 * use std::move). This is to indicate that these are 'consuming' operations
33 * (to humans and static analysis tools).
34 */
35
36 #ifndef FCP_BASE_FUTURE_H_
37 #define FCP_BASE_FUTURE_H_
38
39 #include <memory>
40 #include <optional>
41 #include <tuple>
42 #include <variant>
43
44 #include "absl/base/macros.h"
45 #include "absl/synchronization/notification.h"
46 #include "fcp/base/meta.h"
47 #include "fcp/base/monitoring.h"
48 #include "fcp/base/move_to_lambda.h"
49 #include "fcp/base/scheduler.h"
50 #include "fcp/base/unique_value.h"
51
52 namespace fcp {
53
54 // Since fcp::Promise is already defined by the reactive streams library
55 // (fcp/reactive/), we'll define fcp::thread::{Promise, Future}.
56 namespace thread {
57
58 // Forward declarations; see doc comments below
59 template <typename T>
60 class Future;
61 template <typename T>
62 class Promise;
63
64 template <typename T>
65 struct FuturePair {
66 Promise<T> promise;
67 Future<T> future;
68 };
69
70 namespace future_internal {
71 // We want Promise and Future to be created only as a pair, with MakeFuture.
72 // This type is given permission to construct them.
73 struct Maker {
74 template <typename T>
75 static FuturePair<T> Make();
76 };
77
78 // Common state of a Promise / Future pair. Destructed when *both* the promise
79 // and future are gone.
80 //
81 // States: NotSet, Set, Taken
82 // Transitions:
83 // NotSet -> Set: When a value is provided (std::nullopt indicates an
84 // abandoned promise). *Before* ready_ is signalled.
85 // Set -> Taken: When a future takes a value. *After* ready_ is signalled.
86 template <typename T>
87 class FutureState {
88 public:
89 bool Wait(absl::Duration timeout) const;
90 std::optional<T> Take();
91 void Set(std::optional<T> val);
92
93 private:
94 enum class State { kNotSet, kSet, kTaken };
95
96 absl::Notification ready_;
97 State state_ = State::kNotSet;
98 std::optional<T> value_;
99 };
100
101 // A Future and Promise share a single FutureState. That is, FutureState
102 // is ref-counted, with two initial refs (no additional refs can be created,
103 // since Future and Promise are move-only). So, we define FutureStateRef as a
104 // move-only std::shared_ptr.
105 template <typename T>
106 using FutureStateRef = UniqueValue<std::shared_ptr<FutureState<T>>>;
107 } // namespace future_internal
108
109 /**
110 * Allows waiting for and retrieving a value (provided eventually by a paired
111 * Promise).
112 *
113 * If the paired Promise is 'abandoned' (destructed without having a value set),
114 * then the Future's value is std::nullopt.
115 */
116 template <typename T>
117 class Future {
118 public:
119 Future(Future&&) = default;
120 Future& operator=(Future&&) = default;
121
122 /**
123 * Retrieves the future value, waiting until it is available.
124 * Taking from a future *consumes* it, and so requires an rvalue. To take
125 * from a Future<T> f:
126 * std::move(f).Take()
127 *
128 * If the paired promise is 'abandoned' (destructed before a real value is
129 * provided), the value is std::nullopt.
130 */
131 ABSL_MUST_USE_RESULT
Take()132 std::optional<T> Take() && {
133 future_internal::FutureStateRef<T> state = std::move(state_);
134 FCP_CHECK(state.has_value());
135 return (*state)->Take();
136 }
137
138 /**
139 * Waits for the value to become available, with a timeout. Unlike Take(),
140 * this does *not* consume the value.
141 *
142 * Returns a bool indicating if the value is available (if so, Take() will
143 * return immediately).
144 */
145 ABSL_MUST_USE_RESULT
Wait(absl::Duration timeout)146 bool Wait(absl::Duration timeout) const {
147 FCP_CHECK(state_.has_value());
148 return (*state_)->Wait(timeout);
149 }
150
151 private:
152 friend struct future_internal::Maker;
153
Future(future_internal::FutureStateRef<T> state)154 explicit Future(future_internal::FutureStateRef<T> state)
155 : state_(std::move(state)) {}
156
157 future_internal::FutureStateRef<T> state_;
158 };
159
160 /**
161 * Allows providing a value to satisfy a paired Future.
162 *
163 * If this Promise is 'abandoned' (destructed without having a value set),
164 * then the Future gets the value std::nullopt.
165 */
166 template <typename T>
167 class Promise {
168 public:
169 Promise(Promise&&) = default;
170 Promise& operator=(Promise&&) = default;
171
~Promise()172 ~Promise() {
173 if (state_.has_value()) {
174 // Abandoned
175 (*state_)->Set(std::nullopt);
176 }
177 }
178
179 /**
180 * Provides a value to the paired Future. Setting a promise *consumes* it,
181 * and so requires an rvalue. To set a Promise<T> p:
182 * std::move(p).Set(...)
183 */
Set(T value)184 void Set(T value) && {
185 future_internal::FutureStateRef<T> state = std::move(state_);
186 FCP_CHECK(state.has_value());
187 (*state)->Set(std::move(value));
188 }
189
190 private:
191 friend struct future_internal::Maker;
192
Promise(future_internal::FutureStateRef<T> state)193 explicit Promise(future_internal::FutureStateRef<T> state)
194 : state_(std::move(state)) {}
195
196 future_internal::FutureStateRef<T> state_;
197 };
198
199 /** Creates a paired Future and Promise. */
200 template <typename T>
MakeFuture()201 FuturePair<T> MakeFuture() {
202 return future_internal::Maker::Make<T>();
203 }
204
205 /**
206 * Schedules a task which calls a function computing a value. Returns a future
207 * to wait for and access the value once it is computed.
208 */
209 template <typename T>
ScheduleFuture(Scheduler * scheduler,std::function<T ()> func)210 Future<T> ScheduleFuture(Scheduler* scheduler, std::function<T()> func) {
211 thread::FuturePair<T> p = thread::MakeFuture<T>();
212 MoveToLambdaWrapper<thread::Promise<T>> promise_capture =
213 MoveToLambda(std::move(p.promise));
214 // Lambda is stateful (since the promise is consumed). This is okay, since
215 // it should only be called once.
216 scheduler->Schedule([promise_capture, func]() mutable {
217 std::move(*promise_capture).Set(func());
218 });
219
220 return std::move(p.future);
221 }
222
223 namespace future_internal {
224
225 template <typename T>
Make()226 FuturePair<T> Maker::Make() {
227 std::shared_ptr<FutureState<T>> state = std::make_shared<FutureState<T>>();
228
229 auto promise_ref = FutureStateRef<T>(state);
230 // Note that we use std::move this time, to avoid ref-count churn.
231 auto future_ref = FutureStateRef<T>(std::move(state));
232 return {Promise<T>(std::move(promise_ref)), Future<T>(std::move(future_ref))};
233 }
234
235 template <typename T>
Wait(absl::Duration timeout)236 bool FutureState<T>::Wait(absl::Duration timeout) const {
237 return ready_.WaitForNotificationWithTimeout(timeout);
238 }
239
240 template <typename T>
Set(std::optional<T> val)241 void FutureState<T>::Set(std::optional<T> val) {
242 FCP_CHECK(!ready_.HasBeenNotified())
243 << "Attempted to set a FutureState which has already been notified";
244 // Not notified => value_ has *not* been set, and the Promise has exclusive
245 // access (no atomics or locks needed below).
246 switch (state_) {
247 case State::kNotSet:
248 state_ = State::kSet;
249 value_ = std::move(val);
250 // This has release semantics; stores to state_ and value_ will be visible
251 // to whomever sees that the notification.
252 ready_.Notify();
253 return;
254 case State::kSet:
255 FCP_CHECK(false) << "FutureState has been notified, so state_ should be "
256 "kTaken or kSet";
257 abort(); // Compiler thinks FCP_CHECK(false) might return
258 case State::kTaken:
259 FCP_CHECK(false) << "FutureState has already been taken from";
260 abort(); // Compiler thinks FCP_CHECK(false) might return
261 }
262 }
263
264 template <typename T>
Take()265 std::optional<T> FutureState<T>::Take() {
266 ready_.WaitForNotification();
267 // Notified => value_ has been set, and exclusive access has been transferred
268 // from the Promise to the Future (no atomics or locks needed below).
269 switch (state_) {
270 case State::kSet:
271 state_ = State::kTaken;
272 // value_.has_value() will still be set, but we won't read it again
273 // in the kTaken state.
274 return std::move(value_);
275 case State::kNotSet:
276 FCP_CHECK(false) << "FutureState has been notified, so state_ should be "
277 "kTaken or kSet";
278 abort(); // Compiler thinks FCP_CHECK(false) might return
279 case State::kTaken:
280 FCP_CHECK(false) << "FutureState has already been taken from";
281 abort(); // Compiler thinks FCP_CHECK(false) might return
282 }
283 }
284
285 } // namespace future_internal
286
287 } // namespace thread
288 } // namespace fcp
289
290 #endif // FCP_BASE_FUTURE_H_
291