1 //! Interface to the select mechanism.
2 
3 use std::fmt;
4 use std::marker::PhantomData;
5 use std::mem;
6 use std::time::{Duration, Instant};
7 use std::vec::Vec;
8 
9 use crossbeam_utils::Backoff;
10 
11 use crate::channel::{self, Receiver, Sender};
12 use crate::context::Context;
13 use crate::err::{ReadyTimeoutError, TryReadyError};
14 use crate::err::{RecvError, SendError};
15 use crate::err::{SelectTimeoutError, TrySelectError};
16 use crate::flavors;
17 use crate::utils;
18 
/// Temporary data that gets initialized during select or a blocking operation, and is consumed by
/// `read` or `write`.
///
/// Each field contains data associated with a specific channel flavor.
///
/// A fresh, default-initialized token is created for every select call (see `run_select`), which
/// is why the type derives `Default`.
// This is a private API that is used by the select macro.
#[derive(Debug, Default)]
pub struct Token {
    /// Token data for the `at` flavor.
    pub(crate) at: flavors::at::AtToken,
    /// Token data for the `array` flavor.
    pub(crate) array: flavors::array::ArrayToken,
    /// Token data for the `list` flavor.
    pub(crate) list: flavors::list::ListToken,
    /// Token data for the `never` flavor.
    // NOTE(review): the `dead_code` allowance suggests this field is never read — confirm.
    #[allow(dead_code)]
    pub(crate) never: flavors::never::NeverToken,
    /// Token data for the `tick` flavor.
    pub(crate) tick: flavors::tick::TickToken,
    /// Token data for the `zero` flavor.
    pub(crate) zero: flavors::zero::ZeroToken,
}
34 
/// Identifier associated with an operation by a specific thread on a specific channel.
///
/// The identifier is an address stored as a plain `usize`, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Operation(usize);

impl Operation {
    /// Derives an operation identifier from a mutable reference.
    ///
    /// The identifier is nothing more than the numeric address of the referent. The referent
    /// should therefore be a variable that belongs to this particular thread and operation, and
    /// it must stay alive for the whole select or blocking operation.
    ///
    /// # Panics
    ///
    /// Panics if the address is `0`, `1`, or `2`, because those values are reserved for
    /// `Selected::{Waiting, Aborted, Disconnected}`.
    #[inline]
    pub fn hook<T>(r: &mut T) -> Operation {
        let raw: *mut T = r;
        let val = raw as usize;
        // The three smallest values encode the non-operation `Selected` states, so an operation
        // identifier must never collide with them.
        assert!(val > 2);
        Self(val)
    }
}
54 
55 /// Current state of a select or a blocking operation.
56 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
57 pub enum Selected {
58     /// Still waiting for an operation.
59     Waiting,
60 
61     /// The attempt to block the current thread has been aborted.
62     Aborted,
63 
64     /// An operation became ready because a channel is disconnected.
65     Disconnected,
66 
67     /// An operation became ready because a message can be sent or received.
68     Operation(Operation),
69 }
70 
71 impl From<usize> for Selected {
72     #[inline]
from(val: usize) -> Selected73     fn from(val: usize) -> Selected {
74         match val {
75             0 => Selected::Waiting,
76             1 => Selected::Aborted,
77             2 => Selected::Disconnected,
78             oper => Selected::Operation(Operation(oper)),
79         }
80     }
81 }
82 
83 impl Into<usize> for Selected {
84     #[inline]
into(self) -> usize85     fn into(self) -> usize {
86         match self {
87             Selected::Waiting => 0,
88             Selected::Aborted => 1,
89             Selected::Disconnected => 2,
90             Selected::Operation(Operation(val)) => val,
91         }
92     }
93 }
94 
/// A receiver or a sender that can participate in select.
///
/// This is a handle that assists select in executing an operation, registration, deciding on the
/// appropriate deadline for blocking, etc.
///
/// The methods fall into two groups. `try_select`, `register`, `unregister`, and `accept` are
/// driven by `run_select`, which commits to an operation. `is_ready`, `watch`, and `unwatch` are
/// driven by `run_ready`, which only reports readiness. `deadline` is consulted by both.
// This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
pub trait SelectHandle {
    /// Attempts to select an operation and returns `true` on success.
    ///
    /// On success the caller hands `token` on to the code that completes the operation.
    fn try_select(&self, token: &mut Token) -> bool;

    /// Returns a deadline for an operation, if there is one.
    ///
    /// The select loops never block past the earliest deadline reported by any handle.
    fn deadline(&self) -> Option<Instant>;

    /// Registers an operation for execution and returns `true` if it is now ready.
    ///
    /// `oper` identifies the operation and `cx` is the context of the thread about to block.
    fn register(&self, oper: Operation, cx: &Context) -> bool;

    /// Unregisters an operation for execution.
    ///
    /// `oper` must be the identifier that was passed to `register`.
    fn unregister(&self, oper: Operation);

    /// Attempts to select an operation the thread got woken up for and returns `true` on success.
    fn accept(&self, token: &mut Token, cx: &Context) -> bool;

    /// Returns `true` if an operation can be executed without blocking.
    fn is_ready(&self) -> bool;

    /// Registers an operation for readiness notification and returns `true` if it is now ready.
    ///
    /// Unlike `register`, this only asks to be notified; it does not commit to the operation.
    fn watch(&self, oper: Operation, cx: &Context) -> bool;

