1 use core::iter::FromIterator;
2 use core::mem::{self, ManuallyDrop};
3 use core::ops::{Deref, RangeBounds};
4 use core::ptr::NonNull;
5 use core::{cmp, fmt, hash, ptr, slice, usize};
6 
7 use alloc::{
8     alloc::{dealloc, Layout},
9     borrow::Borrow,
10     boxed::Box,
11     string::String,
12     vec::Vec,
13 };
14 
15 use crate::buf::IntoIter;
16 #[allow(unused)]
17 use crate::loom::sync::atomic::AtomicMut;
18 use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
19 use crate::{offset_from, Buf, BytesMut};
20 
21 /// A cheaply cloneable and sliceable chunk of contiguous memory.
22 ///
23 /// `Bytes` is an efficient container for storing and operating on contiguous
24 /// slices of memory. It is intended for use primarily in networking code, but
25 /// could have applications elsewhere as well.
26 ///
27 /// `Bytes` values facilitate zero-copy network programming by allowing multiple
28 /// `Bytes` objects to point to the same underlying memory.
29 ///
30 /// `Bytes` does not have a single implementation. It is an interface, whose
31 /// exact behavior is implemented through dynamic dispatch in several underlying
32 /// implementations of `Bytes`.
33 ///
34 /// All `Bytes` implementations must fulfill the following requirements:
35 /// - They are cheaply cloneable and thereby shareable among an unlimited number
36 ///   of components, for example by modifying a reference count.
37 /// - Instances can be sliced to refer to a subset of the original buffer.
38 ///
39 /// ```
40 /// use bytes::Bytes;
41 ///
42 /// let mut mem = Bytes::from("Hello world");
43 /// let a = mem.slice(0..5);
44 ///
45 /// assert_eq!(a, "Hello");
46 ///
47 /// let b = mem.split_to(6);
48 ///
49 /// assert_eq!(mem, "world");
50 /// assert_eq!(b, "Hello ");
51 /// ```
52 ///
53 /// # Memory layout
54 ///
55 /// The `Bytes` struct itself is fairly small, limited to 4 `usize` fields used
56 /// to track information about which segment of the underlying memory the
57 /// `Bytes` handle has access to.
58 ///
59 /// `Bytes` keeps both a pointer to the shared state containing the full memory
60 /// slice and a pointer to the start of the region visible by the handle.
61 /// `Bytes` also tracks the length of its view into the memory.
62 ///
63 /// # Sharing
64 ///
65 /// `Bytes` contains a vtable, which allows implementations of `Bytes` to define
66 /// how sharing/cloning is implemented in detail.
67 /// When `Bytes::clone()` is called, `Bytes` will call the vtable function for
68 /// cloning the backing storage in order to share it behind multiple `Bytes`
69 /// instances.
70 ///
71 /// For `Bytes` implementations which refer to constant memory (e.g. created
72 /// via `Bytes::from_static()`) the cloning implementation will be a no-op.
73 ///
74 /// For `Bytes` implementations which point to a reference counted shared storage
75 /// (e.g. an `Arc<[u8]>`), sharing will be implemented by increasing the
76 /// reference count.
77 ///
78 /// Due to this mechanism, multiple `Bytes` instances may point to the same
79 /// shared memory region.
80 /// Each `Bytes` instance can point to different sections within that
81 /// memory region, and `Bytes` instances may or may not have overlapping views
82 /// into the memory.
83 ///
84 /// The following diagram visualizes a scenario where 2 `Bytes` instances make
85 /// use of an `Arc`-based backing storage, and provide access to different views:
86 ///
87 /// ```text
88 ///
89 ///    Arc ptrs                   ┌─────────┐
90 ///    ________________________ / │ Bytes 2 │
91 ///   /                           └─────────┘
92 ///  /          ┌───────────┐     |         |
93 /// |_________/ │  Bytes 1  │     |         |
94 /// |           └───────────┘     |         |
95 /// |           |           | ___/ data     | tail
96 /// |      data |      tail |/              |
97 /// v           v           v               v
98 /// ┌─────┬─────┬───────────┬───────────────┬─────┐
99 /// │ Arc │     │           │               │     │
100 /// └─────┴─────┴───────────┴───────────────┴─────┘
101 /// ```
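///
/// For illustration, a minimal sketch of this sharing in action:
///
/// ```
/// use bytes::Bytes;
///
/// let full = Bytes::from(vec![0u8; 32]);
/// let head = full.slice(0..8);
///
/// // Both handles view the same allocation; no bytes were copied.
/// assert_eq!(head.as_ptr(), full.as_ptr());
/// assert_eq!(head.len(), 8);
/// ```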
102 pub struct Bytes {
103     ptr: *const u8,
104     len: usize,
105     // inlined "trait object"
106     data: AtomicPtr<()>,
107     vtable: &'static Vtable,
108 }
109 
110 pub(crate) struct Vtable {
111     /// fn(data, ptr, len)
112     pub clone: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> Bytes,
113     /// fn(data, ptr, len)
114     ///
115     /// Takes the `Bytes` by value.
116     pub to_vec: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> Vec<u8>,
117     pub to_mut: unsafe fn(&AtomicPtr<()>, *const u8, usize) -> BytesMut,
118     /// fn(data)
119     pub is_unique: unsafe fn(&AtomicPtr<()>) -> bool,
120     /// fn(data, ptr, len)
121     pub drop: unsafe fn(&mut AtomicPtr<()>, *const u8, usize),
122 }
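
// Every `Bytes` operation that depends on how the underlying buffer is owned is
// dispatched through the vtable above. For example, `Clone for Bytes` (defined
// later in this file) is just a call through the stored function pointer:
//
//     unsafe { (self.vtable.clone)(&self.data, self.ptr, self.len) }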
123 
124 impl Bytes {
125     /// Creates a new empty `Bytes`.
126     ///
127     /// This will not allocate and the returned `Bytes` handle will be empty.
128     ///
129     /// # Examples
130     ///
131     /// ```
132     /// use bytes::Bytes;
133     ///
134     /// let b = Bytes::new();
135     /// assert_eq!(&b[..], b"");
136     /// ```
137     #[inline]
138     #[cfg(not(all(loom, test)))]
139     pub const fn new() -> Self {
140         // Make it a named const to work around
141         // "unsizing casts are not allowed in const fn"
142         const EMPTY: &[u8] = &[];
143         Bytes::from_static(EMPTY)
144     }
145 
146     /// Creates a new empty `Bytes`.
147     #[cfg(all(loom, test))]
148     pub fn new() -> Self {
149         const EMPTY: &[u8] = &[];
150         Bytes::from_static(EMPTY)
151     }
152 
153     /// Creates a new `Bytes` from a static slice.
154     ///
155     /// The returned `Bytes` will point directly to the static slice. There is
156     /// no allocating or copying.
157     ///
158     /// # Examples
159     ///
160     /// ```
161     /// use bytes::Bytes;
162     ///
163     /// let b = Bytes::from_static(b"hello");
164     /// assert_eq!(&b[..], b"hello");
165     /// ```
166     #[inline]
167     #[cfg(not(all(loom, test)))]
168     pub const fn from_static(bytes: &'static [u8]) -> Self {
169         Bytes {
170             ptr: bytes.as_ptr(),
171             len: bytes.len(),
172             data: AtomicPtr::new(ptr::null_mut()),
173             vtable: &STATIC_VTABLE,
174         }
175     }
176 
177     /// Creates a new `Bytes` from a static slice.
178     #[cfg(all(loom, test))]
179     pub fn from_static(bytes: &'static [u8]) -> Self {
180         Bytes {
181             ptr: bytes.as_ptr(),
182             len: bytes.len(),
183             data: AtomicPtr::new(ptr::null_mut()),
184             vtable: &STATIC_VTABLE,
185         }
186     }
187 
188     /// Creates a new `Bytes` with length zero and the given pointer as the address.
189     fn new_empty_with_ptr(ptr: *const u8) -> Self {
190         debug_assert!(!ptr.is_null());
191 
192         // Detach this pointer's provenance from whichever allocation it came from, and reattach it
193         // to the provenance of the fake ZST [u8;0] at the same address.
194         let ptr = without_provenance(ptr as usize);
195 
196         Bytes {
197             ptr,
198             len: 0,
199             data: AtomicPtr::new(ptr::null_mut()),
200             vtable: &STATIC_VTABLE,
201         }
202     }
203 
204     /// Creates [Bytes] with a buffer whose lifetime is controlled
205     /// via an explicit owner.
206     ///
207     /// A common use case is to zero-copy construct from mapped memory.
208     ///
209     /// ```
210     /// # struct File;
211     /// #
212     /// # impl File {
213     /// #     pub fn open(_: &str) -> Result<Self, ()> {
214     /// #         Ok(Self)
215     /// #     }
216     /// # }
217     /// #
218     /// # mod memmap2 {
219     /// #     pub struct Mmap;
220     /// #
221     /// #     impl Mmap {
222     /// #         pub unsafe fn map(_file: &super::File) -> Result<Self, ()> {
223     /// #             Ok(Self)
224     /// #         }
225     /// #     }
226     /// #
227     /// #     impl AsRef<[u8]> for Mmap {
228     /// #         fn as_ref(&self) -> &[u8] {
229     /// #             b"buf"
230     /// #         }
231     /// #     }
232     /// # }
233     /// use bytes::Bytes;
234     /// use memmap2::Mmap;
235     ///
236     /// # fn main() -> Result<(), ()> {
237     /// let file = File::open("upload_bundle.tar.gz")?;
238     /// let mmap = unsafe { Mmap::map(&file) }?;
239     /// let b = Bytes::from_owner(mmap);
240     /// # Ok(())
241     /// # }
242     /// ```
243     ///
244     /// The `owner` will be transferred to the constructed [Bytes] object, which
245     /// will ensure it is dropped once all remaining clones of the constructed
246     /// object are dropped. The owner will then be responsible for dropping the
247     /// specified region of memory as part of its [Drop] implementation.
248     ///
249     /// Note that converting [Bytes] constructed from an owner into a [BytesMut]
250     /// will always create a deep copy of the buffer into newly allocated memory.
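    ///
    /// For a smaller, self-contained sketch, any `AsRef<[u8]> + Send + 'static`
    /// value works as the owner, for example a plain `Vec<u8>`:
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let owner: Vec<u8> = b"hello".to_vec();
    /// let b = Bytes::from_owner(owner);
    /// assert_eq!(&b[..], b"hello");
    /// ```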
251     pub fn from_owner<T>(owner: T) -> Self
252     where
253         T: AsRef<[u8]> + Send + 'static,
254     {
255         // Safety & Miri:
256         // The ownership of `owner` is first transferred to the `Owned` wrapper and `Bytes` object.
257         // This ensures that the owner is pinned in memory, allowing us to call `.as_ref()` safely
258         // since the lifetime of the owner is controlled by the lifetime of the new `Bytes` object,
259         // and the lifetime of the resulting borrowed `&[u8]` matches that of the owner.
260         // Note that this remains safe so long as we only call `.as_ref()` once.
261         //
262         // There are some additional special considerations here:
263         //   * We rely on Bytes's Drop impl to clean up memory should `.as_ref()` panic.
264         //   * Setting the `ptr` and `len` on the bytes object last (after moving the owner to
265         //     Bytes) allows Miri checks to pass since it avoids obtaining the `&[u8]` slice
266         //     from a stack-owned Box.
267         // More details on this: https://github.com/tokio-rs/bytes/pull/742/#discussion_r1813375863
268         //                  and: https://github.com/tokio-rs/bytes/pull/742/#discussion_r1813316032
269 
270         let owned = Box::into_raw(Box::new(Owned {
271             lifetime: OwnedLifetime {
272                 ref_cnt: AtomicUsize::new(1),
273                 drop: owned_box_and_drop::<T>,
274             },
275             owner,
276         }));
277 
278         let mut ret = Bytes {
279             ptr: NonNull::dangling().as_ptr(),
280             len: 0,
281             data: AtomicPtr::new(owned.cast()),
282             vtable: &OWNED_VTABLE,
283         };
284 
285         let buf = unsafe { &*owned }.owner.as_ref();
286         ret.ptr = buf.as_ptr();
287         ret.len = buf.len();
288 
289         ret
290     }
291 
292     /// Returns the number of bytes contained in this `Bytes`.
293     ///
294     /// # Examples
295     ///
296     /// ```
297     /// use bytes::Bytes;
298     ///
299     /// let b = Bytes::from(&b"hello"[..]);
300     /// assert_eq!(b.len(), 5);
301     /// ```
302     #[inline]
303     pub const fn len(&self) -> usize {
304         self.len
305     }
306 
307     /// Returns true if the `Bytes` has a length of 0.
308     ///
309     /// # Examples
310     ///
311     /// ```
312     /// use bytes::Bytes;
313     ///
314     /// let b = Bytes::new();
315     /// assert!(b.is_empty());
316     /// ```
317     #[inline]
318     pub const fn is_empty(&self) -> bool {
319         self.len == 0
320     }
321 
322     /// Returns true if this is the only reference to the data and
323     /// `Into<BytesMut>` would avoid cloning the underlying buffer.
324     ///
325     /// Always returns false if the data is backed by a [static slice](Bytes::from_static),
326     /// or an [owner](Bytes::from_owner).
327     ///
328     /// The result of this method may be invalidated immediately if another
329     /// thread clones this value while this is being called. Ensure you have
330     /// unique access to this value (`&mut Bytes`) first if you need to be
331     /// certain the result is valid (i.e. for safety reasons).
332     /// # Examples
333     ///
334     /// ```
335     /// use bytes::Bytes;
336     ///
337     /// let a = Bytes::from(vec![1, 2, 3]);
338     /// assert!(a.is_unique());
339     /// let b = a.clone();
340     /// assert!(!a.is_unique());
341     /// ```
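    ///
    /// For illustration, dropping every other handle makes the data unique again:
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let a = Bytes::from(vec![1, 2, 3]);
    /// let b = a.clone();
    /// assert!(!a.is_unique());
    /// drop(b);
    /// assert!(a.is_unique());
    /// ```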
342     pub fn is_unique(&self) -> bool {
343         unsafe { (self.vtable.is_unique)(&self.data) }
344     }
345 
346     /// Creates a new `Bytes` instance from the given slice by copying it.
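    ///
    /// # Examples
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let b = Bytes::copy_from_slice(b"hello");
    /// assert_eq!(&b[..], b"hello");
    /// ```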
347     pub fn copy_from_slice(data: &[u8]) -> Self {
348         data.to_vec().into()
349     }
350 
351     /// Returns a slice of self for the provided range.
352     ///
353     /// This will increment the reference count for the underlying memory and
354     /// return a new `Bytes` handle set to the slice.
355     ///
356     /// This operation is `O(1)`.
357     ///
358     /// # Examples
359     ///
360     /// ```
361     /// use bytes::Bytes;
362     ///
363     /// let a = Bytes::from(&b"hello world"[..]);
364     /// let b = a.slice(2..5);
365     ///
366     /// assert_eq!(&b[..], b"llo");
367     /// ```
368     ///
369     /// # Panics
370     ///
371     /// Requires that `begin <= end` and `end <= self.len()`, otherwise slicing
372     /// will panic.
373     pub fn slice(&self, range: impl RangeBounds<usize>) -> Self {
374         use core::ops::Bound;
375 
376         let len = self.len();
377 
378         let begin = match range.start_bound() {
379             Bound::Included(&n) => n,
380             Bound::Excluded(&n) => n.checked_add(1).expect("out of range"),
381             Bound::Unbounded => 0,
382         };
383 
384         let end = match range.end_bound() {
385             Bound::Included(&n) => n.checked_add(1).expect("out of range"),
386             Bound::Excluded(&n) => n,
387             Bound::Unbounded => len,
388         };
389 
390         assert!(
391             begin <= end,
392             "range start must not be greater than end: {:?} <= {:?}",
393             begin,
394             end,
395         );
396         assert!(
397             end <= len,
398             "range end out of bounds: {:?} <= {:?}",
399             end,
400             len,
401         );
402 
403         if end == begin {
404             return Bytes::new();
405         }
406 
407         let mut ret = self.clone();
408 
409         ret.len = end - begin;
410         ret.ptr = unsafe { ret.ptr.add(begin) };
411 
412         ret
413     }
414 
415     /// Returns a slice of self that is equivalent to the given `subset`.
416     ///
417     /// When processing a `Bytes` buffer with other tools, one often gets a
418     /// `&[u8]` which is in fact a slice of the `Bytes`, i.e. a subset of it.
419     /// This function turns that `&[u8]` into another `Bytes`, as if one had
420     /// called `self.slice()` with the offsets that correspond to `subset`.
421     ///
422     /// This operation is `O(1)`.
423     ///
424     /// # Examples
425     ///
426     /// ```
427     /// use bytes::Bytes;
428     ///
429     /// let bytes = Bytes::from(&b"012345678"[..]);
430     /// let as_slice = bytes.as_ref();
431     /// let subset = &as_slice[2..6];
432     /// let subslice = bytes.slice_ref(&subset);
433     /// assert_eq!(&subslice[..], b"2345");
434     /// ```
435     ///
436     /// # Panics
437     ///
438     /// Requires that the given `subset` slice is in fact contained within the
439     /// `Bytes` buffer; otherwise this function will panic.
440     pub fn slice_ref(&self, subset: &[u8]) -> Self {
441         // Empty slice and empty Bytes may have their pointers reset
442         // so explicitly allow empty slice to be a subslice of any slice.
443         if subset.is_empty() {
444             return Bytes::new();
445         }
446 
447         let bytes_p = self.as_ptr() as usize;
448         let bytes_len = self.len();
449 
450         let sub_p = subset.as_ptr() as usize;
451         let sub_len = subset.len();
452 
453         assert!(
454             sub_p >= bytes_p,
455             "subset pointer ({:p}) is smaller than self pointer ({:p})",
456             subset.as_ptr(),
457             self.as_ptr(),
458         );
459         assert!(
460             sub_p + sub_len <= bytes_p + bytes_len,
461             "subset is out of bounds: self = ({:p}, {}), subset = ({:p}, {})",
462             self.as_ptr(),
463             bytes_len,
464             subset.as_ptr(),
465             sub_len,
466         );
467 
468         let sub_offset = sub_p - bytes_p;
469 
470         self.slice(sub_offset..(sub_offset + sub_len))
471     }
472 
473     /// Splits the bytes into two at the given index.
474     ///
475     /// Afterwards `self` contains elements `[0, at)`, and the returned `Bytes`
476     /// contains elements `[at, len)`. It's guaranteed that the memory does not
477     /// move, that is, the address of `self` does not change, and the address of
478     /// the returned slice is `at` bytes after that.
479     ///
480     /// This is an `O(1)` operation that just increases the reference count and
481     /// sets a few indices.
482     ///
483     /// # Examples
484     ///
485     /// ```
486     /// use bytes::Bytes;
487     ///
488     /// let mut a = Bytes::from(&b"hello world"[..]);
489     /// let b = a.split_off(5);
490     ///
491     /// assert_eq!(&a[..], b"hello");
492     /// assert_eq!(&b[..], b" world");
493     /// ```
494     ///
495     /// # Panics
496     ///
497     /// Panics if `at > len`.
498     #[must_use = "consider Bytes::truncate if you don't need the other half"]
499     pub fn split_off(&mut self, at: usize) -> Self {
500         if at == self.len() {
501             return Bytes::new_empty_with_ptr(self.ptr.wrapping_add(at));
502         }
503 
504         if at == 0 {
505             return mem::replace(self, Bytes::new_empty_with_ptr(self.ptr));
506         }
507 
508         assert!(
509             at <= self.len(),
510             "split_off out of bounds: {:?} <= {:?}",
511             at,
512             self.len(),
513         );
514 
515         let mut ret = self.clone();
516 
517         self.len = at;
518 
519         unsafe { ret.inc_start(at) };
520 
521         ret
522     }
523 
524     /// Splits the bytes into two at the given index.
525     ///
526     /// Afterwards `self` contains elements `[at, len)`, and the returned
527     /// `Bytes` contains elements `[0, at)`.
528     ///
529     /// This is an `O(1)` operation that just increases the reference count and
530     /// sets a few indices.
531     ///
532     /// # Examples
533     ///
534     /// ```
535     /// use bytes::Bytes;
536     ///
537     /// let mut a = Bytes::from(&b"hello world"[..]);
538     /// let b = a.split_to(5);
539     ///
540     /// assert_eq!(&a[..], b" world");
541     /// assert_eq!(&b[..], b"hello");
542     /// ```
543     ///
544     /// # Panics
545     ///
546     /// Panics if `at > len`.
547     #[must_use = "consider Bytes::advance if you don't need the other half"]
548     pub fn split_to(&mut self, at: usize) -> Self {
549         if at == self.len() {
550             let end_ptr = self.ptr.wrapping_add(at);
551             return mem::replace(self, Bytes::new_empty_with_ptr(end_ptr));
552         }
553 
554         if at == 0 {
555             return Bytes::new_empty_with_ptr(self.ptr);
556         }
557 
558         assert!(
559             at <= self.len(),
560             "split_to out of bounds: {:?} <= {:?}",
561             at,
562             self.len(),
563         );
564 
565         let mut ret = self.clone();
566 
567         unsafe { self.inc_start(at) };
568 
569         ret.len = at;
570         ret
571     }
572 
573     /// Shortens the buffer, keeping the first `len` bytes and dropping the
574     /// rest.
575     ///
576     /// If `len` is greater than the buffer's current length, this has no
577     /// effect.
578     ///
579     /// The [split_off](`Self::split_off()`) method can emulate `truncate`, but this causes the
580     /// excess bytes to be returned instead of dropped.
581     ///
582     /// # Examples
583     ///
584     /// ```
585     /// use bytes::Bytes;
586     ///
587     /// let mut buf = Bytes::from(&b"hello world"[..]);
588     /// buf.truncate(5);
589     /// assert_eq!(buf, b"hello"[..]);
590     /// ```
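    ///
    /// For comparison, [split_off](`Self::split_off()`) reaches the same final
    /// state for `buf` while handing back the excess bytes instead of dropping them:
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let mut buf = Bytes::from(&b"hello world"[..]);
    /// let rest = buf.split_off(5);
    /// assert_eq!(buf, b"hello"[..]);
    /// assert_eq!(rest, b" world"[..]);
    /// ```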
591     #[inline]
592     pub fn truncate(&mut self, len: usize) {
593         if len < self.len {
594             // The Vec "promotable" vtables do not store the capacity,
595             // so we cannot truncate while using this repr. We *have* to
596             // promote using `split_off` so the capacity can be stored.
597             if self.vtable as *const Vtable == &PROMOTABLE_EVEN_VTABLE
598                 || self.vtable as *const Vtable == &PROMOTABLE_ODD_VTABLE
599             {
600                 drop(self.split_off(len));
601             } else {
602                 self.len = len;
603             }
604         }
605     }
606 
607     /// Clears the buffer, removing all data.
608     ///
609     /// # Examples
610     ///
611     /// ```
612     /// use bytes::Bytes;
613     ///
614     /// let mut buf = Bytes::from(&b"hello world"[..]);
615     /// buf.clear();
616     /// assert!(buf.is_empty());
617     /// ```
618     #[inline]
619     pub fn clear(&mut self) {
620         self.truncate(0);
621     }
622 
623     /// Try to convert self into `BytesMut`.
624     ///
625     /// If `self` is unique for the entire original buffer, this will succeed
626     /// and return a `BytesMut` with the contents of `self` without copying.
627     /// If `self` is not unique for the entire original buffer, this will fail
628     /// and return self.
629     ///
630     /// This will also always fail if the buffer was constructed via either
631     /// [from_owner](Bytes::from_owner) or [from_static](Bytes::from_static).
632     ///
633     /// # Examples
634     ///
635     /// ```
636     /// use bytes::{Bytes, BytesMut};
637     ///
638     /// let bytes = Bytes::from(b"hello".to_vec());
639     /// assert_eq!(bytes.try_into_mut(), Ok(BytesMut::from(&b"hello"[..])));
640     /// ```
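    ///
    /// A sketch of the failing case: once the buffer is shared, the original
    /// `Bytes` is handed back unchanged.
    ///
    /// ```
    /// use bytes::Bytes;
    ///
    /// let bytes = Bytes::from(b"hello".to_vec());
    /// let other = bytes.clone();
    /// assert!(bytes.try_into_mut().is_err());
    /// drop(other);
    /// ```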
641     pub fn try_into_mut(self) -> Result<BytesMut, Bytes> {
642         if self.is_unique() {
643             Ok(self.into())
644         } else {
645             Err(self)
646         }
647     }
648 
649     #[inline]
650     pub(crate) unsafe fn with_vtable(
651         ptr: *const u8,
652         len: usize,
653         data: AtomicPtr<()>,
654         vtable: &'static Vtable,
655     ) -> Bytes {
656         Bytes {
657             ptr,
658             len,
659             data,
660             vtable,
661         }
662     }
663 
664     // private
665 
666     #[inline]
667     fn as_slice(&self) -> &[u8] {
668         unsafe { slice::from_raw_parts(self.ptr, self.len) }
669     }
670 
671     #[inline]
672     unsafe fn inc_start(&mut self, by: usize) {
673         // should already be asserted, but debug assert for tests
674         debug_assert!(self.len >= by, "internal: inc_start out of bounds");
675         self.len -= by;
676         self.ptr = self.ptr.add(by);
677     }
678 }
679 
680 // Vtable must enforce this behavior
681 unsafe impl Send for Bytes {}
682 unsafe impl Sync for Bytes {}
683 
684 impl Drop for Bytes {
685     #[inline]
686     fn drop(&mut self) {
687         unsafe { (self.vtable.drop)(&mut self.data, self.ptr, self.len) }
688     }
689 }
690 
691 impl Clone for Bytes {
692     #[inline]
693     fn clone(&self) -> Bytes {
694         unsafe { (self.vtable.clone)(&self.data, self.ptr, self.len) }
695     }
696 }
697 
698 impl Buf for Bytes {
699     #[inline]
700     fn remaining(&self) -> usize {
701         self.len()
702     }
703 
704     #[inline]
705     fn chunk(&self) -> &[u8] {
706         self.as_slice()
707     }
708 
709     #[inline]
710     fn advance(&mut self, cnt: usize) {
711         assert!(
712             cnt <= self.len(),
713             "cannot advance past `remaining`: {:?} <= {:?}",
714             cnt,
715             self.len(),
716         );
717 
718         unsafe {
719             self.inc_start(cnt);
720         }
721     }
722 
723     fn copy_to_bytes(&mut self, len: usize) -> Self {
724         self.split_to(len)
725     }
726 }
727 
728 impl Deref for Bytes {
729     type Target = [u8];
730 
731     #[inline]
732     fn deref(&self) -> &[u8] {
733         self.as_slice()
734     }
735 }
736 
737 impl AsRef<[u8]> for Bytes {
738     #[inline]
739     fn as_ref(&self) -> &[u8] {
740         self.as_slice()
741     }
742 }
743 
744 impl hash::Hash for Bytes {
745     fn hash<H>(&self, state: &mut H)
746     where
747         H: hash::Hasher,
748     {
749         self.as_slice().hash(state);
750     }
751 }
752 
753 impl Borrow<[u8]> for Bytes {
754     fn borrow(&self) -> &[u8] {
755         self.as_slice()
756     }
757 }
758 
759 impl IntoIterator for Bytes {
760     type Item = u8;
761     type IntoIter = IntoIter<Bytes>;
762 
763     fn into_iter(self) -> Self::IntoIter {
764         IntoIter::new(self)
765     }
766 }
767 
768 impl<'a> IntoIterator for &'a Bytes {
769     type Item = &'a u8;
770     type IntoIter = core::slice::Iter<'a, u8>;
771 
772     fn into_iter(self) -> Self::IntoIter {
773         self.as_slice().iter()
774     }
775 }
776 
777 impl FromIterator<u8> for Bytes {
778     fn from_iter<T: IntoIterator<Item = u8>>(into_iter: T) -> Self {
779         Vec::from_iter(into_iter).into()
780     }
781 }
782 
783 // impl Eq
784 
785 impl PartialEq for Bytes {
786     fn eq(&self, other: &Bytes) -> bool {
787         self.as_slice() == other.as_slice()
788     }
789 }
790 
791 impl PartialOrd for Bytes {
792     fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
793         self.as_slice().partial_cmp(other.as_slice())
794     }
795 }
796 
797 impl Ord for Bytes {
798     fn cmp(&self, other: &Bytes) -> cmp::Ordering {
799         self.as_slice().cmp(other.as_slice())
800     }
801 }
802 
803 impl Eq for Bytes {}
804 
805 impl PartialEq<[u8]> for Bytes {
806     fn eq(&self, other: &[u8]) -> bool {
807         self.as_slice() == other
808     }
809 }
810 
811 impl PartialOrd<[u8]> for Bytes {
812     fn partial_cmp(&self, other: &[u8]) -> Option<cmp::Ordering> {
813         self.as_slice().partial_cmp(other)
814     }
815 }
816 
817 impl PartialEq<Bytes> for [u8] {
818     fn eq(&self, other: &Bytes) -> bool {
819         *other == *self
820     }
821 }
822 
823 impl PartialOrd<Bytes> for [u8] {
824     fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
825         <[u8] as PartialOrd<[u8]>>::partial_cmp(self, other)
826     }
827 }
828 
829 impl PartialEq<str> for Bytes {
830     fn eq(&self, other: &str) -> bool {
831         self.as_slice() == other.as_bytes()
832     }
833 }
834 
835 impl PartialOrd<str> for Bytes {
836     fn partial_cmp(&self, other: &str) -> Option<cmp::Ordering> {
837         self.as_slice().partial_cmp(other.as_bytes())
838     }
839 }
840 
841 impl PartialEq<Bytes> for str {
842     fn eq(&self, other: &Bytes) -> bool {
843         *other == *self
844     }
845 }
846 
847 impl PartialOrd<Bytes> for str {
848     fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
849         <[u8] as PartialOrd<[u8]>>::partial_cmp(self.as_bytes(), other)
850     }
851 }
852 
853 impl PartialEq<Vec<u8>> for Bytes {
854     fn eq(&self, other: &Vec<u8>) -> bool {
855         *self == other[..]
856     }
857 }
858 
859 impl PartialOrd<Vec<u8>> for Bytes {
860     fn partial_cmp(&self, other: &Vec<u8>) -> Option<cmp::Ordering> {
861         self.as_slice().partial_cmp(&other[..])
862     }
863 }
864 
865 impl PartialEq<Bytes> for Vec<u8> {
866     fn eq(&self, other: &Bytes) -> bool {
867         *other == *self
868     }
869 }
870 
871 impl PartialOrd<Bytes> for Vec<u8> {
872     fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
873         <[u8] as PartialOrd<[u8]>>::partial_cmp(self, other)
874     }
875 }
876 
877 impl PartialEq<String> for Bytes {
878     fn eq(&self, other: &String) -> bool {
879         *self == other[..]
880     }
881 }
882 
883 impl PartialOrd<String> for Bytes {
884     fn partial_cmp(&self, other: &String) -> Option<cmp::Ordering> {
885         self.as_slice().partial_cmp(other.as_bytes())
886     }
887 }
888 
889 impl PartialEq<Bytes> for String {
890     fn eq(&self, other: &Bytes) -> bool {
891         *other == *self
892     }
893 }
894 
895 impl PartialOrd<Bytes> for String {
896     fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
897         <[u8] as PartialOrd<[u8]>>::partial_cmp(self.as_bytes(), other)
898     }
899 }
900 
901 impl PartialEq<Bytes> for &[u8] {
902     fn eq(&self, other: &Bytes) -> bool {
903         *other == *self
904     }
905 }
906 
907 impl PartialOrd<Bytes> for &[u8] {
908     fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
909         <[u8] as PartialOrd<[u8]>>::partial_cmp(self, other)
910     }
911 }
912 
913 impl PartialEq<Bytes> for &str {
914     fn eq(&self, other: &Bytes) -> bool {
915         *other == *self
916     }
917 }
918 
919 impl PartialOrd<Bytes> for &str {
920     fn partial_cmp(&self, other: &Bytes) -> Option<cmp::Ordering> {
921         <[u8] as PartialOrd<[u8]>>::partial_cmp(self.as_bytes(), other)
922     }
923 }
924 
925 impl<'a, T: ?Sized> PartialEq<&'a T> for Bytes
926 where
927     Bytes: PartialEq<T>,
928 {
929     fn eq(&self, other: &&'a T) -> bool {
930         *self == **other
931     }
932 }
933 
934 impl<'a, T: ?Sized> PartialOrd<&'a T> for Bytes
935 where
936     Bytes: PartialOrd<T>,
937 {
938     fn partial_cmp(&self, other: &&'a T) -> Option<cmp::Ordering> {
939         self.partial_cmp(&**other)
940     }
941 }
942 
943 // impl From
944 
945 impl Default for Bytes {
946     #[inline]
947     fn default() -> Bytes {
948         Bytes::new()
949     }
950 }
951 
952 impl From<&'static [u8]> for Bytes {
953     fn from(slice: &'static [u8]) -> Bytes {
954         Bytes::from_static(slice)
955     }
956 }
957 
958 impl From<&'static str> for Bytes {
959     fn from(slice: &'static str) -> Bytes {
960         Bytes::from_static(slice.as_bytes())
961     }
962 }
963 
964 impl From<Vec<u8>> for Bytes {
965     fn from(vec: Vec<u8>) -> Bytes {
966         let mut vec = ManuallyDrop::new(vec);
967         let ptr = vec.as_mut_ptr();
968         let len = vec.len();
969         let cap = vec.capacity();
970 
971         // Avoid an extra allocation if possible.
972         if len == cap {
973             let vec = ManuallyDrop::into_inner(vec);
974             return Bytes::from(vec.into_boxed_slice());
975         }
976 
977         let shared = Box::new(Shared {
978             buf: ptr,
979             cap,
980             ref_cnt: AtomicUsize::new(1),
981         });
982 
983         let shared = Box::into_raw(shared);
984         // The pointer should be aligned, so this assert should
985         // always succeed.
986         debug_assert!(
987             0 == (shared as usize & KIND_MASK),
988             "internal: Box<Shared> should have an aligned pointer",
989         );
990         Bytes {
991             ptr,
992             len,
993             data: AtomicPtr::new(shared as _),
994             vtable: &SHARED_VTABLE,
995         }
996     }
997 }
998 
999 impl From<Box<[u8]>> for Bytes {
1000     fn from(slice: Box<[u8]>) -> Bytes {
1001         // Box<[u8]> doesn't contain a heap allocation for empty slices,
1002         // so the pointer isn't aligned enough for the KIND_VEC stashing to
1003         // work.
1004         if slice.is_empty() {
1005             return Bytes::new();
1006         }
1007 
1008         let len = slice.len();
1009         let ptr = Box::into_raw(slice) as *mut u8;
1010 
1011         if ptr as usize & 0x1 == 0 {
1012             let data = ptr_map(ptr, |addr| addr | KIND_VEC);
1013             Bytes {
1014                 ptr,
1015                 len,
1016                 data: AtomicPtr::new(data.cast()),
1017                 vtable: &PROMOTABLE_EVEN_VTABLE,
1018             }
1019         } else {
1020             Bytes {
1021                 ptr,
1022                 len,
1023                 data: AtomicPtr::new(ptr.cast()),
1024                 vtable: &PROMOTABLE_ODD_VTABLE,
1025             }
1026         }
1027     }
1028 }
1029 
1030 impl From<Bytes> for BytesMut {
1031     /// Convert self into `BytesMut`.
1032     ///
1033     /// If `bytes` is unique for the entire original buffer, this will return a
1034     /// `BytesMut` with the contents of `bytes` without copying.
1035     /// If `bytes` is not unique for the entire original buffer, this will make
1036     /// a copy of the `bytes` subset of the original buffer into a new `BytesMut`.
1037     ///
1038     /// # Examples
1039     ///
1040     /// ```
1041     /// use bytes::{Bytes, BytesMut};
1042     ///
1043     /// let bytes = Bytes::from(b"hello".to_vec());
1044     /// assert_eq!(BytesMut::from(bytes), BytesMut::from(&b"hello"[..]));
1045     /// ```
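    ///
    /// If the buffer is shared, the referenced subset is copied instead:
    ///
    /// ```
    /// use bytes::{Bytes, BytesMut};
    ///
    /// let bytes = Bytes::from(b"hello".to_vec());
    /// let other = bytes.clone();
    /// assert_eq!(BytesMut::from(bytes), BytesMut::from(&b"hello"[..]));
    /// drop(other);
    /// ```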
1046     fn from(bytes: Bytes) -> Self {
1047         let bytes = ManuallyDrop::new(bytes);
1048         unsafe { (bytes.vtable.to_mut)(&bytes.data, bytes.ptr, bytes.len) }
1049     }
1050 }
1051 
1052 impl From<String> for Bytes {
1053     fn from(s: String) -> Bytes {
1054         Bytes::from(s.into_bytes())
1055     }
1056 }
1057 
1058 impl From<Bytes> for Vec<u8> {
1059     fn from(bytes: Bytes) -> Vec<u8> {
1060         let bytes = ManuallyDrop::new(bytes);
1061         unsafe { (bytes.vtable.to_vec)(&bytes.data, bytes.ptr, bytes.len) }
1062     }
1063 }
1064 
1065 // ===== impl Vtable =====
1066 
1067 impl fmt::Debug for Vtable {
1068     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1069         f.debug_struct("Vtable")
1070             .field("clone", &(self.clone as *const ()))
1071             .field("drop", &(self.drop as *const ()))
1072             .finish()
1073     }
1074 }
1075 
1076 // ===== impl StaticVtable =====
1077 
1078 const STATIC_VTABLE: Vtable = Vtable {
1079     clone: static_clone,
1080     to_vec: static_to_vec,
1081     to_mut: static_to_mut,
1082     is_unique: static_is_unique,
1083     drop: static_drop,
1084 };
1085 
1086 unsafe fn static_clone(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
1087     let slice = slice::from_raw_parts(ptr, len);
1088     Bytes::from_static(slice)
1089 }
1090 
1091 unsafe fn static_to_vec(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
1092     let slice = slice::from_raw_parts(ptr, len);
1093     slice.to_vec()
1094 }
1095 
1096 unsafe fn static_to_mut(_: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
1097     let slice = slice::from_raw_parts(ptr, len);
1098     BytesMut::from(slice)
1099 }
1100 
1101 fn static_is_unique(_: &AtomicPtr<()>) -> bool {
1102     false
1103 }
1104 
1105 unsafe fn static_drop(_: &mut AtomicPtr<()>, _: *const u8, _: usize) {
1106     // nothing to drop for &'static [u8]
1107 }
1108 
1109 // ===== impl OwnedVtable =====
1110 
1111 #[repr(C)]
1112 struct OwnedLifetime {
1113     ref_cnt: AtomicUsize,
1114     drop: unsafe fn(*mut ()),
1115 }
1116 
1117 #[repr(C)]
1118 struct Owned<T> {
1119     lifetime: OwnedLifetime,
1120     owner: T,
1121 }
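
// Both structs above are `#[repr(C)]`, so `lifetime` is guaranteed to live at
// offset 0 of `Owned<T>`. The owned vtable functions below rely on this: they
// carry only a type-erased `*mut ()` and cast it to `*mut OwnedLifetime` to
// reach the reference count and drop function without knowing the owner type `T`.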
1122 
1123 unsafe fn owned_box_and_drop<T>(ptr: *mut ()) {
1124     let b: Box<Owned<T>> = Box::from_raw(ptr as _);
1125     drop(b);
1126 }
1127 
1128 unsafe fn owned_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
1129     let owned = data.load(Ordering::Relaxed);
1130     let ref_cnt = &(*owned.cast::<OwnedLifetime>()).ref_cnt;
1131     let old_cnt = ref_cnt.fetch_add(1, Ordering::Relaxed);
1132     if old_cnt > usize::MAX >> 1 {
1133         crate::abort()
1134     }
1135 
1136     Bytes {
1137         ptr,
1138         len,
1139         data: AtomicPtr::new(owned as _),
1140         vtable: &OWNED_VTABLE,
1141     }
1142 }
1143 
1144 unsafe fn owned_to_vec(_data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
1145     let slice = slice::from_raw_parts(ptr, len);
1146     slice.to_vec()
1147 }
1148 
1149 unsafe fn owned_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
1150     let bytes_mut = BytesMut::from_vec(owned_to_vec(data, ptr, len));
1151     owned_drop_impl(data.load(Ordering::Relaxed));
1152     bytes_mut
1153 }
1154 
1155 unsafe fn owned_is_unique(_data: &AtomicPtr<()>) -> bool {
1156     false
1157 }
1158 
1159 unsafe fn owned_drop_impl(owned: *mut ()) {
1160     let lifetime = owned.cast::<OwnedLifetime>();
1161     let ref_cnt = &(*lifetime).ref_cnt;
1162 
1163     let old_cnt = ref_cnt.fetch_sub(1, Ordering::Release);
1164     if old_cnt != 1 {
1165         return;
1166     }
1167     ref_cnt.load(Ordering::Acquire);
1168 
1169     let drop_fn = &(*lifetime).drop;
1170     drop_fn(owned)
1171 }
1172 
1173 unsafe fn owned_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
1174     let owned = data.load(Ordering::Relaxed);
1175     owned_drop_impl(owned);
1176 }
1177 
1178 static OWNED_VTABLE: Vtable = Vtable {
1179     clone: owned_clone,
1180     to_vec: owned_to_vec,
1181     to_mut: owned_to_mut,
1182     is_unique: owned_is_unique,
1183     drop: owned_drop,
1184 };
1185 
1186 // ===== impl PromotableVtable =====
1187 
1188 static PROMOTABLE_EVEN_VTABLE: Vtable = Vtable {
1189     clone: promotable_even_clone,
1190     to_vec: promotable_even_to_vec,
1191     to_mut: promotable_even_to_mut,
1192     is_unique: promotable_is_unique,
1193     drop: promotable_even_drop,
1194 };
1195 
1196 static PROMOTABLE_ODD_VTABLE: Vtable = Vtable {
1197     clone: promotable_odd_clone,
1198     to_vec: promotable_odd_to_vec,
1199     to_mut: promotable_odd_to_mut,
1200     is_unique: promotable_is_unique,
1201     drop: promotable_odd_drop,
1202 };
1203 
1204 unsafe fn promotable_even_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
1205     let shared = data.load(Ordering::Acquire);
1206     let kind = shared as usize & KIND_MASK;
1207 
1208     if kind == KIND_ARC {
1209         shallow_clone_arc(shared.cast(), ptr, len)
1210     } else {
1211         debug_assert_eq!(kind, KIND_VEC);
1212         let buf = ptr_map(shared.cast(), |addr| addr & !KIND_MASK);
1213         shallow_clone_vec(data, shared, buf, ptr, len)
1214     }
1215 }
1216 
1217 unsafe fn promotable_to_vec(
1218     data: &AtomicPtr<()>,
1219     ptr: *const u8,
1220     len: usize,
1221     f: fn(*mut ()) -> *mut u8,
1222 ) -> Vec<u8> {
1223     let shared = data.load(Ordering::Acquire);
1224     let kind = shared as usize & KIND_MASK;
1225 
1226     if kind == KIND_ARC {
1227         shared_to_vec_impl(shared.cast(), ptr, len)
1228     } else {
1229         // If Bytes holds a Vec, then the offset must be 0.
1230         debug_assert_eq!(kind, KIND_VEC);
1231 
1232         let buf = f(shared);
1233 
1234         let cap = offset_from(ptr, buf) + len;
1235 
1236         // Copy back buffer
1237         ptr::copy(ptr, buf, len);
1238 
1239         Vec::from_raw_parts(buf, len, cap)
1240     }
1241 }
1242 
1243 unsafe fn promotable_to_mut(
1244     data: &AtomicPtr<()>,
1245     ptr: *const u8,
1246     len: usize,
1247     f: fn(*mut ()) -> *mut u8,
1248 ) -> BytesMut {
1249     let shared = data.load(Ordering::Acquire);
1250     let kind = shared as usize & KIND_MASK;
1251 
1252     if kind == KIND_ARC {
1253         shared_to_mut_impl(shared.cast(), ptr, len)
1254     } else {
1255         // KIND_VEC is a view of an underlying buffer at a certain offset.
1256         // The ptr + len always represents the end of that buffer.
1257         // Before truncating it, it is first promoted to KIND_ARC.
1258         // Thus, we can safely reconstruct a Vec from it without leaking memory.
1259         debug_assert_eq!(kind, KIND_VEC);
1260 
1261         let buf = f(shared);
1262         let off = offset_from(ptr, buf);
1263         let cap = off + len;
1264         let v = Vec::from_raw_parts(buf, cap, cap);
1265 
1266         let mut b = BytesMut::from_vec(v);
1267         b.advance_unchecked(off);
1268         b
1269     }
1270 }
1271 
1272 unsafe fn promotable_even_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
1273     promotable_to_vec(data, ptr, len, |shared| {
1274         ptr_map(shared.cast(), |addr| addr & !KIND_MASK)
1275     })
1276 }
1277 
1278 unsafe fn promotable_even_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
1279     promotable_to_mut(data, ptr, len, |shared| {
1280         ptr_map(shared.cast(), |addr| addr & !KIND_MASK)
1281     })
1282 }
1283 
1284 unsafe fn promotable_even_drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) {
1285     data.with_mut(|shared| {
1286         let shared = *shared;
1287         let kind = shared as usize & KIND_MASK;
1288 
1289         if kind == KIND_ARC {
1290             release_shared(shared.cast());
1291         } else {
1292             debug_assert_eq!(kind, KIND_VEC);
1293             let buf = ptr_map(shared.cast(), |addr| addr & !KIND_MASK);
1294             free_boxed_slice(buf, ptr, len);
1295         }
1296     });
1297 }
1298 
1299 unsafe fn promotable_odd_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
1300     let shared = data.load(Ordering::Acquire);
1301     let kind = shared as usize & KIND_MASK;
1302 
1303     if kind == KIND_ARC {
1304         shallow_clone_arc(shared as _, ptr, len)
1305     } else {
1306         debug_assert_eq!(kind, KIND_VEC);
1307         shallow_clone_vec(data, shared, shared.cast(), ptr, len)
1308     }
1309 }
1310 
1311 unsafe fn promotable_odd_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
1312     promotable_to_vec(data, ptr, len, |shared| shared.cast())
1313 }
1314 
1315 unsafe fn promotable_odd_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
1316     promotable_to_mut(data, ptr, len, |shared| shared.cast())
1317 }
1318 
1319 unsafe fn promotable_odd_drop(data: &mut AtomicPtr<()>, ptr: *const u8, len: usize) {
1320     data.with_mut(|shared| {
1321         let shared = *shared;
1322         let kind = shared as usize & KIND_MASK;
1323 
1324         if kind == KIND_ARC {
1325             release_shared(shared.cast());
1326         } else {
1327             debug_assert_eq!(kind, KIND_VEC);
1328 
1329             free_boxed_slice(shared.cast(), ptr, len);
1330         }
1331     });
1332 }
1333 
1334 unsafe fn promotable_is_unique(data: &AtomicPtr<()>) -> bool {
1335     let shared = data.load(Ordering::Acquire);
1336     let kind = shared as usize & KIND_MASK;
1337 
1338     if kind == KIND_ARC {
1339         let ref_cnt = (*shared.cast::<Shared>()).ref_cnt.load(Ordering::Relaxed);
1340         ref_cnt == 1
1341     } else {
1342         true
1343     }
1344 }
1345 
1346 unsafe fn free_boxed_slice(buf: *mut u8, offset: *const u8, len: usize) {
1347     let cap = offset_from(offset, buf) + len;
1348     dealloc(buf, Layout::from_size_align(cap, 1).unwrap())
1349 }
1350 
1351 // ===== impl SharedVtable =====
1352 
1353 struct Shared {
1354     // Holds arguments to dealloc upon Drop, but otherwise doesn't use them
1355     buf: *mut u8,
1356     cap: usize,
1357     ref_cnt: AtomicUsize,
1358 }
1359 
1360 impl Drop for Shared {
1361     fn drop(&mut self) {
1362         unsafe { dealloc(self.buf, Layout::from_size_align(self.cap, 1).unwrap()) }
1363     }
1364 }
1365 
1366 // Assert that the alignment of `Shared` is divisible by 2.
1367 // This is a necessary invariant, since we depend on the pointer of the
1368 // allocated `Shared` object to implicitly carry the `KIND_ARC` flag.
1369 // This flag is set when the LSB is 0.
1370 const _: [(); 0 - mem::align_of::<Shared>() % 2] = [];
1371 
1372 static SHARED_VTABLE: Vtable = Vtable {
1373     clone: shared_clone,
1374     to_vec: shared_to_vec,
1375     to_mut: shared_to_mut,
1376     is_unique: shared_is_unique,
1377     drop: shared_drop,
1378 };
1379 
1380 const KIND_ARC: usize = 0b0;
1381 const KIND_VEC: usize = 0b1;
1382 const KIND_MASK: usize = 0b1;
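
// Sketch of how the low-bit tag is written and read (see `From<Box<[u8]>>` and
// the promotable vtables above): an even buffer pointer gets `KIND_VEC` stashed
// in its low bit, and consumers mask the bit off again to recover the pointer.
//
//     let data = ptr_map(buf_ptr, |addr| addr | KIND_VEC);  // tag the pointer
//     let kind = data as usize & KIND_MASK;                 // 1 => KIND_VEC
//     let buf = ptr_map(data, |addr| addr & !KIND_MASK);    // recover buf_ptr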
1383 
1384 unsafe fn shared_clone(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Bytes {
1385     let shared = data.load(Ordering::Relaxed);
1386     shallow_clone_arc(shared as _, ptr, len)
1387 }
1388 
1389 unsafe fn shared_to_vec_impl(shared: *mut Shared, ptr: *const u8, len: usize) -> Vec<u8> {
1390     // Check that the ref_cnt is 1 (unique).
1391     //
1392     // If it is unique, then it is set to 0 with an AcqRel fence, for the same
1393     // reason as in `release_shared`.
1394     //
1395     // Otherwise, we take the other branch and call release_shared.
1396     if (*shared)
1397         .ref_cnt
1398         .compare_exchange(1, 0, Ordering::AcqRel, Ordering::Relaxed)
1399         .is_ok()
1400     {
1401         // Deallocate the `Shared` instance without running its destructor.
1402         let shared = *Box::from_raw(shared);
1403         let shared = ManuallyDrop::new(shared);
1404         let buf = shared.buf;
1405         let cap = shared.cap;
1406 
1407         // Copy back buffer
1408         ptr::copy(ptr, buf, len);
1409 
1410         Vec::from_raw_parts(buf, len, cap)
1411     } else {
1412         let v = slice::from_raw_parts(ptr, len).to_vec();
1413         release_shared(shared);
1414         v
1415     }
1416 }
1417 
1418 unsafe fn shared_to_vec(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> Vec<u8> {
1419     shared_to_vec_impl(data.load(Ordering::Relaxed).cast(), ptr, len)
1420 }
1421 
1422 unsafe fn shared_to_mut_impl(shared: *mut Shared, ptr: *const u8, len: usize) -> BytesMut {
1423     // The goal is to check if the current handle is the only handle
1424     // that currently has access to the buffer. This is done by
1425     // checking if the `ref_cnt` is currently 1.
1426     //
1427     // The `Acquire` ordering synchronizes with the `Release` as
1428     // part of the `fetch_sub` in `release_shared`. The `fetch_sub`
1429     // operation guarantees that any mutations done in other threads
1430     // are ordered before the `ref_cnt` is decremented. As such,
1431     // this `Acquire` will guarantee that those mutations are
1432     // visible to the current thread.
1433     //
1434     // Otherwise, we take the other branch, copy the data and call `release_shared`.
1435     if (*shared).ref_cnt.load(Ordering::Acquire) == 1 {
1436         // Deallocate the `Shared` instance without running its destructor.
1437         let shared = *Box::from_raw(shared);
1438         let shared = ManuallyDrop::new(shared);
1439         let buf = shared.buf;
1440         let cap = shared.cap;
1441 
1442         // Rebuild Vec
1443         let off = offset_from(ptr, buf);
1444         let v = Vec::from_raw_parts(buf, len + off, cap);
1445 
1446         let mut b = BytesMut::from_vec(v);
1447         b.advance_unchecked(off);
1448         b
1449     } else {
1450         // Copy the data from Shared in a new Vec, then release it
1451         let v = slice::from_raw_parts(ptr, len).to_vec();
1452         release_shared(shared);
1453         BytesMut::from_vec(v)
1454     }
1455 }
1456 
1457 unsafe fn shared_to_mut(data: &AtomicPtr<()>, ptr: *const u8, len: usize) -> BytesMut {
1458     shared_to_mut_impl(data.load(Ordering::Relaxed).cast(), ptr, len)
1459 }
1460 
1461 pub(crate) unsafe fn shared_is_unique(data: &AtomicPtr<()>) -> bool {
1462     let shared = data.load(Ordering::Acquire);
1463     let ref_cnt = (*shared.cast::<Shared>()).ref_cnt.load(Ordering::Relaxed);
1464     ref_cnt == 1
1465 }
1466 
1467 unsafe fn shared_drop(data: &mut AtomicPtr<()>, _ptr: *const u8, _len: usize) {
1468     data.with_mut(|shared| {
1469         release_shared(shared.cast());
1470     });
1471 }
1472 
1473 unsafe fn shallow_clone_arc(shared: *mut Shared, ptr: *const u8, len: usize) -> Bytes {
1474     let old_size = (*shared).ref_cnt.fetch_add(1, Ordering::Relaxed);
1475 
1476     if old_size > usize::MAX >> 1 {
1477         crate::abort();
1478     }
1479 
1480     Bytes {
1481         ptr,
1482         len,
1483         data: AtomicPtr::new(shared as _),
1484         vtable: &SHARED_VTABLE,
1485     }
1486 }
1487 
1488 #[cold]
1489 unsafe fn shallow_clone_vec(
1490     atom: &AtomicPtr<()>,
1491     ptr: *const (),
1492     buf: *mut u8,
1493     offset: *const u8,
1494     len: usize,
1495 ) -> Bytes {
1496     // If the buffer is still tracked in a `Vec<u8>`, it is time to
1497     // promote the vec to an `Arc`. This could potentially be called
1498     // concurrently, so some care must be taken.
1499 
1500     // First, allocate a new `Shared` instance containing the
1501     // `Vec` fields. It's important to note that `ptr`, `len`,
1502     // and `cap` cannot be mutated without having `&mut self`.
1503     // This means that these fields will not be concurrently
1504     // updated and since the buffer hasn't been promoted to an
1505     // `Arc`, those three fields still are the components of the
1506     // vector.
1507     let shared = Box::new(Shared {
1508         buf,
1509         cap: offset_from(offset, buf) + len,
1510         // Initialize refcount to 2. One for this reference, and one
1511         // for the new clone that will be returned from
1512         // `shallow_clone`.
1513         ref_cnt: AtomicUsize::new(2),
1514     });
1515 
1516     let shared = Box::into_raw(shared);
1517 
1518     // The pointer should be aligned, so this assert should
1519     // always succeed.
1520     debug_assert!(
1521         0 == (shared as usize & KIND_MASK),
1522         "internal: Box<Shared> should have an aligned pointer",
1523     );
1524 
1525     // Try compare & swapping the pointer into the `arc` field.
1526     // `Release` is used to synchronize with other threads that
1527     // will load the `arc` field.
1528     //
1529     // If the `compare_exchange` fails, then the thread lost the
1530     // race to promote the buffer to shared. The `Acquire`
1531     // ordering will synchronize with the `compare_exchange`
1532     // that happened in the other thread and the `Shared`
1533     // pointed to by `actual` will be visible.
1534     match atom.compare_exchange(ptr as _, shared as _, Ordering::AcqRel, Ordering::Acquire) {
1535         Ok(actual) => {
1536             debug_assert!(actual as usize == ptr as usize);
1537             // The upgrade was successful, the new handle can be
1538             // returned.
1539             Bytes {
1540                 ptr: offset,
1541                 len,
1542                 data: AtomicPtr::new(shared as _),
1543                 vtable: &SHARED_VTABLE,
1544             }
1545         }
1546         Err(actual) => {
1547             // The upgrade failed, a concurrent clone happened. Release
1548             // the allocation that was made in this thread, it will not
1549             // be needed.
1550             let shared = Box::from_raw(shared);
1551             mem::forget(*shared);
1552 
1553             // Buffer already promoted to shared storage, so increment ref
1554             // count.
1555             shallow_clone_arc(actual as _, offset, len)
1556         }
1557     }
1558 }
1559 
1560 unsafe fn release_shared(ptr: *mut Shared) {
1561     // `Shared` storage... follow the drop steps from Arc.
1562     if (*ptr).ref_cnt.fetch_sub(1, Ordering::Release) != 1 {
1563         return;
1564     }
1565 
1566     // This fence is needed to prevent reordering of use of the data and
1567     // deletion of the data.  Because it is marked `Release`, the decreasing
1568     // of the reference count synchronizes with this `Acquire` fence. This
1569     // means that use of the data happens before decreasing the reference
1570     // count, which happens before this fence, which happens before the
1571     // deletion of the data.
1572     //
1573     // As explained in the [Boost documentation][1],
1574     //
1575     // > It is important to enforce any possible access to the object in one
1576     // > thread (through an existing reference) to *happen before* deleting
1577     // > the object in a different thread. This is achieved by a "release"
1578     // > operation after dropping a reference (any access to the object
1579     // > through this reference must obviously happened before), and an
1580     // > "acquire" operation before deleting the object.
1581     //
1582     // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
1583     //
1584     // Thread sanitizer does not support atomic fences. Use an atomic load
1585     // instead.
1586     (*ptr).ref_cnt.load(Ordering::Acquire);
1587 
1588     // Drop the data
1589     drop(Box::from_raw(ptr));
1590 }
1591 
1592 // Ideally we would always use this version of `ptr_map` since it is strict
1593 // provenance compatible, but it results in worse codegen. We will however still
1594 // use it on miri because it gives better diagnostics for people who test bytes
1595 // code with miri.
1596 //
1597 // See https://github.com/tokio-rs/bytes/pull/545 for more info.
1598 #[cfg(miri)]
1599 fn ptr_map<F>(ptr: *mut u8, f: F) -> *mut u8
1600 where
1601     F: FnOnce(usize) -> usize,
1602 {
1603     let old_addr = ptr as usize;
1604     let new_addr = f(old_addr);
1605     let diff = new_addr.wrapping_sub(old_addr);
1606     ptr.wrapping_add(diff)
1607 }
1608 
1609 #[cfg(not(miri))]
1610 fn ptr_map<F>(ptr: *mut u8, f: F) -> *mut u8
1611 where
1612     F: FnOnce(usize) -> usize,
1613 {
1614     let old_addr = ptr as usize;
1615     let new_addr = f(old_addr);
1616     new_addr as *mut u8
1617 }
1618 
1619 fn without_provenance(ptr: usize) -> *const u8 {
1620     core::ptr::null::<u8>().wrapping_add(ptr)
1621 }
1622 
1623 // compile-fails
1624 
1625 /// ```compile_fail
1626 /// use bytes::Bytes;
1627 /// #[deny(unused_must_use)]
1628 /// {
1629 ///     let mut b1 = Bytes::from("hello world");
1630 ///     b1.split_to(6);
1631 /// }
1632 /// ```
1633 fn _split_to_must_use() {}
1634 
1635 /// ```compile_fail
1636 /// use bytes::Bytes;
1637 /// #[deny(unused_must_use)]
1638 /// {
1639 ///     let mut b1 = Bytes::from("hello world");
1640 ///     b1.split_off(6);
1641 /// }
1642 /// ```
1643 fn _split_off_must_use() {}
1644 
1645 // fuzz tests
1646 #[cfg(all(test, loom))]
1647 mod fuzz {
1648     use loom::sync::Arc;
1649     use loom::thread;
1650 
1651     use super::Bytes;
1652     #[test]
1653     fn bytes_cloning_vec() {
1654         loom::model(|| {
1655             let a = Bytes::from(b"abcdefgh".to_vec());
1656             let addr = a.as_ptr() as usize;
1657 
1658             // test the Bytes::clone is Sync by putting it in an Arc
1659             let a1 = Arc::new(a);
1660             let a2 = a1.clone();
1661 
1662             let t1 = thread::spawn(move || {
1663                 let b: Bytes = (*a1).clone();
1664                 assert_eq!(b.as_ptr() as usize, addr);
1665             });
1666 
1667             let t2 = thread::spawn(move || {
1668                 let b: Bytes = (*a2).clone();
1669                 assert_eq!(b.as_ptr() as usize, addr);
1670             });
1671 
1672             t1.join().unwrap();
1673             t2.join().unwrap();
1674         });
1675     }
1676 }
1677