1 #![doc(html_root_url = "https://docs.rs/want/0.3.1")]
2 #![deny(warnings)]
3 #![deny(missing_docs)]
4 #![deny(missing_debug_implementations)]
5 
6 //! A Futures channel-like utility to signal when a value is wanted.
7 //!
8 //! Futures are supposed to be lazy, and only starting work if `Future::poll`
9 //! is called. The same is true of `Stream`s, but when using a channel as
10 //! a `Stream`, it can be hard to know if the receiver is ready for the next
11 //! value.
12 //!
13 //! Put another way, given a `(tx, rx)` from `futures::sync::mpsc::channel()`,
14 //! how can the sender (`tx`) know when the receiver (`rx`) actually wants more
15 //! work to be produced? Just because there is room in the channel buffer
16 //! doesn't mean the work would be used by the receiver.
17 //!
18 //! This is where something like `want` comes in. Added to a channel, you can
19 //! make sure that the `tx` only creates the message and sends it when the `rx`
20 //! has `poll()` for it, and the buffer was empty.
21 //!
22 //! # Example
23 //!
24 //! ```nightly
25 //! # //#![feature(async_await)]
26 //! extern crate want;
27 //!
28 //! # fn spawn<T>(_t: T) {}
29 //! # fn we_still_want_message() -> bool { true }
30 //! # fn mpsc_channel() -> (Tx, Rx) { (Tx, Rx) }
31 //! # struct Tx;
32 //! # impl Tx { fn send<T>(&mut self, _: T) {} }
33 //! # struct Rx;
34 //! # impl Rx { async fn recv(&mut self) -> Option<Expensive> { Some(Expensive) } }
35 //!
36 //! // Some message that is expensive to produce.
37 //! struct Expensive;
38 //!
39 //! // Some futures-aware MPSC channel...
40 //! let (mut tx, mut rx) = mpsc_channel();
41 //!
42 //! // And our `want` channel!
43 //! let (mut gv, mut tk) = want::new();
44 //!
45 //!
46 //! // Our receiving task...
47 //! spawn(async move {
48 //!     // Maybe something comes up that prevents us from ever
49 //!     // using the expensive message.
50 //!     //
51 //!     // Without `want`, the "send" task may have started to
52 //!     // produce the expensive message even though we wouldn't
53 //!     // be able to use it.
54 //!     if !we_still_want_message() {
55 //!         return;
56 //!     }
57 //!
58 //!     // But we can use it! So tell the `want` channel.
59 //!     tk.want();
60 //!
61 //!     match rx.recv().await {
62 //!         Some(_msg) => println!("got a message"),
63 //!         None => println!("DONE"),
64 //!     }
65 //! });
66 //!
67 //! // Our sending task
68 //! spawn(async move {
69 //!     // It's expensive to create a new message, so we wait until the
70 //!     // receiving end truly *wants* the message.
71 //!     if let Err(_closed) = gv.want().await {
72 //!         // Looks like they will never want it...
73 //!         return;
74 //!     }
75 //!
76 //!     // They want it, let's go!
77 //!     tx.send(Expensive);
78 //! });
79 //!
80 //! # fn main() {}
81 //! ```
82 
83 use std::fmt;
84 use std::future::Future;
85 use std::mem;
86 use std::pin::Pin;
87 use std::sync::Arc;
88 use std::sync::atomic::AtomicUsize;
89 // SeqCst is the only ordering used to ensure accessing the state and
90 // TryLock are never re-ordered.
91 use std::sync::atomic::Ordering::SeqCst;
92 use std::task::{self, Poll, Waker};
93 
94 
95 use try_lock::TryLock;
96 
97 /// Create a new `want` channel.
new() -> (Giver, Taker)98 pub fn new() -> (Giver, Taker) {
99     let inner = Arc::new(Inner {
100         state: AtomicUsize::new(State::Idle.into()),
101         task: TryLock::new(None),
102     });
103     let inner2 = inner.clone();
104     (
105         Giver {
106             inner,
107         },
108         Taker {
109             inner: inner2,
110         },
111     )
112 }
113 
114 /// An entity that gives a value when wanted.
115 pub struct Giver {
116     inner: Arc<Inner>,
117 }
118 
119 /// An entity that wants a value.
120 pub struct Taker {
121     inner: Arc<Inner>,
122 }
123 
124 /// A cloneable `Giver`.
125 ///
126 /// It differs from `Giver` in that you cannot poll for `want`. It's only
127 /// usable as a cancellation watcher.
128 #[derive(Clone)]
129 pub struct SharedGiver {
130     inner: Arc<Inner>,
131 }
132 
133 /// The `Taker` has canceled its interest in a value.
134 pub struct Closed {
135     _inner: (),
136 }
137 
138 #[derive(Clone, Copy, Debug)]
139 enum State {
140     Idle,
141     Want,
142     Give,
143     Closed,
144 }
145 
146 impl From<State> for usize {
from(s: State) -> usize147     fn from(s: State) -> usize {
148         match s {
149             State::Idle => 0,
150             State::Want => 1,
151             State::Give => 2,
152             State::Closed => 3,
153         }
154     }
155 }
156 
157 impl From<usize> for State {
from(num: usize) -> State158     fn from(num: usize) -> State {
159         match num {
160             0 => State::Idle,
161             1 => State::Want,
162             2 => State::Give,
163             3 => State::Closed,
164             _ => unreachable!("unknown state: {}", num),
165         }
166     }
167 }
168 
169 struct Inner {
170     state: AtomicUsize,
171     task: TryLock<Option<Waker>>,
172 }
173 
174 // ===== impl Giver ======
175 
176 impl Giver {
177     /// Returns a `Future` that fulfills when the `Taker` has done some action.
want(&mut self) -> impl Future<Output = Result<(), Closed>> + '_178     pub fn want(&mut self) -> impl Future<Output = Result<(), Closed>> + '_ {
179         Want(self)
180     }
181 
182     /// Poll whether the `Taker` has registered interest in another value.
183     ///
184     /// - If the `Taker` has called `want()`, this returns `Async::Ready(())`.
185     /// - If the `Taker` has not called `want()` since last poll, this
186     ///   returns `Async::NotReady`, and parks the current task to be notified
187     ///   when the `Taker` does call `want()`.
188     /// - If the `Taker` has canceled (or dropped), this returns `Closed`.
189     ///
190     /// After knowing that the Taker is wanting, the state can be reset by
191     /// calling [`give`](Giver::give).
poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Closed>>192     pub fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Closed>> {
193         loop {
194             let state = self.inner.state.load(SeqCst).into();
195             match state {
196                 State::Want => {
197                     return Poll::Ready(Ok(()));
198                 },
199                 State::Closed => {
200                     return Poll::Ready(Err(Closed { _inner: () }));
201                 },
202                 State::Idle | State::Give => {
203                     // Taker doesn't want anything yet, so park.
204                     if let Some(mut locked) = self.inner.task.try_lock_explicit(SeqCst, SeqCst) {
205 
206                         // While we have the lock, try to set to GIVE.
207                         let old = self.inner.state.compare_exchange(
208                             state.into(),
209                             State::Give.into(),
210                             SeqCst,
211                             SeqCst,
212                         );
213                         // If it's still the first state (Idle or Give), park current task.
214                         if old == Ok(state.into()) {
215                             let park = locked.as_ref()
216                                 .map(|w| !w.will_wake(cx.waker()))
217                                 .unwrap_or(true);
218                             if park {
219                                 let old = mem::replace(&mut *locked, Some(cx.waker().clone()));
220                                 drop(locked);
221                                 if let Some(prev_task) = old {
222                                     // there was an old task parked here.
223                                     // it might be waiting to be notified,
224                                     // so poke it before dropping.
225                                     prev_task.wake();
226                                 };
227                             }
228                             return Poll::Pending;
229                         }
230                         // Otherwise, something happened! Go around the loop again.
231                     } else {
232                         // if we couldn't take the lock, then a Taker has it.
233                         // The *ONLY* reason is because it is in the process of notifying us
234                         // of its want.
235                         //
236                         // We need to loop again to see what state it was changed to.
237                     }
238                 },
239             }
240         }
241     }
242 
243     /// Mark the state as idle, if the Taker currently is wanting.
244     ///
245     /// Returns true if Taker was wanting, false otherwise.
246     #[inline]
give(&self) -> bool247     pub fn give(&self) -> bool {
248         // only set to IDLE if it is still Want
249         let old = self.inner.state.compare_exchange(
250             State::Want.into(),
251             State::Idle.into(),
252             SeqCst,
253             SeqCst);
254         old == Ok(State::Want.into())
255     }
256 
257     /// Check if the `Taker` has called `want()` without parking a task.
258     ///
259     /// This is safe to call outside of a futures task context, but other
260     /// means of being notified is left to the user.
261     #[inline]
is_wanting(&self) -> bool262     pub fn is_wanting(&self) -> bool {
263         self.inner.state.load(SeqCst) == State::Want.into()
264     }
265 
266 
267     /// Check if the `Taker` has canceled interest without parking a task.
268     #[inline]
is_canceled(&self) -> bool269     pub fn is_canceled(&self) -> bool {
270         self.inner.state.load(SeqCst) == State::Closed.into()
271     }
272 
273     /// Converts this into a `SharedGiver`.
274     #[inline]
shared(self) -> SharedGiver275     pub fn shared(self) -> SharedGiver {
276         SharedGiver {
277             inner: self.inner,
278         }
279     }
280 }
281 
282 impl fmt::Debug for Giver {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result283     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
284         f.debug_struct("Giver")
285             .field("state", &self.inner.state())
286             .finish()
287     }
288 }
289 
290 // ===== impl SharedGiver ======
291 
292 impl SharedGiver {
293     /// Check if the `Taker` has called `want()` without parking a task.
294     ///
295     /// This is safe to call outside of a futures task context, but other
296     /// means of being notified is left to the user.
297     #[inline]
is_wanting(&self) -> bool298     pub fn is_wanting(&self) -> bool {
299         self.inner.state.load(SeqCst) == State::Want.into()
300     }
301 
302 
303     /// Check if the `Taker` has canceled interest without parking a task.
304     #[inline]
is_canceled(&self) -> bool305     pub fn is_canceled(&self) -> bool {
306         self.inner.state.load(SeqCst) == State::Closed.into()
307     }
308 }
309 
310 impl fmt::Debug for SharedGiver {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result311     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
312         f.debug_struct("SharedGiver")
313             .field("state", &self.inner.state())
314             .finish()
315     }
316 }
317 
318 // ===== impl Taker ======
319 
320 impl Taker {
321     /// Signal to the `Giver` that the want is canceled.
322     ///
323     /// This is useful to tell that the channel is closed if you cannot
324     /// drop the value yet.
325     #[inline]
cancel(&mut self)326     pub fn cancel(&mut self) {
327         self.signal(State::Closed)
328     }
329 
330     /// Signal to the `Giver` that a value is wanted.
331     #[inline]
want(&mut self)332     pub fn want(&mut self) {
333         debug_assert!(
334             self.inner.state.load(SeqCst) != State::Closed.into(),
335             "want called after cancel"
336         );
337         self.signal(State::Want)
338     }
339 
340     #[inline]
signal(&mut self, state: State)341     fn signal(&mut self, state: State) {
342         let old_state = self.inner.state.swap(state.into(), SeqCst).into();
343         match old_state {
344             State::Idle | State::Want | State::Closed => (),
345             State::Give => {
346                 loop {
347                     if let Some(mut locked) = self.inner.task.try_lock_explicit(SeqCst, SeqCst) {
348                         if let Some(task) = locked.take() {
349                             drop(locked);
350                             task.wake();
351                         }
352                         return;
353                     } else {
354                         // if we couldn't take the lock, then a Giver has it.
355                         // The *ONLY* reason is because it is in the process of parking.
356                         //
357                         // We need to loop and take the lock so we can notify this task.
358                     }
359                 }
360             },
361         }
362     }
363 }
364 
365 impl Drop for Taker {
366     #[inline]
drop(&mut self)367     fn drop(&mut self) {
368         self.signal(State::Closed);
369     }
370 }
371 
372 impl fmt::Debug for Taker {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result373     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
374         f.debug_struct("Taker")
375             .field("state", &self.inner.state())
376             .finish()
377     }
378 }
379 
380 // ===== impl Closed ======
381 
382 impl fmt::Debug for Closed {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result383     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384         f.debug_struct("Closed")
385             .finish()
386     }
387 }
388 
389 // ===== impl Inner ======
390 
391 impl Inner {
392     #[inline]
state(&self) -> State393     fn state(&self) -> State {
394         self.state.load(SeqCst).into()
395     }
396 }
397 
398 // ===== impl PollFn ======
399 
400 struct Want<'a>(&'a mut Giver);
401 
402 
403 impl Future for Want<'_> {
404     type Output = Result<(), Closed>;
405 
poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>406     fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
407         self.0.poll_want(cx)
408     }
409 }
410 
411 #[cfg(test)]
412 mod tests {
413     use std::thread;
414     use tokio_sync::oneshot;
415     use super::*;
416 
block_on<F: Future>(f: F) -> F::Output417     fn block_on<F: Future>(f: F) -> F::Output {
418         tokio_executor::enter()
419             .expect("block_on enter")
420             .block_on(f)
421     }
422 
423     #[test]
want_ready()424     fn want_ready() {
425         let (mut gv, mut tk) = new();
426 
427         tk.want();
428 
429         block_on(gv.want()).unwrap();
430     }
431 
432     #[test]
want_notify_0()433     fn want_notify_0() {
434         let (mut gv, mut tk) = new();
435         let (tx, rx) = oneshot::channel();
436 
437         thread::spawn(move || {
438             tk.want();
439             // use a oneshot to keep this thread alive
440             // until other thread was notified of want
441             block_on(rx).expect("rx");
442         });
443 
444         block_on(gv.want()).expect("want");
445 
446         assert!(gv.is_wanting(), "still wanting after poll_want success");
447         assert!(gv.give(), "give is true when wanting");
448 
449         assert!(!gv.is_wanting(), "no longer wanting after give");
450         assert!(!gv.is_canceled(), "give doesn't cancel");
451 
452         assert!(!gv.give(), "give is false if not wanting");
453 
454         tx.send(()).expect("tx");
455     }
456 
457     /*
458     /// This tests that if the Giver moves tasks after parking,
459     /// it will still wake up the correct task.
460     #[test]
461     fn want_notify_moving_tasks() {
462         use std::sync::Arc;
463         use futures::executor::{spawn, Notify, NotifyHandle};
464 
465         struct WantNotify;
466 
467         impl Notify for WantNotify {
468             fn notify(&self, _id: usize) {
469             }
470         }
471 
472         fn n() -> NotifyHandle {
473             Arc::new(WantNotify).into()
474         }
475 
476         let (mut gv, mut tk) = new();
477 
478         let mut s = spawn(poll_fn(move || {
479             gv.poll_want()
480         }));
481 
482         // Register with t1 as the task::current()
483         let t1 = n();
484         assert!(s.poll_future_notify(&t1, 1).unwrap().is_not_ready());
485 
486         thread::spawn(move || {
487             thread::sleep(::std::time::Duration::from_millis(100));
488             tk.want();
489         });
490 
491         // And now, move to a ThreadNotify task.
492         s.into_inner().wait().expect("poll_want");
493     }
494     */
495 
496     #[test]
cancel()497     fn cancel() {
498         // explicit
499         let (mut gv, mut tk) = new();
500 
501         assert!(!gv.is_canceled());
502 
503         tk.cancel();
504 
505         assert!(gv.is_canceled());
506         block_on(gv.want()).unwrap_err();
507 
508         // implicit
509         let (mut gv, tk) = new();
510 
511         assert!(!gv.is_canceled());
512 
513         drop(tk);
514 
515         assert!(gv.is_canceled());
516         block_on(gv.want()).unwrap_err();
517 
518         // notifies
519         let (mut gv, tk) = new();
520 
521         thread::spawn(move || {
522             let _tk = tk;
523             // and dropped
524         });
525 
526         block_on(gv.want()).unwrap_err();
527     }
528 
529     /*
530     #[test]
531     fn stress() {
532         let nthreads = 5;
533         let nwants = 100;
534 
535         for _ in 0..nthreads {
536             let (mut gv, mut tk) = new();
537             let (mut tx, mut rx) = mpsc::channel(0);
538 
539             // rx thread
540             thread::spawn(move || {
541                 let mut cnt = 0;
542                 poll_fn(move || {
543                     while cnt < nwants {
544                         let n = match rx.poll().expect("rx poll") {
545                             Async::Ready(n) => n.expect("rx opt"),
546                             Async::NotReady => {
547                                 tk.want();
548                                 return Ok(Async::NotReady);
549                             },
550                         };
551                         assert_eq!(cnt, n);
552                         cnt += 1;
553                     }
554                     Ok::<_, ()>(Async::Ready(()))
555                 }).wait().expect("rx wait");
556             });
557 
558             // tx thread
559             thread::spawn(move || {
560                 let mut cnt = 0;
561                 let nsent = poll_fn(move || {
562                     loop {
563                         while let Ok(()) = tx.try_send(cnt) {
564                             cnt += 1;
565                         }
566                         match gv.poll_want() {
567                             Ok(Async::Ready(_)) => (),
568                             Ok(Async::NotReady) => return Ok::<_, ()>(Async::NotReady),
569                             Err(_) => return Ok(Async::Ready(cnt)),
570                         }
571                     }
572                 }).wait().expect("tx wait");
573 
574                 assert_eq!(nsent, nwants);
575             }).join().expect("thread join");
576         }
577     }
578     */
579 }
580