    /// Unregisters an operation for readiness notification.
    ///
    /// `oper` must be the identifier that was passed to `watch`.
    fn unwatch(&self, oper: Operation);
}
125 
126 impl<T: SelectHandle> SelectHandle for &T {
try_select(&self, token: &mut Token) -> bool127     fn try_select(&self, token: &mut Token) -> bool {
128         (**self).try_select(token)
129     }
130 
deadline(&self) -> Option<Instant>131     fn deadline(&self) -> Option<Instant> {
132         (**self).deadline()
133     }
134 
register(&self, oper: Operation, cx: &Context) -> bool135     fn register(&self, oper: Operation, cx: &Context) -> bool {
136         (**self).register(oper, cx)
137     }
138 
unregister(&self, oper: Operation)139     fn unregister(&self, oper: Operation) {
140         (**self).unregister(oper);
141     }
142 
accept(&self, token: &mut Token, cx: &Context) -> bool143     fn accept(&self, token: &mut Token, cx: &Context) -> bool {
144         (**self).accept(token, cx)
145     }
146 
is_ready(&self) -> bool147     fn is_ready(&self) -> bool {
148         (**self).is_ready()
149     }
150 
watch(&self, oper: Operation, cx: &Context) -> bool151     fn watch(&self, oper: Operation, cx: &Context) -> bool {
152         (**self).watch(oper, cx)
153     }
154 
unwatch(&self, oper: Operation)155     fn unwatch(&self, oper: Operation) {
156         (**self).unwatch(oper)
157     }
158 }
159 
/// Determines when a select operation should time out.
///
/// This is the internal timeout policy passed to `run_select` and `run_ready`.
#[derive(Clone, Copy, Eq, PartialEq)]
enum Timeout {
    /// No blocking.
    ///
    /// Used by `try_select`: give up right away if nothing is ready.
    Now,

    /// Block forever.
    ///
    /// Used by `select`: wait until some operation can be selected.
    Never,

