1 #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
2 //! # Implementation Details.
3 //!
4 //! The semaphore is implemented using an intrusive linked list of waiters. An
5 //! atomic counter tracks the number of available permits. If the semaphore does
6 //! not contain the required number of permits, the task attempting to acquire
7 //! permits places its waker at the end of a queue. When new permits are made
8 //! available (such as by releasing an initial acquisition), they are assigned
9 //! to the task at the front of the queue, waking that task if its requested
10 //! number of permits is met.
11 //!
12 //! Because waiters are enqueued at the back of the linked list and dequeued
13 //! from the front, the semaphore is fair. Tasks trying to acquire large numbers
14 //! of permits at a time will always be woken eventually, even if many other
15 //! tasks are acquiring smaller numbers of permits. This means that in a
16 //! use-case like tokio's read-write lock, writers will not be starved by
17 //! readers.
18 use crate::loom::cell::UnsafeCell;
19 use crate::loom::sync::atomic::AtomicUsize;
20 use crate::loom::sync::{Mutex, MutexGuard};
21 use crate::util::linked_list::{self, LinkedList};
22 #[cfg(all(tokio_unstable, feature = "tracing"))]
23 use crate::util::trace;
24 use crate::util::WakeList;
25 
26 use std::future::Future;
27 use std::marker::PhantomPinned;
28 use std::pin::Pin;
29 use std::ptr::NonNull;
30 use std::sync::atomic::Ordering::*;
31 use std::task::{ready, Context, Poll, Waker};
32 use std::{cmp, fmt};
33 
34 /// An asynchronous counting semaphore which permits waiting on multiple permits at once.
35 pub(crate) struct Semaphore {
36     waiters: Mutex<Waitlist>,
37     /// The current number of available permits in the semaphore.
38     permits: AtomicUsize,
39     #[cfg(all(tokio_unstable, feature = "tracing"))]
40     resource_span: tracing::Span,
41 }
42 
43 struct Waitlist {
44     queue: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
45     closed: bool,
46 }
47 
48 /// Error returned from the [`Semaphore::try_acquire`] function.
49 ///
50 /// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire
51 #[derive(Debug, PartialEq, Eq)]
52 pub enum TryAcquireError {
53     /// The semaphore has been [closed] and cannot issue new permits.
54     ///
55     /// [closed]: crate::sync::Semaphore::close
56     Closed,
57 
58     /// The semaphore has no available permits.
59     NoPermits,
60 }
61 /// Error returned from the [`Semaphore::acquire`] function.
62 ///
63 /// An `acquire` operation can only fail if the semaphore has been
64 /// [closed].
65 ///
66 /// [closed]: crate::sync::Semaphore::close
67 /// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire
68 #[derive(Debug)]
69 pub struct AcquireError(());
70 
71 pub(crate) struct Acquire<'a> {
72     node: Waiter,
73     semaphore: &'a Semaphore,
74     num_permits: usize,
75     queued: bool,
76 }
77 
78 /// An entry in the wait queue.
79 struct Waiter {
80     /// The current state of the waiter.
81     ///
82     /// This is either the number of remaining permits required by
83     /// the waiter, or a flag indicating that the waiter is not yet queued.
84     state: AtomicUsize,
85 
86     /// The waker to notify the task awaiting permits.
87     ///
88     /// # Safety
89     ///
90     /// This may only be accessed while the wait queue is locked.
91     waker: UnsafeCell<Option<Waker>>,
92 
93     /// Intrusive linked-list pointers.
94     ///
95     /// # Safety
96     ///
97     /// This may only be accessed while the wait queue is locked.
98     ///
99     /// TODO: Ideally, we would be able to use loom to enforce that
100     /// this isn't accessed concurrently. However, it is difficult to
101     /// use a `UnsafeCell` here, since the `Link` trait requires _returning_
102     /// references to `Pointers`, and `UnsafeCell` requires that checked access
103     /// take place inside a closure. We should consider changing `Pointers` to
104     /// use `UnsafeCell` internally.
105     pointers: linked_list::Pointers<Waiter>,
106 
107     #[cfg(all(tokio_unstable, feature = "tracing"))]
108     ctx: trace::AsyncOpTracingCtx,
109 
110     /// Should not be `Unpin`.
111     _p: PhantomPinned,
112 }
113 
114 generate_addr_of_methods! {
115     impl<> Waiter {
116         unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
117             &self.pointers
118         }
119     }
120 }
121 
122 impl Semaphore {
123     /// The maximum number of permits which a semaphore can hold.
124     ///
125     /// Note that this reserves three bits of flags in the permit counter, but
126     /// we only actually use one of them. However, the previous semaphore
127     /// implementation used three bits, so we will continue to reserve them to
128     /// avoid a breaking change if additional flags need to be added in the
129     /// future.
130     pub(crate) const MAX_PERMITS: usize = usize::MAX >> 3;
131     const CLOSED: usize = 1;
132     // The least-significant bit in the number of permits is reserved to use
133     // as a flag indicating that the semaphore has been closed. Consequently
134     // PERMIT_SHIFT is used to leave that bit for that purpose.
135     const PERMIT_SHIFT: usize = 1;
136 
137     /// Creates a new semaphore with the initial number of permits
138     ///
139     /// Maximum number of permits on 32-bit platforms is `1<<29`.
new(permits: usize) -> Self140     pub(crate) fn new(permits: usize) -> Self {
141         assert!(
142             permits <= Self::MAX_PERMITS,
143             "a semaphore may not have more than MAX_PERMITS permits ({})",
144             Self::MAX_PERMITS
145         );
146 
147         #[cfg(all(tokio_unstable, feature = "tracing"))]
148         let resource_span = {
149             let resource_span = tracing::trace_span!(
150                 parent: None,
151                 "runtime.resource",
152                 concrete_type = "Semaphore",
153                 kind = "Sync",
154                 is_internal = true
155             );
156 
157             resource_span.in_scope(|| {
158                 tracing::trace!(
159                     target: "runtime::resource::state_update",
160                     permits = permits,
161                     permits.op = "override",
162                 )
163             });
164             resource_span
165         };
166 
167         Self {
168             permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
169             waiters: Mutex::new(Waitlist {
170                 queue: LinkedList::new(),
171                 closed: false,
172             }),
173             #[cfg(all(tokio_unstable, feature = "tracing"))]
174             resource_span,
175         }
176     }
177 
178     /// Creates a new semaphore with the initial number of permits.
179     ///
180     /// Maximum number of permits on 32-bit platforms is `1<<29`.
181     #[cfg(not(all(loom, test)))]
const_new(permits: usize) -> Self182     pub(crate) const fn const_new(permits: usize) -> Self {
183         assert!(permits <= Self::MAX_PERMITS);
184 
185         Self {
186             permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
187             waiters: Mutex::const_new(Waitlist {
188                 queue: LinkedList::new(),
189                 closed: false,
190             }),
191             #[cfg(all(tokio_unstable, feature = "tracing"))]
192             resource_span: tracing::Span::none(),
193         }
194     }
195 
196     /// Creates a new closed semaphore with 0 permits.
new_closed() -> Self197     pub(crate) fn new_closed() -> Self {
198         Self {
199             permits: AtomicUsize::new(Self::CLOSED),
200             waiters: Mutex::new(Waitlist {
201                 queue: LinkedList::new(),
202                 closed: true,
203             }),
204             #[cfg(all(tokio_unstable, feature = "tracing"))]
205             resource_span: tracing::Span::none(),
206         }
207     }
208 
209     /// Creates a new closed semaphore with 0 permits.
210     #[cfg(not(all(loom, test)))]
const_new_closed() -> Self211     pub(crate) const fn const_new_closed() -> Self {
212         Self {
213             permits: AtomicUsize::new(Self::CLOSED),
214             waiters: Mutex::const_new(Waitlist {
215                 queue: LinkedList::new(),
216                 closed: true,
217             }),
218             #[cfg(all(tokio_unstable, feature = "tracing"))]
219             resource_span: tracing::Span::none(),
220         }
221     }
222 
223     /// Returns the current number of available permits.
available_permits(&self) -> usize224     pub(crate) fn available_permits(&self) -> usize {
225         self.permits.load(Acquire) >> Self::PERMIT_SHIFT
226     }
227 
228     /// Adds `added` new permits to the semaphore.
229     ///
230     /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
release(&self, added: usize)231     pub(crate) fn release(&self, added: usize) {
232         if added == 0 {
233             return;
234         }
235 
236         // Assign permits to the wait queue
237         self.add_permits_locked(added, self.waiters.lock());
238     }
239 
240     /// Closes the semaphore. This prevents the semaphore from issuing new
241     /// permits and notifies all pending waiters.
close(&self)242     pub(crate) fn close(&self) {
243         let mut waiters = self.waiters.lock();
244         // If the semaphore's permits counter has enough permits for an
245         // unqueued waiter to acquire all the permits it needs immediately,
246         // it won't touch the wait list. Therefore, we have to set a bit on
247         // the permit counter as well. However, we must do this while
248         // holding the lock --- otherwise, if we set the bit and then wait
249         // to acquire the lock we'll enter an inconsistent state where the
250         // permit counter is closed, but the wait list is not.
251         self.permits.fetch_or(Self::CLOSED, Release);
252         waiters.closed = true;
253         while let Some(mut waiter) = waiters.queue.pop_back() {
254             let waker = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
255             if let Some(waker) = waker {
256                 waker.wake();
257             }
258         }
259     }
260 
261     /// Returns true if the semaphore is closed.
is_closed(&self) -> bool262     pub(crate) fn is_closed(&self) -> bool {
263         self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED
264     }
265 
try_acquire(&self, num_permits: usize) -> Result<(), TryAcquireError>266     pub(crate) fn try_acquire(&self, num_permits: usize) -> Result<(), TryAcquireError> {
267         assert!(
268             num_permits <= Self::MAX_PERMITS,
269             "a semaphore may not have more than MAX_PERMITS permits ({})",
270             Self::MAX_PERMITS
271         );
272         let num_permits = num_permits << Self::PERMIT_SHIFT;
273         let mut curr = self.permits.load(Acquire);
274         loop {
275             // Has the semaphore closed?
276             if curr & Self::CLOSED == Self::CLOSED {
277                 return Err(TryAcquireError::Closed);
278             }
279 
280             // Are there enough permits remaining?
281             if curr < num_permits {
282                 return Err(TryAcquireError::NoPermits);
283             }
284 
285             let next = curr - num_permits;
286 
287             match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
288                 Ok(_) => {
289                     // TODO: Instrument once issue has been solved
290                     return Ok(());
291                 }
292                 Err(actual) => curr = actual,
293             }
294         }
295     }
296 
acquire(&self, num_permits: usize) -> Acquire<'_>297     pub(crate) fn acquire(&self, num_permits: usize) -> Acquire<'_> {
298         Acquire::new(self, num_permits)
299     }
300 
301     /// Release `rem` permits to the semaphore's wait list, starting from the
302     /// end of the queue.
303     ///
304     /// If `rem` exceeds the number of permits needed by the wait list, the
305     /// remainder are assigned back to the semaphore.
add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>)306     fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) {
307         let mut wakers = WakeList::new();
308         let mut lock = Some(waiters);
309         let mut is_empty = false;
310         while rem > 0 {
311             let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
312             'inner: while wakers.can_push() {
313                 // Was the waiter assigned enough permits to wake it?
314                 match waiters.queue.last() {
315                     Some(waiter) => {
316                         if !waiter.assign_permits(&mut rem) {
317                             break 'inner;
318                         }
319                     }
320                     None => {
321                         is_empty = true;
322                         // If we assigned permits to all the waiters in the queue, and there are
323                         // still permits left over, assign them back to the semaphore.
324                         break 'inner;
325                     }
326                 };
327                 let mut waiter = waiters.queue.pop_back().unwrap();
328                 if let Some(waker) =
329                     unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }
330                 {
331                     wakers.push(waker);
332                 }
333             }
334 
335             if rem > 0 && is_empty {
336                 let permits = rem;
337                 assert!(
338                     permits <= Self::MAX_PERMITS,
339                     "cannot add more than MAX_PERMITS permits ({})",
340                     Self::MAX_PERMITS
341                 );
342                 let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release);
343                 let prev = prev >> Self::PERMIT_SHIFT;
344                 assert!(
345                     prev + permits <= Self::MAX_PERMITS,
346                     "number of added permits ({}) would overflow MAX_PERMITS ({})",
347                     rem,
348                     Self::MAX_PERMITS
349                 );
350 
351                 // add remaining permits back
352                 #[cfg(all(tokio_unstable, feature = "tracing"))]
353                 self.resource_span.in_scope(|| {
354                     tracing::trace!(
355                     target: "runtime::resource::state_update",
356                     permits = rem,
357                     permits.op = "add",
358                     )
359                 });
360 
361                 rem = 0;
362             }
363 
364             drop(waiters); // release the lock
365 
366             wakers.wake_all();
367         }
368 
369         assert_eq!(rem, 0);
370     }
371 
372     /// Decrease a semaphore's permits by a maximum of `n`.
373     ///
374     /// If there are insufficient permits and it's not possible to reduce by `n`,
375     /// return the number of permits that were actually reduced.
forget_permits(&self, n: usize) -> usize376     pub(crate) fn forget_permits(&self, n: usize) -> usize {
377         if n == 0 {
378             return 0;
379         }
380 
381         let mut curr_bits = self.permits.load(Acquire);
382         loop {
383             let curr = curr_bits >> Self::PERMIT_SHIFT;
384             let new = curr.saturating_sub(n);
385             match self.permits.compare_exchange_weak(
386                 curr_bits,
387                 new << Self::PERMIT_SHIFT,
388                 AcqRel,
389                 Acquire,
390             ) {
391                 Ok(_) => return std::cmp::min(curr, n),
392                 Err(actual) => curr_bits = actual,
393             };
394         }
395     }
396 
poll_acquire( &self, cx: &mut Context<'_>, num_permits: usize, node: Pin<&mut Waiter>, queued: bool, ) -> Poll<Result<(), AcquireError>>397     fn poll_acquire(
398         &self,
399         cx: &mut Context<'_>,
400         num_permits: usize,
401         node: Pin<&mut Waiter>,
402         queued: bool,
403     ) -> Poll<Result<(), AcquireError>> {
404         let mut acquired = 0;
405 
406         let needed = if queued {
407             node.state.load(Acquire) << Self::PERMIT_SHIFT
408         } else {
409             num_permits << Self::PERMIT_SHIFT
410         };
411 
412         let mut lock = None;
413         // First, try to take the requested number of permits from the
414         // semaphore.
415         let mut curr = self.permits.load(Acquire);
416         let mut waiters = loop {
417             // Has the semaphore closed?
418             if curr & Self::CLOSED > 0 {
419                 return Poll::Ready(Err(AcquireError::closed()));
420             }
421 
422             let mut remaining = 0;
423             let total = curr
424                 .checked_add(acquired)
425                 .expect("number of permits must not overflow");
426             let (next, acq) = if total >= needed {
427                 let next = curr - (needed - acquired);
428                 (next, needed >> Self::PERMIT_SHIFT)
429             } else {
430                 remaining = (needed - acquired) - curr;
431                 (0, curr >> Self::PERMIT_SHIFT)
432             };
433 
434             if remaining > 0 && lock.is_none() {
435                 // No permits were immediately available, so this permit will
436                 // (probably) need to wait. We'll need to acquire a lock on the
437                 // wait queue before continuing. We need to do this _before_ the
438                 // CAS that sets the new value of the semaphore's `permits`
439                 // counter. Otherwise, if we subtract the permits and then
440                 // acquire the lock, we might miss additional permits being
441                 // added while waiting for the lock.
442                 lock = Some(self.waiters.lock());
443             }
444 
445             match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
446                 Ok(_) => {
447                     acquired += acq;
448                     if remaining == 0 {
449                         if !queued {
450                             #[cfg(all(tokio_unstable, feature = "tracing"))]
451                             self.resource_span.in_scope(|| {
452                                 tracing::trace!(
453                                     target: "runtime::resource::state_update",
454                                     permits = acquired,
455                                     permits.op = "sub",
456                                 );
457                                 tracing::trace!(
458                                     target: "runtime::resource::async_op::state_update",
459                                     permits_obtained = acquired,
460                                     permits.op = "add",
461                                 )
462                             });
463 
464                             return Poll::Ready(Ok(()));
465                         } else if lock.is_none() {
466                             break self.waiters.lock();
467                         }
468                     }
469                     break lock.expect("lock must be acquired before waiting");
470                 }
471                 Err(actual) => curr = actual,
472             }
473         };
474 
475         if waiters.closed {
476             return Poll::Ready(Err(AcquireError::closed()));
477         }
478 
479         #[cfg(all(tokio_unstable, feature = "tracing"))]
480         self.resource_span.in_scope(|| {
481             tracing::trace!(
482                 target: "runtime::resource::state_update",
483                 permits = acquired,
484                 permits.op = "sub",
485             )
486         });
487 
488         if node.assign_permits(&mut acquired) {
489             self.add_permits_locked(acquired, waiters);
490             return Poll::Ready(Ok(()));
491         }
492 
493         assert_eq!(acquired, 0);
494         let mut old_waker = None;
495 
496         // Otherwise, register the waker & enqueue the node.
497         node.waker.with_mut(|waker| {
498             // Safety: the wait list is locked, so we may modify the waker.
499             let waker = unsafe { &mut *waker };
500             // Do we need to register the new waker?
501             if waker
502                 .as_ref()
503                 .map_or(true, |waker| !waker.will_wake(cx.waker()))
504             {
505                 old_waker = std::mem::replace(waker, Some(cx.waker().clone()));
506             }
507         });
508 
509         // If the waiter is not already in the wait queue, enqueue it.
510         if !queued {
511             let node = unsafe {
512                 let node = Pin::into_inner_unchecked(node) as *mut _;
513                 NonNull::new_unchecked(node)
514             };
515 
516             waiters.queue.push_front(node);
517         }
518         drop(waiters);
519         drop(old_waker);
520 
521         Poll::Pending
522     }
523 }
524 
525 impl fmt::Debug for Semaphore {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result526     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
527         fmt.debug_struct("Semaphore")
528             .field("permits", &self.available_permits())
529             .finish()
530     }
531 }
532 
533 impl Waiter {
new( num_permits: usize, #[cfg(all(tokio_unstable, feature = "tracing"))] ctx: trace::AsyncOpTracingCtx, ) -> Self534     fn new(
535         num_permits: usize,
536         #[cfg(all(tokio_unstable, feature = "tracing"))] ctx: trace::AsyncOpTracingCtx,
537     ) -> Self {
538         Waiter {
539             waker: UnsafeCell::new(None),
540             state: AtomicUsize::new(num_permits),
541             pointers: linked_list::Pointers::new(),
542             #[cfg(all(tokio_unstable, feature = "tracing"))]
543             ctx,
544             _p: PhantomPinned,
545         }
546     }
547 
548     /// Assign permits to the waiter.
549     ///
550     /// Returns `true` if the waiter should be removed from the queue
assign_permits(&self, n: &mut usize) -> bool551     fn assign_permits(&self, n: &mut usize) -> bool {
552         let mut curr = self.state.load(Acquire);
553         loop {
554             let assign = cmp::min(curr, *n);
555             let next = curr - assign;
556             match self.state.compare_exchange(curr, next, AcqRel, Acquire) {
557                 Ok(_) => {
558                     *n -= assign;
559                     #[cfg(all(tokio_unstable, feature = "tracing"))]
560                     self.ctx.async_op_span.in_scope(|| {
561                         tracing::trace!(
562                             target: "runtime::resource::async_op::state_update",
563                             permits_obtained = assign,
564                             permits.op = "add",
565                         );
566                     });
567                     return next == 0;
568                 }
569                 Err(actual) => curr = actual,
570             }
571         }
572     }
573 }
574 
575 impl Future for Acquire<'_> {
576     type Output = Result<(), AcquireError>;
577 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>578     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
579         ready!(crate::trace::trace_leaf(cx));
580 
581         #[cfg(all(tokio_unstable, feature = "tracing"))]
582         let _resource_span = self.node.ctx.resource_span.clone().entered();
583         #[cfg(all(tokio_unstable, feature = "tracing"))]
584         let _async_op_span = self.node.ctx.async_op_span.clone().entered();
585         #[cfg(all(tokio_unstable, feature = "tracing"))]
586         let _async_op_poll_span = self.node.ctx.async_op_poll_span.clone().entered();
587 
588         let (node, semaphore, needed, queued) = self.project();
589 
590         // First, ensure the current task has enough budget to proceed.
591         #[cfg(all(tokio_unstable, feature = "tracing"))]
592         let coop = ready!(trace_poll_op!(
593             "poll_acquire",
594             crate::runtime::coop::poll_proceed(cx),
595         ));
596 
597         #[cfg(not(all(tokio_unstable, feature = "tracing")))]
598         let coop = ready!(crate::runtime::coop::poll_proceed(cx));
599 
600         let result = match semaphore.poll_acquire(cx, needed, node, *queued) {
601             Poll::Pending => {
602                 *queued = true;
603                 Poll::Pending
604             }
605             Poll::Ready(r) => {
606                 coop.made_progress();
607                 r?;
608                 *queued = false;
609                 Poll::Ready(Ok(()))
610             }
611         };
612 
613         #[cfg(all(tokio_unstable, feature = "tracing"))]
614         return trace_poll_op!("poll_acquire", result);
615 
616         #[cfg(not(all(tokio_unstable, feature = "tracing")))]
617         return result;
618     }
619 }
620 
621 impl<'a> Acquire<'a> {
new(semaphore: &'a Semaphore, num_permits: usize) -> Self622     fn new(semaphore: &'a Semaphore, num_permits: usize) -> Self {
623         #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
624         return Self {
625             node: Waiter::new(num_permits),
626             semaphore,
627             num_permits,
628             queued: false,
629         };
630 
631         #[cfg(all(tokio_unstable, feature = "tracing"))]
632         return semaphore.resource_span.in_scope(|| {
633             let async_op_span =
634                 tracing::trace_span!("runtime.resource.async_op", source = "Acquire::new");
635             let async_op_poll_span = async_op_span.in_scope(|| {
636                 tracing::trace!(
637                     target: "runtime::resource::async_op::state_update",
638                     permits_requested = num_permits,
639                     permits.op = "override",
640                 );
641 
642                 tracing::trace!(
643                     target: "runtime::resource::async_op::state_update",
644                     permits_obtained = 0usize,
645                     permits.op = "override",
646                 );
647 
648                 tracing::trace_span!("runtime.resource.async_op.poll")
649             });
650 
651             let ctx = trace::AsyncOpTracingCtx {
652                 async_op_span,
653                 async_op_poll_span,
654                 resource_span: semaphore.resource_span.clone(),
655             };
656 
657             Self {
658                 node: Waiter::new(num_permits, ctx),
659                 semaphore,
660                 num_permits,
661                 queued: false,
662             }
663         });
664     }
665 
project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, usize, &mut bool)666     fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, usize, &mut bool) {
667         fn is_unpin<T: Unpin>() {}
668         unsafe {
669             // Safety: all fields other than `node` are `Unpin`
670 
671             is_unpin::<&Semaphore>();
672             is_unpin::<&mut bool>();
673             is_unpin::<usize>();
674 
675             let this = self.get_unchecked_mut();
676             (
677                 Pin::new_unchecked(&mut this.node),
678                 this.semaphore,
679                 this.num_permits,
680                 &mut this.queued,
681             )
682         }
683     }
684 }
685 
686 impl Drop for Acquire<'_> {
drop(&mut self)687     fn drop(&mut self) {
688         // If the future is completed, there is no node in the wait list, so we
689         // can skip acquiring the lock.
690         if !self.queued {
691             return;
692         }
693 
694         // This is where we ensure safety. The future is being dropped,
695         // which means we must ensure that the waiter entry is no longer stored
696         // in the linked list.
697         let mut waiters = self.semaphore.waiters.lock();
698 
699         // remove the entry from the list
700         let node = NonNull::from(&mut self.node);
701         // Safety: we have locked the wait list.
702         unsafe { waiters.queue.remove(node) };
703 
704         let acquired_permits = self.num_permits - self.node.state.load(Acquire);
705         if acquired_permits > 0 {
706             self.semaphore.add_permits_locked(acquired_permits, waiters);
707         }
708     }
709 }
710 
711 // Safety: the `Acquire` future is not `Sync` automatically because it contains
712 // a `Waiter`, which, in turn, contains an `UnsafeCell`. However, the
713 // `UnsafeCell` is only accessed when the future is borrowed mutably (either in
714 // `poll` or in `drop`). Therefore, it is safe (although not particularly
715 // _useful_) for the future to be borrowed immutably across threads.
716 unsafe impl Sync for Acquire<'_> {}
717 
718 // ===== impl AcquireError ====
719 
720 impl AcquireError {
closed() -> AcquireError721     fn closed() -> AcquireError {
722         AcquireError(())
723     }
724 }
725 
726 impl fmt::Display for AcquireError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result727     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
728         write!(fmt, "semaphore closed")
729     }
730 }
731 
732 impl std::error::Error for AcquireError {}
733 
734 // ===== impl TryAcquireError =====
735 
736 impl TryAcquireError {
737     /// Returns `true` if the error was caused by a closed semaphore.
738     #[allow(dead_code)] // may be used later!
is_closed(&self) -> bool739     pub(crate) fn is_closed(&self) -> bool {
740         matches!(self, TryAcquireError::Closed)
741     }
742 
743     /// Returns `true` if the error was caused by calling `try_acquire` on a
744     /// semaphore with no available permits.
745     #[allow(dead_code)] // may be used later!
is_no_permits(&self) -> bool746     pub(crate) fn is_no_permits(&self) -> bool {
747         matches!(self, TryAcquireError::NoPermits)
748     }
749 }
750 
751 impl fmt::Display for TryAcquireError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result752     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
753         match self {
754             TryAcquireError::Closed => write!(fmt, "semaphore closed"),
755             TryAcquireError::NoPermits => write!(fmt, "no permits available"),
756         }
757     }
758 }
759 
760 impl std::error::Error for TryAcquireError {}
761 
762 /// # Safety
763 ///
764 /// `Waiter` is forced to be !Unpin.
765 unsafe impl linked_list::Link for Waiter {
766     type Handle = NonNull<Waiter>;
767     type Target = Waiter;
768 
as_raw(handle: &Self::Handle) -> NonNull<Waiter>769     fn as_raw(handle: &Self::Handle) -> NonNull<Waiter> {
770         *handle
771     }
772 
from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter>773     unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
774         ptr
775     }
776 
pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>>777     unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
778         Waiter::addr_of_pointers(target)
779     }
780 }
781