1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 use grpcio_sys::*;
4 use std::cell::UnsafeCell;
5 use std::ffi::{c_void, CStr, CString};
6 use std::fmt::{self, Debug, Formatter};
7 use std::io::{self, BufRead, Read};
8 use std::mem::{self, ManuallyDrop, MaybeUninit};
9 
/// Largest payload, in bytes, that this module stores in the inlined representation of a
/// `grpc_slice` (`GrpcSlice::realloc` switches to a heap allocation above this size).
///
/// Copied from grpc-sys/grpc/include/grpc/impl/codegen/slice.h. Unfortunately bindgen doesn't
/// generate it automatically, so the expression below has to be kept in sync by hand.
const INLINED_SIZE: usize = mem::size_of::<libc::size_t>() + mem::size_of::<*mut u8>() - 1
    + mem::size_of::<*mut libc::c_void>();
14 
/// A convenient rust wrapper for the type `grpc_slice`.
///
/// The wrapper is `#[repr(transparent)]`, so a `GrpcSlice` (or an array of them) can be
/// reinterpreted as the underlying `grpc_slice` when talking to the C API.
///
/// It's expected that the slice should be initialized: every byte reported by `len()` is
/// exposed as `u8` through `as_slice()`.
#[repr(transparent)]
pub struct GrpcSlice(grpc_slice);
20 
21 impl GrpcSlice {
22     /// Get the length of the data.
len(&self) -> usize23     pub fn len(&self) -> usize {
24         unsafe {
25             if !self.0.refcount.is_null() {
26                 self.0.data.refcounted.length
27             } else {
28                 self.0.data.inlined.length as usize
29             }
30         }
31     }
32 
33     /// Returns a slice of inner buffer.
as_slice(&self) -> &[u8]34     pub fn as_slice(&self) -> &[u8] {
35         unsafe {
36             if !self.0.refcount.is_null() {
37                 let start = self.0.data.refcounted.bytes;
38                 let len = self.0.data.refcounted.length;
39                 std::slice::from_raw_parts(start, len)
40             } else {
41                 let len = self.0.data.inlined.length;
42                 &self.0.data.inlined.bytes[..len as usize]
43             }
44         }
45     }
46 
is_empty(&self) -> bool47     pub fn is_empty(&self) -> bool {
48         self.len() == 0
49     }
50 
51     /// Creates a slice from static rust slice.
52     ///
53     /// Same as `From<&[u8]>` but without copying the buffer.
54     #[inline]
from_static_slice(s: &'static [u8]) -> GrpcSlice55     pub fn from_static_slice(s: &'static [u8]) -> GrpcSlice {
56         GrpcSlice(unsafe { grpc_slice_from_static_buffer(s.as_ptr() as _, s.len()) })
57     }
58 
59     /// Creates a `GrpcSlice` from static rust str.
60     ///
61     /// Same as `from_str` but without allocation.
62     #[inline]
from_static_str(s: &'static str) -> GrpcSlice63     pub fn from_static_str(s: &'static str) -> GrpcSlice {
64         GrpcSlice::from_static_slice(s.as_bytes())
65     }
66 
67     /// Checks whether the slice stores bytes inline.
is_inline(&self) -> bool68     pub fn is_inline(&self) -> bool {
69         self.0.refcount.is_null()
70     }
71 
72     /// Reallocates current slice with given capacity.
73     ///
74     /// The length of returned slice is the exact same as given cap.
75     ///
76     /// ## Safety
77     ///
78     /// Caller is expected to initialize all available bytes to guarantee safety of this slice.
realloc(&mut self, cap: usize) -> &mut [MaybeUninit<u8>]79     pub unsafe fn realloc(&mut self, cap: usize) -> &mut [MaybeUninit<u8>] {
80         if cap <= INLINED_SIZE {
81             // Only inlined slice can be reused safely.
82             if !self.0.refcount.is_null() {
83                 *self = GrpcSlice::default();
84             }
85             self.0.data.inlined.length = cap as u8;
86             std::slice::from_raw_parts_mut(
87                 self.0.data.inlined.bytes.as_mut_ptr() as *mut MaybeUninit<u8>,
88                 cap,
89             )
90         } else {
91             *self = GrpcSlice(grpcio_sys::grpc_slice_malloc_large(cap));
92             let start = self.0.data.refcounted.bytes;
93             let len = self.0.data.refcounted.length;
94             std::slice::from_raw_parts_mut(start as *mut MaybeUninit<u8>, len)
95         }
96     }
97 
as_mut_ptr(&mut self) -> *mut grpc_slice98     pub fn as_mut_ptr(&mut self) -> *mut grpc_slice {
99         &mut self.0
100     }
101 }
102 
103 impl Clone for GrpcSlice {
104     /// Clone the slice.
105     ///
106     /// If the slice is not inlined, the reference count will be increased
107     /// instead of copy.
clone(&self) -> Self108     fn clone(&self) -> Self {
109         GrpcSlice(unsafe { grpc_slice_ref(self.0) })
110     }
111 }
112 
113 impl Default for GrpcSlice {
114     /// Returns a default slice, which is empty.
default() -> Self115     fn default() -> Self {
116         GrpcSlice(unsafe { grpc_empty_slice() })
117     }
118 }
119 
120 impl Debug for GrpcSlice {
fmt(&self, f: &mut Formatter) -> fmt::Result121     fn fmt(&self, f: &mut Formatter) -> fmt::Result {
122         self.as_slice().fmt(f)
123     }
124 }
125 
126 impl Drop for GrpcSlice {
drop(&mut self)127     fn drop(&mut self) {
128         unsafe {
129             grpc_slice_unref(self.0);
130         }
131     }
132 }
133 
// `grpc_slice` contains raw pointers, so these marker traits are not derived automatically.
// NOTE(review): soundness relies on the C library's slice reference counting being
// thread-safe — confirm against the gRPC core documentation.
unsafe impl Send for GrpcSlice {}
unsafe impl Sync for GrpcSlice {}
136 
137 impl PartialEq<[u8]> for GrpcSlice {
eq(&self, r: &[u8]) -> bool138     fn eq(&self, r: &[u8]) -> bool {
139         // Technically, the equal function inside vtable should be used.
140         // But it's not cheap or safe to create a grpc_slice from rust slice.
141         self.as_slice() == r
142     }
143 }
144 
145 impl PartialEq<GrpcSlice> for GrpcSlice {
eq(&self, r: &GrpcSlice) -> bool146     fn eq(&self, r: &GrpcSlice) -> bool {
147         unsafe { grpc_slice_eq(self.0, r.0) != 0 }
148     }
149 }
150 
drop_vec(ptr: *mut c_void, len: usize)151 unsafe extern "C" fn drop_vec(ptr: *mut c_void, len: usize) {
152     Vec::from_raw_parts(ptr as *mut u8, len, len);
153 }
154 
155 impl From<Vec<u8>> for GrpcSlice {
156     /// Converts a `Vec<u8>` into `GrpcSlice`.
157     ///
158     /// If v can't fit inline, there will be allocations.
159     #[inline]
from(mut v: Vec<u8>) -> GrpcSlice160     fn from(mut v: Vec<u8>) -> GrpcSlice {
161         if v.is_empty() {
162             return GrpcSlice::default();
163         }
164 
165         if v.len() == v.capacity() {
166             let slice = unsafe {
167                 grpcio_sys::grpc_slice_new_with_len(v.as_mut_ptr() as _, v.len(), Some(drop_vec))
168             };
169             mem::forget(v);
170             return GrpcSlice(slice);
171         }
172 
173         unsafe {
174             GrpcSlice(grpcio_sys::grpc_slice_from_copied_buffer(
175                 v.as_mut_ptr() as _,
176                 v.len(),
177             ))
178         }
179     }
180 }
181 
182 /// Creates a `GrpcSlice` from rust string.
183 ///
184 /// If the string can't fit inline, there will be allocations.
185 impl From<String> for GrpcSlice {
186     #[inline]
from(s: String) -> GrpcSlice187     fn from(s: String) -> GrpcSlice {
188         GrpcSlice::from(s.into_bytes())
189     }
190 }
191 
192 /// Creates a `GrpcSlice` from rust cstring.
193 ///
194 /// If the cstring can't fit inline, there will be allocations.
195 impl From<CString> for GrpcSlice {
196     #[inline]
from(s: CString) -> GrpcSlice197     fn from(s: CString) -> GrpcSlice {
198         GrpcSlice::from(s.into_bytes())
199     }
200 }
201 
202 /// Creates a `GrpcSlice` from rust slice.
203 ///
204 /// The data inside slice will be cloned. If the data can't fit inline,
205 /// necessary buffer will be allocated.
206 impl From<&'_ [u8]> for GrpcSlice {
207     #[inline]
from(s: &'_ [u8]) -> GrpcSlice208     fn from(s: &'_ [u8]) -> GrpcSlice {
209         GrpcSlice(unsafe { grpc_slice_from_copied_buffer(s.as_ptr() as _, s.len()) })
210     }
211 }
212 
213 /// Creates a `GrpcSlice` from rust str.
214 ///
215 /// The data inside str will be cloned. If the data can't fit inline,
216 /// necessary buffer will be allocated.
217 impl From<&'_ str> for GrpcSlice {
218     #[inline]
from(s: &'_ str) -> GrpcSlice219     fn from(s: &'_ str) -> GrpcSlice {
220         GrpcSlice::from(s.as_bytes())
221     }
222 }
223 
224 /// Creates a `GrpcSlice` from rust `CStr`.
225 ///
226 /// The data inside `CStr` will be cloned. If the data can't fit inline,
227 /// necessary buffer will be allocated.
228 impl From<&'_ CStr> for GrpcSlice {
229     #[inline]
from(s: &'_ CStr) -> GrpcSlice230     fn from(s: &'_ CStr) -> GrpcSlice {
231         GrpcSlice::from(s.to_bytes())
232     }
233 }
234 
/// A collection of `GrpcSlice`s, held as a raw `*mut grpc_byte_buffer`.
///
/// The wrapper owns the pointer: dropping it destroys the underlying byte buffer.
#[repr(C)]
pub struct GrpcByteBuffer(*mut grpc_byte_buffer);
238 
239 impl GrpcByteBuffer {
240     #[inline]
from_raw(ptr: *mut grpc_byte_buffer) -> GrpcByteBuffer241     pub unsafe fn from_raw(ptr: *mut grpc_byte_buffer) -> GrpcByteBuffer {
242         GrpcByteBuffer(ptr)
243     }
244 }
245 
246 impl<'a> From<&'a [GrpcSlice]> for GrpcByteBuffer {
247     /// Create a buffer from the given slice array.
248     ///
249     /// A buffer is allocated for the whole slice array, and every slice will
250     /// be `Clone::clone` into the buffer.
from(slice: &'a [GrpcSlice]) -> Self251     fn from(slice: &'a [GrpcSlice]) -> Self {
252         let len = slice.len();
253         unsafe {
254             let s = slice.as_ptr() as *const grpc_slice as *const UnsafeCell<grpc_slice>;
255             // hack: see From<&GrpcSlice>.
256             GrpcByteBuffer(grpc_raw_byte_buffer_create((*s).get(), len))
257         }
258     }
259 }
260 
261 impl<'a> From<&'a GrpcSlice> for GrpcByteBuffer {
262     /// Create a buffer from the given single slice.
263     ///
264     /// A buffer, which length is 1, is allocated for the slice.
265     #[allow(clippy::cast_ref_to_mut)]
from(s: &'a GrpcSlice) -> GrpcByteBuffer266     fn from(s: &'a GrpcSlice) -> GrpcByteBuffer {
267         unsafe {
268             // hack: buffer_create accepts an mutable pointer to indicate it mutate
269             // ref count. Ref count is recorded by atomic variable, which is considered
270             // `Sync` in rust. This is an interesting difference in what is *mutable*
271             // between C++ and rust.
272             // Using `UnsafeCell` to avoid raw cast, which is UB.
273             let s = &*(s as *const GrpcSlice as *const grpc_slice as *const UnsafeCell<grpc_slice>);
274             GrpcByteBuffer(grpc_raw_byte_buffer_create((*s).get(), 1))
275         }
276     }
277 }
278 
279 impl Clone for GrpcByteBuffer {
clone(&self) -> Self280     fn clone(&self) -> Self {
281         unsafe { GrpcByteBuffer(grpc_byte_buffer_copy(self.0)) }
282     }
283 }
284 
285 impl Drop for GrpcByteBuffer {
drop(&mut self)286     fn drop(&mut self) {
287         unsafe { grpc_byte_buffer_destroy(self.0) }
288     }
289 }
290 
/// A zero-copy reader for the message payload.
///
/// To achieve zero-copy, use the BufRead API `fill_buf` and `consume`
/// to operate the reader.
///
/// The reader owns the `GrpcByteBuffer` it was created from (kept as
/// `reader.buffer_in`) and destroys it when dropped.
#[repr(C)]
pub struct GrpcByteBufferReader {
    /// The underlying C reader state.
    reader: grpc_byte_buffer_reader,
    /// Current reading buffer.
    // This is a temporary buffer that has to be released explicitly before
    // it is overwritten with the next slice from the reader. `ManuallyDrop`
    // gives precise control over when that release happens.
    slice: ManuallyDrop<GrpcSlice>,
    /// The offset of `slice` that has not been read.
    offset: usize,
    /// How many bytes pending for reading.
    remain: usize,
}
308 
309 impl GrpcByteBufferReader {
310     /// Creates a reader for the `GrpcByteBuffer`.
311     ///
312     /// `buf` is stored inside the reader, and dropped when the reader is dropped.
new(buf: GrpcByteBuffer) -> GrpcByteBufferReader313     pub fn new(buf: GrpcByteBuffer) -> GrpcByteBufferReader {
314         let mut reader = MaybeUninit::uninit();
315         let mut s = MaybeUninit::uninit();
316         unsafe {
317             let code = grpc_byte_buffer_reader_init(reader.as_mut_ptr(), buf.0);
318             assert_eq!(code, 1);
319             if 0 == grpc_byte_buffer_reader_next(reader.as_mut_ptr(), s.as_mut_ptr()) {
320                 s.as_mut_ptr().write(grpc_empty_slice());
321             }
322             let remain = grpc_byte_buffer_length((*reader.as_mut_ptr()).buffer_out);
323             // buf is stored inside `reader` as `buffer_in`, so do not drop it.
324             mem::forget(buf);
325 
326             GrpcByteBufferReader {
327                 reader: reader.assume_init(),
328                 slice: ManuallyDrop::new(GrpcSlice(s.assume_init())),
329                 offset: 0,
330                 remain,
331             }
332         }
333     }
334 
335     /// Get the next slice from reader.
load_next_slice(&mut self)336     fn load_next_slice(&mut self) {
337         unsafe {
338             ManuallyDrop::drop(&mut self.slice);
339             if 0 == grpc_byte_buffer_reader_next(&mut self.reader, &mut self.slice.0) {
340                 self.slice = ManuallyDrop::new(GrpcSlice::default());
341             }
342         }
343         self.offset = 0;
344     }
345 
346     #[inline]
len(&self) -> usize347     pub fn len(&self) -> usize {
348         self.remain
349     }
350 
351     #[inline]
is_empty(&self) -> bool352     pub fn is_empty(&self) -> bool {
353         self.remain == 0
354     }
355 }
356 
357 impl Read for GrpcByteBufferReader {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>358     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
359         let read = self.fill_buf()?.read(buf)?;
360         self.consume(read);
361         Ok(read)
362     }
363 
read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize>364     fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
365         let cap = self.remain;
366         buf.reserve(cap);
367         let old_len = buf.len();
368         while self.remain > 0 {
369             let read = {
370                 let s = match self.fill_buf() {
371                     Ok(s) => s,
372                     Err(e) => {
373                         unsafe {
374                             buf.set_len(old_len);
375                         }
376                         return Err(e);
377                     }
378                 };
379                 buf.extend_from_slice(s);
380                 s.len()
381             };
382             self.consume(read);
383         }
384         Ok(cap)
385     }
386 }
387 
388 impl BufRead for GrpcByteBufferReader {
389     #[inline]
fill_buf(&mut self) -> io::Result<&[u8]>390     fn fill_buf(&mut self) -> io::Result<&[u8]> {
391         if self.slice.is_empty() {
392             return Ok(&[]);
393         }
394         Ok(unsafe { self.slice.as_slice().get_unchecked(self.offset..) })
395     }
396 
consume(&mut self, mut amt: usize)397     fn consume(&mut self, mut amt: usize) {
398         if amt > self.remain {
399             amt = self.remain;
400         }
401         self.remain -= amt;
402         let mut offset = self.offset + amt;
403         while offset >= self.slice.len() && offset > 0 {
404             offset -= self.slice.len();
405             self.load_next_slice();
406         }
407         self.offset = offset;
408     }
409 }
410 
impl Drop for GrpcByteBufferReader {
    /// Tears down the C reader, releases the slice currently held, and destroys the
    /// byte buffer that was handed to `new`.
    fn drop(&mut self) {
        unsafe {
            grpc_byte_buffer_reader_destroy(&mut self.reader);
            // `slice` is a `ManuallyDrop`, so it has to be released explicitly.
            ManuallyDrop::drop(&mut self.slice);
            // `new` forgot the `GrpcByteBuffer` it was given; the reader keeps it as
            // `buffer_in`, so it is destroyed here.
            grpc_byte_buffer_destroy(self.reader.buffer_in);
        }
    }
}
420 
// The reader holds raw pointers through `grpc_byte_buffer_reader`, so these marker traits
// are not derived automatically.
// NOTE(review): soundness relies on the C reader and its buffer having no thread
// affinity — confirm against the gRPC core documentation.
unsafe impl Sync for GrpcByteBufferReader {}
unsafe impl Send for GrpcByteBufferReader {}
423 
#[cfg(feature = "prost-codec")]
impl bytes::Buf for GrpcByteBufferReader {
    /// Number of bytes that have not been consumed yet.
    fn remaining(&self) -> usize {
        self.len()
    }