    /// Time out after the time instant.
    ///
    /// Used by `select_deadline`: give up once `Instant::now()` reaches the stored instant.
    At(Instant),
}
172 
/// Runs until one of the operations is selected, potentially blocking the current thread.
///
/// Successful receive operations will have to be followed up by `channel::read()` and successful
/// send operations by `channel::write()`.
///
/// # Parameters
///
/// * `handles` - the candidate operations as `(handle, index, ptr)` entries. The slice is
///   shuffled in place unless `is_biased` is set.
/// * `timeout` - how long the call is allowed to block.
/// * `is_biased` - when `true`, the operations are tried in the order given instead of a
///   shuffled order.
///
/// # Returns
///
/// `Some((token, index, ptr))` for the selected entry, where `token` has been initialized by the
/// handle that was selected, or `None` if the timeout elapsed before anything could be selected.
fn run_select(
    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
    timeout: Timeout,
    is_biased: bool,
) -> Option<(Token, usize, *const u8)> {
    if handles.is_empty() {
        // Wait until the timeout and return.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {
                // With no deadline the sleep is not expected to return.
                utils::sleep_until(None);
                unreachable!();
            }
            Timeout::At(when) => {
                utils::sleep_until(Some(when));
                return None;
            }
        }
    }

    if !is_biased {
        // Shuffle the operations for fairness.
        utils::shuffle(handles);
    }

    // Create a token, which serves as a temporary variable that gets initialized in this function
    // and is later used by a call to `channel::read()` or `channel::write()` that completes the
    // selected operation.
    let mut token = Token::default();

    // Try selecting one of the operations without blocking.
    for &(handle, i, ptr) in handles.iter() {
        if handle.try_select(&mut token) {
            return Some((token, i, ptr));
        }
    }

    loop {
        // Prepare for blocking.
        let res = Context::with(|cx| {
            let mut sel = Selected::Waiting;
            // Number of entries `register` has been called on; exactly that many are unregistered.
            let mut registered_count = 0;
            // Index of the entry whose registration reported readiness, if this thread managed to
            // abort the select itself.
            let mut index_ready = None;

            if let Timeout::Now = timeout {
                // A non-blocking select must not wait, so mark the context as aborted up front.
                cx.try_select(Selected::Aborted).unwrap();
            }

            // Register all operations.
            //
            // The operation identifier is the address of the slice slot holding the handle, so the
            // same identifier is recomputed below for unregistration and for finding the selected
            // entry.
            for (handle, i, _) in handles.iter_mut() {
                registered_count += 1;

                // If registration returns `true`, that means the operation has just become ready.
                if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
                    // Try aborting select.
                    sel = match cx.try_select(Selected::Aborted) {
                        Ok(()) => {
                            index_ready = Some(*i);
                            Selected::Aborted
                        }
                        // Something else was already selected; go with that state instead.
                        Err(s) => s,
                    };
                    break;
                }

                // If another thread has already selected one of the operations, stop registration.
                sel = cx.selected();
                if sel != Selected::Waiting {
                    break;
                }
            }

            if sel == Selected::Waiting {
                // Check with each operation for how long we're allowed to block, and compute the
                // earliest deadline.
                let mut deadline: Option<Instant> = match timeout {
                    // NOTE(review): this early return skips the unregister loop below. It looks
                    // unreachable because the context was marked `Aborted` above for
                    // `Timeout::Now` — confirm.
                    Timeout::Now => return None,
                    Timeout::Never => None,
                    Timeout::At(when) => Some(when),
                };
                for &(handle, _, _) in handles.iter() {
                    if let Some(x) = handle.deadline() {
                        // Keep the earlier of the two, or adopt `x` if there was no deadline yet.
                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
                    }
                }

                // Block the current thread.
                sel = cx.wait_until(deadline);
            }

            // Unregister all registered operations.
            for (handle, _, _) in handles.iter_mut().take(registered_count) {
                handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
            }

            match sel {
                Selected::Waiting => unreachable!(),
                Selected::Aborted => {
                    // If an operation became ready during registration, try selecting it.
                    if let Some(index_ready) = index_ready {
                        for &(handle, i, ptr) in handles.iter() {
                            if i == index_ready && handle.try_select(&mut token) {
                                return Some((i, ptr));
                            }
                        }
                    }
                }
                // Nothing to accept here; the non-blocking retry after the closure handles it.
                Selected::Disconnected => {}
                Selected::Operation(_) => {
                    // Find the selected operation.
                    for (handle, i, ptr) in handles.iter_mut() {
                        // Is this the selected operation?
                        if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
                        {
                            // Try selecting this operation.
                            if handle.accept(&mut token, cx) {
                                return Some((*i, *ptr));
                            }
                        }
                    }
                }
            }

            None
        });

        // Return if an operation was selected.
        if let Some((i, ptr)) = res {
            return Some((token, i, ptr));
        }

        // Try selecting one of the operations without blocking.
        for &(handle, i, ptr) in handles.iter() {
            if handle.try_select(&mut token) {
                return Some((token, i, ptr));
            }
        }

        // Nothing could be selected in this round: give up if the timeout has been reached,
        // otherwise go around and block again.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {}
            Timeout::At(when) => {
                if Instant::now() >= when {
                    return None;
                }
            }
        }
    }
}
326 
/// Runs until one of the operations becomes ready, potentially blocking the current thread.
///
/// Unlike `run_select`, this only reports readiness: no token is produced and no operation is
/// begun on behalf of the caller.
///
/// # Parameters
///
/// * `handles` - the candidate operations as `(handle, index, ptr)` entries. The slice is
///   shuffled in place unless `is_biased` is set.
/// * `timeout` - how long the call is allowed to block.
/// * `is_biased` - when `true`, the operations are checked in the order given instead of a
///   shuffled order.
///
/// # Returns
///
/// `Some(index)` of an entry that was reported ready, or `None` if the timeout elapsed first.
fn run_ready(
    handles: &mut [(&dyn SelectHandle, usize, *const u8)],
    timeout: Timeout,
    is_biased: bool,
) -> Option<usize> {
    if handles.is_empty() {
        // Wait until the timeout and return.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {
                // With no deadline the sleep is not expected to return.
                utils::sleep_until(None);
                unreachable!();
            }
            Timeout::At(when) => {
                utils::sleep_until(Some(when));
                return None;
            }
        }
    }

    if !is_biased {
        // Shuffle the operations for fairness.
        utils::shuffle(handles);
    }

    loop {
        // Poll for readiness with backoff before resorting to blocking.
        let backoff = Backoff::new();
        loop {
            // Check operations for readiness.
            for &(handle, i, _) in handles.iter() {
                if handle.is_ready() {
                    return Some(i);
                }
            }

            if backoff.is_completed() {
                break;
            } else {
                backoff.snooze();
            }
        }

        // Check for timeout.
        match timeout {
            Timeout::Now => return None,
            Timeout::Never => {}
            Timeout::At(when) => {
                if Instant::now() >= when {
                    return None;
                }
            }
        }

        // Prepare for blocking.
        let res = Context::with(|cx| {
            let mut sel = Selected::Waiting;
            // Number of entries `watch` has been called on; exactly that many are unwatched.
            let mut registered_count = 0;

            // Begin watching all operations.
            //
            // The operation identifier is the address of the slice slot holding the handle, so the
            // same identifier is recomputed below for unwatching and for finding the ready entry.
            for (handle, _, _) in handles.iter_mut() {
                registered_count += 1;
                let oper = Operation::hook::<&dyn SelectHandle>(handle);

                // If registration returns `true`, that means the operation has just become ready.
                if handle.watch(oper, cx) {
                    sel = match cx.try_select(Selected::Operation(oper)) {
                        Ok(()) => Selected::Operation(oper),
                        // Something else was already selected; go with that state instead.
                        Err(s) => s,
                    };
                    break;
                }

                // If another thread has already chosen one of the operations, stop registration.
                sel = cx.selected();
                if sel != Selected::Waiting {
                    break;
                }
            }

            if sel == Selected::Waiting {
                // Check with each operation for how long we're allowed to block, and compute the
                // earliest deadline.
                let mut deadline: Option<Instant> = match timeout {
                    // `Timeout::Now` already returned in the timeout check above.
                    Timeout::Now => unreachable!(),
                    Timeout::Never => None,
                    Timeout::At(when) => Some(when),
                };
                for &(handle, _, _) in handles.iter() {
                    if let Some(x) = handle.deadline() {
                        // Keep the earlier of the two, or adopt `x` if there was no deadline yet.
                        deadline = deadline.map(|y| x.min(y)).or(Some(x));
                    }
                }

                // Block the current thread.
                sel = cx.wait_until(deadline);
            }

            // Unwatch all operations.
            for (handle, _, _) in handles.iter_mut().take(registered_count) {
                handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
            }

            match sel {
                Selected::Waiting => unreachable!(),
                // Nothing to report for these states; the outer loop polls again.
                Selected::Aborted => {}
                Selected::Disconnected => {}
                Selected::Operation(_) => {
                    // Map the selected operation identifier back to the caller-visible index.
                    for (handle, i, _) in handles.iter_mut() {
                        let oper = Operation::hook::<&dyn SelectHandle>(handle);
                        if sel == Selected::Operation(oper) {
                            return Some(*i);
                        }
                    }
                }
            }

            None
        });

        // Return if an operation became ready.
        if res.is_some() {
            return res;
        }
    }
}
453 
454 /// Attempts to select one of the operations without blocking.
455 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
456 #[inline]
try_select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], is_biased: bool, ) -> Result<SelectedOperation<'a>, TrySelectError>457 pub fn try_select<'a>(
458     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
459     is_biased: bool,
460 ) -> Result<SelectedOperation<'a>, TrySelectError> {
461     match run_select(handles, Timeout::Now, is_biased) {
462         None => Err(TrySelectError),
463         Some((token, index, ptr)) => Ok(SelectedOperation {
464             token,
465             index,
466             ptr,
467             _marker: PhantomData,
468         }),
469     }
470 }
471 
472 /// Blocks until one of the operations becomes ready and selects it.
473 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
474 #[inline]
select<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], is_biased: bool, ) -> SelectedOperation<'a>475 pub fn select<'a>(
476     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
477     is_biased: bool,
478 ) -> SelectedOperation<'a> {
479     if handles.is_empty() {
480         panic!("no operations have been added to `Select`");
481     }
482 
483     let (token, index, ptr) = run_select(handles, Timeout::Never, is_biased).unwrap();
484     SelectedOperation {
485         token,
486         index,
487         ptr,
488         _marker: PhantomData,
489     }
490 }
491 
492 /// Blocks for a limited time until one of the operations becomes ready and selects it.
493 // This is a private API (exposed inside crossbeam_channel::internal module) that is used by the select macro.
494 #[inline]
select_timeout<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], timeout: Duration, is_biased: bool, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>495 pub fn select_timeout<'a>(
496     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
497     timeout: Duration,
498     is_biased: bool,
499 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
500     match Instant::now().checked_add(timeout) {
501         Some(deadline) => select_deadline(handles, deadline, is_biased),
502         None => Ok(select(handles, is_biased)),
503     }
504 }
505 
506 /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
507 #[inline]
select_deadline<'a>( handles: &mut [(&'a dyn SelectHandle, usize, *const u8)], deadline: Instant, is_biased: bool, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>508 pub(crate) fn select_deadline<'a>(
509     handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
510     deadline: Instant,
511     is_biased: bool,
512 ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
513     match run_select(handles, Timeout::At(deadline), is_biased) {
514         None => Err(SelectTimeoutError),
515         Some((token, index, ptr)) => Ok(SelectedOperation {
516             token,
517             index,
518             ptr,
519             _marker: PhantomData,
520         }),
521     }
522 }
523 
524 /// Selects from a set of channel operations.
525 ///
526 /// `Select` allows you to define a set of channel operations, wait until any one of them becomes
527 /// ready, and finally execute it. If multiple operations are ready at the same time, a random one
528 /// among them is selected.
529 ///
530 /// An operation is considered to be ready if it doesn't have to block. Note that it is ready even
531 /// when it will simply return an error because the channel is disconnected.
532 ///
533 /// The [`select!`] macro is a convenience wrapper around `Select`. However, it cannot select over a
534 /// dynamically created list of channel operations.
535 ///
536 /// [`select!`]: crate::select!
537 ///
538 /// Once a list of operations has been built with `Select`, there are two different ways of
539 /// proceeding:
540 ///
541 /// * Select an operation with [`try_select`], [`select`], or [`select_timeout`]. If successful,
542 ///   the returned selected operation has already begun and **must** be completed. If we don't
543 ///   complete it, a panic will occur.
544 ///
545 /// * Wait for an operation to become ready with [`try_ready`], [`ready`], or [`ready_timeout`]. If
546 ///   successful, we may attempt to execute the operation, but are not obliged to. In fact, it's
547 ///   possible for another thread to make the operation not ready just before we try executing it,
548 ///   so it's wise to use a retry loop. However, note that these methods might return with success
549 ///   spuriously, so it's a good idea to always double check if the operation is really ready.
550 ///
551 /// # Examples
552 ///
553 /// Use [`select`] to receive a message from a list of receivers:
554 ///
555 /// ```
556 /// use crossbeam_channel::{Receiver, RecvError, Select};
557 ///
558 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
559 ///     // Build a list of operations.
560 ///     let mut sel = Select::new();
561 ///     for r in rs {
562 ///         sel.recv(r);
563 ///     }
564 ///
565 ///     // Complete the selected operation.
566 ///     let oper = sel.select();
567 ///     let index = oper.index();
568 ///     oper.recv(&rs[index])
569 /// }
570 /// ```
571 ///
572 /// Use [`ready`] to receive a message from a list of receivers:
573 ///
574 /// ```
575 /// use crossbeam_channel::{Receiver, RecvError, Select};
576 ///
577 /// fn recv_multiple<T>(rs: &[Receiver<T>]) -> Result<T, RecvError> {
578 ///     // Build a list of operations.
579 ///     let mut sel = Select::new();
580 ///     for r in rs {
581 ///         sel.recv(r);
582 ///     }
583 ///
584 ///     loop {
585 ///         // Wait until a receive operation becomes ready and try executing it.
586 ///         let index = sel.ready();
587 ///         let res = rs[index].try_recv();
588 ///
589 ///         // If the operation turns out not to be ready, retry.
590 ///         if let Err(e) = res {
591 ///             if e.is_empty() {
592 ///                 continue;
593 ///             }
594 ///         }
595 ///
596 ///         // Success!
597 ///         return res.map_err(|_| RecvError);
598 ///     }
599 /// }
600 /// ```
601 ///
602 /// [`try_select`]: Select::try_select
603 /// [`select`]: Select::select
604 /// [`select_timeout`]: Select::select_timeout
605 /// [`try_ready`]: Select::try_ready
606 /// [`ready`]: Select::ready
607 /// [`ready_timeout`]: Select::ready_timeout
pub struct Select<'a> {
    /// A list of senders and receivers participating in selection.
    ///
    /// Each entry is `(handle, index, ptr)`: the type-erased sender or receiver, the index that
    /// was returned to the caller when the operation was added, and the address of the original
    /// `Sender`/`Receiver` cast to `*const u8`.
    handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,

    /// The next index to assign to an operation.
    ///
    /// `send` and `recv` only ever increment it, which is why indices of removed operations are
    /// not reused.
    next_index: usize,
}

// `handles` stores raw `*const u8` pointers, so the compiler does not derive `Send`/`Sync` for
// `Select`; they are asserted manually here.
// NOTE(review): no safety argument is given in this file. Presumably the pointers are only used
// as opaque addresses and the stored handles are safe to share across threads — confirm.
unsafe impl Send for Select<'_> {}
unsafe impl Sync for Select<'_> {}
618 
619 impl<'a> Select<'a> {
620     /// Creates an empty list of channel operations for selection.
621     ///
622     /// # Examples
623     ///
624     /// ```
625     /// use crossbeam_channel::Select;
626     ///
627     /// let mut sel = Select::new();
628     ///
629     /// // The list of operations is empty, which means no operation can be selected.
630     /// assert!(sel.try_select().is_err());
631     /// ```
new() -> Select<'a>632     pub fn new() -> Select<'a> {
633         Select {
634             handles: Vec::with_capacity(4),
635             next_index: 0,
636         }
637     }
638 
639     /// Adds a send operation.
640     ///
641     /// Returns the index of the added operation.
642     ///
643     /// # Examples
644     ///
645     /// ```
646     /// use crossbeam_channel::{unbounded, Select};
647     ///
648     /// let (s, r) = unbounded::<i32>();
649     ///
650     /// let mut sel = Select::new();
651     /// let index = sel.send(&s);
652     /// ```
send<T>(&mut self, s: &'a Sender<T>) -> usize653     pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
654         let i = self.next_index;
655         let ptr = s as *const Sender<_> as *const u8;
656         self.handles.push((s, i, ptr));
657         self.next_index += 1;
658         i
659     }
660 
661     /// Adds a receive operation.
662     ///
663     /// Returns the index of the added operation.
664     ///
665     /// # Examples
666     ///
667     /// ```
668     /// use crossbeam_channel::{unbounded, Select};
669     ///
670     /// let (s, r) = unbounded::<i32>();
671     ///
672     /// let mut sel = Select::new();
673     /// let index = sel.recv(&r);
674     /// ```
recv<T>(&mut self, r: &'a Receiver<T>) -> usize675     pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
676         let i = self.next_index;
677         let ptr = r as *const Receiver<_> as *const u8;
678         self.handles.push((r, i, ptr));
679         self.next_index += 1;
680         i
681     }
682 
683     /// Removes a previously added operation.
684     ///
685     /// This is useful when an operation is selected because the channel got disconnected and we
686     /// want to try again to select a different operation instead.
687     ///
688     /// If new operations are added after removing some, the indices of removed operations will not
689     /// be reused.
690     ///
691     /// # Panics
692     ///
693     /// An attempt to remove a non-existing or already removed operation will panic.
694     ///
695     /// # Examples
696     ///
697     /// ```
698     /// use crossbeam_channel::{unbounded, Select};
699     ///
700     /// let (s1, r1) = unbounded::<i32>();
701     /// let (_, r2) = unbounded::<i32>();
702     ///
703     /// let mut sel = Select::new();
704     /// let oper1 = sel.recv(&r1);
705     /// let oper2 = sel.recv(&r2);
706     ///
707     /// // Both operations are initially ready, so a random one will be executed.
708     /// let oper = sel.select();
709     /// assert_eq!(oper.index(), oper2);
710     /// assert!(oper.recv(&r2).is_err());
711     /// sel.remove(oper2);
712     ///
713     /// s1.send(10).unwrap();
714     ///
715     /// let oper = sel.select();
716     /// assert_eq!(oper.index(), oper1);
717     /// assert_eq!(oper.recv(&r1), Ok(10));
718     /// ```
remove(&mut self, index: usize)719     pub fn remove(&mut self, index: usize) {
720         assert!(
721             index < self.next_index,
722             "index out of bounds; {} >= {}",
723             index,
724             self.next_index,
725         );
726 
727         let i = self
728             .handles
729             .iter()
730             .enumerate()
731             .find(|(_, (_, i, _))| *i == index)
732             .expect("no operation with this index")
733             .0;
734 
735         self.handles.swap_remove(i);
736     }
737 
738     /// Attempts to select one of the operations without blocking.
739     ///
740     /// If an operation is ready, it is selected and returned. If multiple operations are ready at
741     /// the same time, a random one among them is selected. If none of the operations are ready, an
742     /// error is returned.
743     ///
744     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
745     /// even when it will simply return an error because the channel is disconnected.
746     ///
747     /// The selected operation must be completed with [`SelectedOperation::send`]
748     /// or [`SelectedOperation::recv`].
749     ///
750     /// # Examples
751     ///
752     /// ```
753     /// use crossbeam_channel::{unbounded, Select};
754     ///
755     /// let (s1, r1) = unbounded();
756     /// let (s2, r2) = unbounded();
757     ///
758     /// s1.send(10).unwrap();
759     /// s2.send(20).unwrap();
760     ///
761     /// let mut sel = Select::new();
762     /// let oper1 = sel.recv(&r1);
763     /// let oper2 = sel.recv(&r2);
764     ///
765     /// // Both operations are initially ready, so a random one will be executed.
766     /// let oper = sel.try_select();
767     /// match oper {
768     ///     Err(_) => panic!("both operations should be ready"),
769     ///     Ok(oper) => match oper.index() {
770     ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
771     ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
772     ///         _ => unreachable!(),
773     ///     }
774     /// }
775     /// ```
try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError>776     pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
777         try_select(&mut self.handles, false)
778     }
779 
780     /// Blocks until one of the operations becomes ready and selects it.
781     ///
782     /// Once an operation becomes ready, it is selected and returned. If multiple operations are
783     /// ready at the same time, a random one among them is selected.
784     ///
785     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
786     /// even when it will simply return an error because the channel is disconnected.
787     ///
788     /// The selected operation must be completed with [`SelectedOperation::send`]
789     /// or [`SelectedOperation::recv`].
790     ///
791     /// # Panics
792     ///
793     /// Panics if no operations have been added to `Select`.
794     ///
795     /// # Examples
796     ///
797     /// ```
798     /// use std::thread;
799     /// use std::time::Duration;
800     /// use crossbeam_channel::{unbounded, Select};
801     ///
802     /// let (s1, r1) = unbounded();
803     /// let (s2, r2) = unbounded();
804     ///
805     /// thread::spawn(move || {
806     ///     thread::sleep(Duration::from_secs(1));
807     ///     s1.send(10).unwrap();
808     /// });
809     /// thread::spawn(move || s2.send(20).unwrap());
810     ///
811     /// let mut sel = Select::new();
812     /// let oper1 = sel.recv(&r1);
813     /// let oper2 = sel.recv(&r2);
814     ///
815     /// // The second operation will be selected because it becomes ready first.
816     /// let oper = sel.select();
817     /// match oper.index() {
818     ///     i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
819     ///     i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
820     ///     _ => unreachable!(),
821     /// }
822     /// ```
select(&mut self) -> SelectedOperation<'a>823     pub fn select(&mut self) -> SelectedOperation<'a> {
824         select(&mut self.handles, false)
825     }
826 
827     /// Blocks for a limited time until one of the operations becomes ready and selects it.
828     ///
829     /// If an operation becomes ready, it is selected and returned. If multiple operations are
830     /// ready at the same time, a random one among them is selected. If none of the operations
831     /// become ready for the specified duration, an error is returned.
832     ///
833     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
834     /// even when it will simply return an error because the channel is disconnected.
835     ///
836     /// The selected operation must be completed with [`SelectedOperation::send`]
837     /// or [`SelectedOperation::recv`].
838     ///
839     /// # Examples
840     ///
841     /// ```
842     /// use std::thread;
843     /// use std::time::Duration;
844     /// use crossbeam_channel::{unbounded, Select};
845     ///
846     /// let (s1, r1) = unbounded();
847     /// let (s2, r2) = unbounded();
848     ///
849     /// thread::spawn(move || {
850     ///     thread::sleep(Duration::from_secs(1));
851     ///     s1.send(10).unwrap();
852     /// });
853     /// thread::spawn(move || s2.send(20).unwrap());
854     ///
855     /// let mut sel = Select::new();
856     /// let oper1 = sel.recv(&r1);
857     /// let oper2 = sel.recv(&r2);
858     ///
859     /// // The second operation will be selected because it becomes ready first.
860     /// let oper = sel.select_timeout(Duration::from_millis(500));
861     /// match oper {
862     ///     Err(_) => panic!("should not have timed out"),
863     ///     Ok(oper) => match oper.index() {
864     ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
865     ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
866     ///         _ => unreachable!(),
867     ///     }
868     /// }
869     /// ```
select_timeout( &mut self, timeout: Duration, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>870     pub fn select_timeout(
871         &mut self,
872         timeout: Duration,
873     ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
874         select_timeout(&mut self.handles, timeout, false)
875     }
876 
877     /// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
878     ///
879     /// If an operation becomes ready, it is selected and returned. If multiple operations are
880     /// ready at the same time, a random one among them is selected. If none of the operations
881     /// become ready before the given deadline, an error is returned.
882     ///
883     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
884     /// even when it will simply return an error because the channel is disconnected.
885     ///
886     /// The selected operation must be completed with [`SelectedOperation::send`]
887     /// or [`SelectedOperation::recv`].
888     ///
889     /// # Examples
890     ///
891     /// ```
892     /// use std::thread;
893     /// use std::time::{Instant, Duration};
894     /// use crossbeam_channel::{unbounded, Select};
895     ///
896     /// let (s1, r1) = unbounded();
897     /// let (s2, r2) = unbounded();
898     ///
899     /// thread::spawn(move || {
900     ///     thread::sleep(Duration::from_secs(1));
901     ///     s1.send(10).unwrap();
902     /// });
903     /// thread::spawn(move || s2.send(20).unwrap());
904     ///
905     /// let mut sel = Select::new();
906     /// let oper1 = sel.recv(&r1);
907     /// let oper2 = sel.recv(&r2);
908     ///
909     /// let deadline = Instant::now() + Duration::from_millis(500);
910     ///
911     /// // The second operation will be selected because it becomes ready first.
912     /// let oper = sel.select_deadline(deadline);
913     /// match oper {
914     ///     Err(_) => panic!("should not have timed out"),
915     ///     Ok(oper) => match oper.index() {
916     ///         i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(10)),
917     ///         i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(20)),
918     ///         _ => unreachable!(),
919     ///     }
920     /// }
921     /// ```
select_deadline( &mut self, deadline: Instant, ) -> Result<SelectedOperation<'a>, SelectTimeoutError>922     pub fn select_deadline(
923         &mut self,
924         deadline: Instant,
925     ) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
926         select_deadline(&mut self.handles, deadline, false)
927     }
928 
929     /// Attempts to find a ready operation without blocking.
930     ///
931     /// If an operation is ready, its index is returned. If multiple operations are ready at the
932     /// same time, a random one among them is chosen. If none of the operations are ready, an error
933     /// is returned.
934     ///
935     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
936     /// even when it will simply return an error because the channel is disconnected.
937     ///
938     /// Note that this method might return with success spuriously, so it's a good idea to always
939     /// double check if the operation is really ready.
940     ///
941     /// # Examples
942     ///
943     /// ```
944     /// use crossbeam_channel::{unbounded, Select};
945     ///
946     /// let (s1, r1) = unbounded();
947     /// let (s2, r2) = unbounded();
948     ///
949     /// s1.send(10).unwrap();
950     /// s2.send(20).unwrap();
951     ///
952     /// let mut sel = Select::new();
953     /// let oper1 = sel.recv(&r1);
954     /// let oper2 = sel.recv(&r2);
955     ///
956     /// // Both operations are initially ready, so a random one will be chosen.
957     /// match sel.try_ready() {
958     ///     Err(_) => panic!("both operations should be ready"),
959     ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
960     ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
961     ///     Ok(_) => unreachable!(),
962     /// }
963     /// ```
try_ready(&mut self) -> Result<usize, TryReadyError>964     pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
965         match run_ready(&mut self.handles, Timeout::Now, false) {
966             None => Err(TryReadyError),
967             Some(index) => Ok(index),
968         }
969     }
970 
971     /// Blocks until one of the operations becomes ready.
972     ///
973     /// Once an operation becomes ready, its index is returned. If multiple operations are ready at
974     /// the same time, a random one among them is chosen.
975     ///
976     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
977     /// even when it will simply return an error because the channel is disconnected.
978     ///
979     /// Note that this method might return with success spuriously, so it's a good idea to always
980     /// double check if the operation is really ready.
981     ///
982     /// # Panics
983     ///
984     /// Panics if no operations have been added to `Select`.
985     ///
986     /// # Examples
987     ///
988     /// ```
989     /// use std::thread;
990     /// use std::time::Duration;
991     /// use crossbeam_channel::{unbounded, Select};
992     ///
993     /// let (s1, r1) = unbounded();
994     /// let (s2, r2) = unbounded();
995     ///
996     /// thread::spawn(move || {
997     ///     thread::sleep(Duration::from_secs(1));
998     ///     s1.send(10).unwrap();
999     /// });
1000     /// thread::spawn(move || s2.send(20).unwrap());
1001     ///
1002     /// let mut sel = Select::new();
1003     /// let oper1 = sel.recv(&r1);
1004     /// let oper2 = sel.recv(&r2);
1005     ///
1006     /// // The second operation will be selected because it becomes ready first.
1007     /// match sel.ready() {
1008     ///     i if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1009     ///     i if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1010     ///     _ => unreachable!(),
1011     /// }
1012     /// ```
ready(&mut self) -> usize1013     pub fn ready(&mut self) -> usize {
1014         if self.handles.is_empty() {
1015             panic!("no operations have been added to `Select`");
1016         }
1017 
1018         run_ready(&mut self.handles, Timeout::Never, false).unwrap()
1019     }
1020 
1021     /// Blocks for a limited time until one of the operations becomes ready.
1022     ///
1023     /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1024     /// the same time, a random one among them is chosen. If none of the operations become ready
1025     /// for the specified duration, an error is returned.
1026     ///
1027     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1028     /// even when it will simply return an error because the channel is disconnected.
1029     ///
1030     /// Note that this method might return with success spuriously, so it's a good idea to double
1031     /// check if the operation is really ready.
1032     ///
1033     /// # Examples
1034     ///
1035     /// ```
1036     /// use std::thread;
1037     /// use std::time::Duration;
1038     /// use crossbeam_channel::{unbounded, Select};
1039     ///
1040     /// let (s1, r1) = unbounded();
1041     /// let (s2, r2) = unbounded();
1042     ///
1043     /// thread::spawn(move || {
1044     ///     thread::sleep(Duration::from_secs(1));
1045     ///     s1.send(10).unwrap();
1046     /// });
1047     /// thread::spawn(move || s2.send(20).unwrap());
1048     ///
1049     /// let mut sel = Select::new();
1050     /// let oper1 = sel.recv(&r1);
1051     /// let oper2 = sel.recv(&r2);
1052     ///
1053     /// // The second operation will be selected because it becomes ready first.
1054     /// match sel.ready_timeout(Duration::from_millis(500)) {
1055     ///     Err(_) => panic!("should not have timed out"),
1056     ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1057     ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1058     ///     Ok(_) => unreachable!(),
1059     /// }
1060     /// ```
ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError>1061     pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
1062         match Instant::now().checked_add(timeout) {
1063             Some(deadline) => self.ready_deadline(deadline),
1064             None => Ok(self.ready()),
1065         }
1066     }
1067 
1068     /// Blocks until a given deadline, or until one of the operations becomes ready.
1069     ///
1070     /// If an operation becomes ready, its index is returned. If multiple operations are ready at
1071     /// the same time, a random one among them is chosen. If none of the operations become ready
1072     /// before the deadline, an error is returned.
1073     ///
1074     /// An operation is considered to be ready if it doesn't have to block. Note that it is ready
1075     /// even when it will simply return an error because the channel is disconnected.
1076     ///
1077     /// Note that this method might return with success spuriously, so it's a good idea to double
1078     /// check if the operation is really ready.
1079     ///
1080     /// # Examples
1081     ///
1082     /// ```
1083     /// use std::thread;
1084     /// use std::time::{Duration, Instant};
1085     /// use crossbeam_channel::{unbounded, Select};
1086     ///
1087     /// let deadline = Instant::now() + Duration::from_millis(500);
1088     ///
1089     /// let (s1, r1) = unbounded();
1090     /// let (s2, r2) = unbounded();
1091     ///
1092     /// thread::spawn(move || {
1093     ///     thread::sleep(Duration::from_secs(1));
1094     ///     s1.send(10).unwrap();
1095     /// });
1096     /// thread::spawn(move || s2.send(20).unwrap());
1097     ///
1098     /// let mut sel = Select::new();
1099     /// let oper1 = sel.recv(&r1);
1100     /// let oper2 = sel.recv(&r2);
1101     ///
1102     /// // The second operation will be selected because it becomes ready first.
1103     /// match sel.ready_deadline(deadline) {
1104     ///     Err(_) => panic!("should not have timed out"),
1105     ///     Ok(i) if i == oper1 => assert_eq!(r1.try_recv(), Ok(10)),
1106     ///     Ok(i) if i == oper2 => assert_eq!(r2.try_recv(), Ok(20)),
1107     ///     Ok(_) => unreachable!(),
1108     /// }
1109     /// ```
ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError>1110     pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
1111         match run_ready(&mut self.handles, Timeout::At(deadline), false) {
1112             None => Err(ReadyTimeoutError),
1113             Some(index) => Ok(index),
1114         }
1115     }
1116 }
1117 
1118 impl<'a> Clone for Select<'a> {
clone(&self) -> Select<'a>1119     fn clone(&self) -> Select<'a> {
1120         Select {
1121             handles: self.handles.clone(),
1122             next_index: self.next_index,
1123         }
1124     }
1125 }
1126 
1127 impl<'a> Default for Select<'a> {
default() -> Select<'a>1128     fn default() -> Select<'a> {
1129         Select::new()
1130     }
1131 }
1132 
1133 impl fmt::Debug for Select<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1134     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1135         f.pad("Select { .. }")
1136     }
1137 }
1138 
1139 /// A selected operation that needs to be completed.
1140 ///
1141 /// To complete the operation, call [`send`] or [`recv`].
1142 ///
1143 /// # Panics
1144 ///
1145 /// Forgetting to complete the operation is an error and might lead to deadlocks. If a
1146 /// `SelectedOperation` is dropped without completion, a panic occurs.
1147 ///
1148 /// [`send`]: SelectedOperation::send
1149 /// [`recv`]: SelectedOperation::recv
1150 #[must_use]
1151 pub struct SelectedOperation<'a> {
1152     /// Token needed to complete the operation.
1153     token: Token,
1154 
1155     /// The index of the selected operation.
1156     index: usize,
1157 
1158     /// The address of the selected `Sender` or `Receiver`.
1159     ptr: *const u8,
1160 
1161     /// Indicates that `Sender`s and `Receiver`s are borrowed.
1162     _marker: PhantomData<&'a ()>,
1163 }
1164 
1165 impl SelectedOperation<'_> {
1166     /// Returns the index of the selected operation.
1167     ///
1168     /// # Examples
1169     ///
1170     /// ```
1171     /// use crossbeam_channel::{bounded, Select};
1172     ///
1173     /// let (s1, r1) = bounded::<()>(0);
1174     /// let (s2, r2) = bounded::<()>(0);
1175     /// let (s3, r3) = bounded::<()>(1);
1176     ///
1177     /// let mut sel = Select::new();
1178     /// let oper1 = sel.send(&s1);
1179     /// let oper2 = sel.recv(&r2);
1180     /// let oper3 = sel.send(&s3);
1181     ///
1182     /// // Only the last operation is ready.
1183     /// let oper = sel.select();
1184     /// assert_eq!(oper.index(), 2);
1185     /// assert_eq!(oper.index(), oper3);
1186     ///
1187     /// // Complete the operation.
1188     /// oper.send(&s3, ()).unwrap();
1189     /// ```
index(&self) -> usize1190     pub fn index(&self) -> usize {
1191         self.index
1192     }
1193 
1194     /// Completes the send operation.
1195     ///
1196     /// The passed [`Sender`] reference must be the same one that was used in [`Select::send`]
1197     /// when the operation was added.
1198     ///
1199     /// # Panics
1200     ///
1201     /// Panics if an incorrect [`Sender`] reference is passed.
1202     ///
1203     /// # Examples
1204     ///
1205     /// ```
1206     /// use crossbeam_channel::{bounded, Select, SendError};
1207     ///
1208     /// let (s, r) = bounded::<i32>(0);
1209     /// drop(r);
1210     ///
1211     /// let mut sel = Select::new();
1212     /// let oper1 = sel.send(&s);
1213     ///
1214     /// let oper = sel.select();
1215     /// assert_eq!(oper.index(), oper1);
1216     /// assert_eq!(oper.send(&s, 10), Err(SendError(10)));
1217     /// ```
send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>>1218     pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
1219         assert!(
1220             s as *const Sender<T> as *const u8 == self.ptr,
1221             "passed a sender that wasn't selected",
1222         );
1223         let res = unsafe { channel::write(s, &mut self.token, msg) };
1224         mem::forget(self);
1225         res.map_err(SendError)
1226     }
1227 
1228     /// Completes the receive operation.
1229     ///
1230     /// The passed [`Receiver`] reference must be the same one that was used in [`Select::recv`]
1231     /// when the operation was added.
1232     ///
1233     /// # Panics
1234     ///
1235     /// Panics if an incorrect [`Receiver`] reference is passed.
1236     ///
1237     /// # Examples
1238     ///
1239     /// ```
1240     /// use crossbeam_channel::{bounded, Select, RecvError};
1241     ///
1242     /// let (s, r) = bounded::<i32>(0);
1243     /// drop(s);
1244     ///
1245     /// let mut sel = Select::new();
1246     /// let oper1 = sel.recv(&r);
1247     ///
1248     /// let oper = sel.select();
1249     /// assert_eq!(oper.index(), oper1);
1250     /// assert_eq!(oper.recv(&r), Err(RecvError));
1251     /// ```
recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError>1252     pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
1253         assert!(
1254             r as *const Receiver<T> as *const u8 == self.ptr,
1255             "passed a receiver that wasn't selected",
1256         );
1257         let res = unsafe { channel::read(r, &mut self.token) };
1258         mem::forget(self);
1259         res.map_err(|_| RecvError)
1260     }
1261 }
1262 
1263 impl fmt::Debug for SelectedOperation<'_> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1264     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1265         f.pad("SelectedOperation { .. }")
1266     }
1267 }
1268 
1269 impl Drop for SelectedOperation<'_> {
drop(&mut self)1270     fn drop(&mut self) {
1271         panic!("dropped `SelectedOperation` without completing the operation");
1272     }
1273 }
1274