    /// Returns the unread part of the current slice.
    fn chunk(&self) -> &[u8] {
        // Close to `BufRead::fill_buf`, but not the same: `self` is borrowed
        // immutably here, so only the bytes up to the end of the current slice
        // can be handed out.
        if self.slice.is_empty() {
            // Shortcut for an empty slice.
            &[]
        } else {
            unsafe { self.slice.as_slice().get_unchecked(self.offset..) }
        }
    }

    /// Consumes `cnt` bytes; delegates to `BufRead::consume`.
    fn advance(&mut self, cnt: usize) {
        BufRead::consume(self, cnt);
    }
}
447 
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a reader over a byte buffer made of `copy_count` clones of a slice
    /// created from `seed`.
    fn new_message_reader(seed: Vec<u8>, copy_count: usize) -> GrpcByteBufferReader {
        let data = vec![GrpcSlice::from(seed); copy_count];
        let buf = GrpcByteBuffer::from(data.as_slice());
        GrpcByteBufferReader::new(buf)
    }

    /// Checks the basic accessors and equality of `GrpcSlice` for an empty slice,
    /// a short vector and a 64-byte vector.
    #[test]
    fn test_grpc_slice() {
        let empty = GrpcSlice::default();
        assert!(empty.is_empty());
        assert_eq!(empty.len(), 0);
        assert!(empty.as_slice().is_empty());

        let a = vec![0, 2, 1, 3, 8];
        let slice = GrpcSlice::from(a.clone());
        assert_eq!(a.as_slice(), slice.as_slice());
        assert_eq!(a.len(), slice.len());
        assert_eq!(&slice, &*a);

        let a = vec![5; 64];
        let slice = GrpcSlice::from(a.clone());
        assert_eq!(a.as_slice(), slice.as_slice());
        assert_eq!(a.len(), slice.len());
        assert_eq!(&slice, &*a);

        let a = vec![];
        let slice = GrpcSlice::from(a);
        assert_eq!(empty, slice);
    }

    #[test]
    // Regression test: old code crashes under a very weird circumstance, due to a typo in
    // `MessageReader::consume` (presumably an earlier name of the reader's `consume`).
    fn test_typo_len_offset() {
        let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
        // half of the size of `data`
        let half_size = data.len() / 2;
        let slice = GrpcSlice::from(data.clone());
        let buffer = GrpcByteBuffer::from(&slice);
        let mut reader = GrpcByteBufferReader::new(buffer);
        assert_eq!(reader.len(), data.len());
        // first half (`half_size` elements) of `data`
        let mut buf = vec![0; half_size];
        reader.read(buf.as_mut_slice()).unwrap();
        assert_eq!(data[..half_size], *buf.as_slice());
        assert_eq!(reader.len(), data.len() - half_size);
        assert!(!reader.is_empty());
        // second half of `data`; this read ends exactly at the end of the slice
        reader.read(&mut buf).unwrap();
        assert_eq!(data[half_size..], *buf.as_slice());
        assert_eq!(reader.len(), 0);
        assert!(reader.is_empty());
    }

    /// Exercises `read`, `read_to_end` and `consume` over buffers of 1 to 4 slices
    /// with slice lengths from 0 to 1024.
    #[test]
    fn test_message_reader() {
        for len in 0..=1024 {
            for n_slice in 1..=4 {
                let source = vec![len as u8; len];
                let expect = vec![len as u8; len * n_slice];
                // Test read.
                let mut reader = new_message_reader(source.clone(), n_slice);
                let mut dest = [0; 7];
                let amt = reader.read(&mut dest).unwrap();

                assert_eq!(dest[..amt], expect[..amt], "len: {len}, nslice: {n_slice}");

                // Read after move.
                let mut box_reader = Box::new(reader);
                let amt = box_reader.read(&mut dest).unwrap();
                assert_eq!(dest[..amt], expect[..amt], "len: {len}, nslice: {n_slice}");

                // Test read_to_end.
                let mut reader = new_message_reader(source.clone(), n_slice);
                let mut dest = vec![];
                reader.read_to_end(&mut dest).unwrap();
                assert_eq!(dest, expect, "len: {len}, nslice: {n_slice}");

                // A drained reader reports no pending bytes and reads nothing.
                assert_eq!(0, reader.len());
                assert_eq!(0, reader.read(&mut [1]).unwrap());

                // Test arbitrary consuming: skip all but the last slice, then read the rest.
                let mut reader = new_message_reader(source.clone(), n_slice);
                reader.consume(source.len() * (n_slice - 1));
                let mut dest = vec![];
                reader.read_to_end(&mut dest).unwrap();
                assert_eq!(dest.len(), source.len(), "len: {len}, nslice: {n_slice}");
                assert_eq!(
                    *dest,
                    expect[expect.len() - source.len()..],
                    "len: {len}, nslice: {n_slice}"
                );
                assert_eq!(0, reader.len());
                assert_eq!(0, reader.read(&mut [1]).unwrap());
            }
        }
    }

    /// Checks that the owned and the borrowed conversions into `GrpcSlice` keep the bytes.
    #[test]
    fn test_converter() {
        let a = vec![1, 2, 3, 0];
        assert_eq!(GrpcSlice::from(a.clone()).as_slice(), a.as_slice());
        assert_eq!(GrpcSlice::from(a.as_slice()).as_slice(), a.as_slice());

        let s = "abcd".to_owned();
        assert_eq!(GrpcSlice::from(s.clone()).as_slice(), s.as_bytes());
        assert_eq!(GrpcSlice::from(s.as_str()).as_slice(), s.as_bytes());

        let cs = CString::new(s.clone()).unwrap();
        assert_eq!(GrpcSlice::from(cs.clone()).as_slice(), s.as_bytes());
        assert_eq!(GrpcSlice::from(cs.as_c_str()).as_slice(), s.as_bytes());
    }

    /// Drives the reader through the `bytes::Buf` API (`remaining`, `chunk`, `advance`).
    #[cfg(feature = "prost-codec")]
    #[test]
    fn test_buf_impl() {
        use bytes::Buf;

        for len in 0..1024 + 1 {
            for n_slice in 1..4 {
                let source = vec![len as u8; len];

                let mut reader = new_message_reader(source.clone(), n_slice);

                let mut remaining = len * n_slice;
                // Upper bound on loop iterations, so a reader that stops making
                // progress fails the test instead of hanging it.
                let mut count = 100;
                while reader.remaining() > 0 {
                    assert_eq!(remaining, reader.remaining());
                    let bytes = Buf::chunk(&reader);
                    bytes.iter().for_each(|b| assert_eq!(*b, len as u8));
                    let mut read = bytes.len();
                    // We don't have to advance by the whole amount we read.
                    if read > 5 && len % 2 == 0 {
                        read -= 5;
                    }
                    reader.advance(read);
                    remaining -= read;
                    count -= 1;
                    assert!(count > 0);
                }

                assert_eq!(0, remaining);
                assert_eq!(0, reader.remaining());
            }
        }
    }
}
597