1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3 use grpcio_sys::*;
4 use std::cell::UnsafeCell;
5 use std::ffi::{c_void, CStr, CString};
6 use std::fmt::{self, Debug, Formatter};
7 use std::io::{self, BufRead, Read};
8 use std::mem::{self, ManuallyDrop, MaybeUninit};
9
/// Copied from grpc-sys/grpc/include/grpc/impl/codegen/slice.h. Unfortunately bindgen doesn't
/// generate it automatically.
///
/// Number of bytes a `grpc_slice` can store inline in its union without a
/// refcounted allocation (the space the refcounted variant's fields occupy,
/// minus the byte used by the inlined variant's own `length` field).
const INLINED_SIZE: usize = mem::size_of::<libc::size_t>() + mem::size_of::<*mut u8>() - 1
    + mem::size_of::<*mut libc::c_void>();
14
/// A convenient rust wrapper for the type `grpc_slice`.
///
/// It's expected that the slice should be initialized.
// repr(transparent) guarantees identical layout to `grpc_slice`, which the
// byte-buffer conversions below rely on when reinterpreting pointers.
#[repr(transparent)]
pub struct GrpcSlice(grpc_slice);
20
impl GrpcSlice {
    /// Get the length of the data.
    pub fn len(&self) -> usize {
        // SAFETY: a non-null refcount marks the refcounted union variant as
        // active; a null refcount marks the inlined variant. This is the same
        // discrimination grpc C core uses for the union.
        unsafe {
            if !self.0.refcount.is_null() {
                self.0.data.refcounted.length
            } else {
                self.0.data.inlined.length as usize
            }
        }
    }

    /// Returns a slice of inner buffer.
    pub fn as_slice(&self) -> &[u8] {
        // SAFETY: same union discrimination as `len`. The constructors and
        // `realloc`'s caller contract guarantee `length` initialized bytes.
        unsafe {
            if !self.0.refcount.is_null() {
                let start = self.0.data.refcounted.bytes;
                let len = self.0.data.refcounted.length;
                std::slice::from_raw_parts(start, len)
            } else {
                let len = self.0.data.inlined.length;
                &self.0.data.inlined.bytes[..len as usize]
            }
        }
    }

    /// Returns true if the slice contains no bytes.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Creates a slice from static rust slice.
    ///
    /// Same as `From<&[u8]>` but without copying the buffer.
    #[inline]
    pub fn from_static_slice(s: &'static [u8]) -> GrpcSlice {
        GrpcSlice(unsafe { grpc_slice_from_static_buffer(s.as_ptr() as _, s.len()) })
    }

    /// Creates a `GrpcSlice` from static rust str.
    ///
    /// Same as `from_str` but without allocation.
    #[inline]
    pub fn from_static_str(s: &'static str) -> GrpcSlice {
        GrpcSlice::from_static_slice(s.as_bytes())
    }

    /// Checks whether the slice stores bytes inline.
    pub fn is_inline(&self) -> bool {
        self.0.refcount.is_null()
    }

    /// Reallocates current slice with given capacity.
    ///
    /// The length of returned slice is the exact same as given cap.
    ///
    /// ## Safety
    ///
    /// Caller is expected to initialize all available bytes to guarantee safety of this slice.
    pub unsafe fn realloc(&mut self, cap: usize) -> &mut [MaybeUninit<u8>] {
        if cap <= INLINED_SIZE {
            // Only inlined slice can be reused safely.
            // A refcounted slice may be shared, so replace it with a fresh
            // (empty, inlined) slice before writing.
            if !self.0.refcount.is_null() {
                *self = GrpcSlice::default();
            }
            self.0.data.inlined.length = cap as u8;
            std::slice::from_raw_parts_mut(
                self.0.data.inlined.bytes.as_mut_ptr() as *mut MaybeUninit<u8>,
                cap,
            )
        } else {
            // Too big to inline: allocate a brand-new refcounted slice of
            // exactly `cap` bytes and hand its storage back to the caller.
            *self = GrpcSlice(grpcio_sys::grpc_slice_malloc_large(cap));
            let start = self.0.data.refcounted.bytes;
            let len = self.0.data.refcounted.length;
            std::slice::from_raw_parts_mut(start as *mut MaybeUninit<u8>, len)
        }
    }

    /// Returns a raw pointer to the wrapped `grpc_slice`, for FFI calls that
    /// fill the slice in place.
    pub fn as_mut_ptr(&mut self) -> *mut grpc_slice {
        &mut self.0
    }
}
102
impl Clone for GrpcSlice {
    /// Clone the slice.
    ///
    /// If the slice is not inlined, the reference count will be increased
    /// instead of copy.
    fn clone(&self) -> Self {
        // `grpc_slice_ref` bumps the refcount (or copies inline bytes) and
        // returns a slice that owns its own reference.
        GrpcSlice(unsafe { grpc_slice_ref(self.0) })
    }
}
112
impl Default for GrpcSlice {
    /// Returns a default slice, which is empty.
    fn default() -> Self {
        // An empty slice has a null refcount, so dropping it is a no-op.
        GrpcSlice(unsafe { grpc_empty_slice() })
    }
}
119
120 impl Debug for GrpcSlice {
fmt(&self, f: &mut Formatter) -> fmt::Result121 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
122 self.as_slice().fmt(f)
123 }
124 }
125
impl Drop for GrpcSlice {
    fn drop(&mut self) {
        // Releases one reference; the backing storage is freed once the
        // refcount reaches zero. No-op for inlined/empty slices.
        unsafe {
            grpc_slice_unref(self.0);
        }
    }
}
133
// SAFETY: the byte contents are never mutated after creation, and grpc core's
// slice refcounting is what `clone`/`drop` go through. NOTE(review): this
// assumes every refcount vtable reachable here is thread-safe — confirm
// against grpc core before relying on cross-thread use of borrowed slices.
unsafe impl Send for GrpcSlice {}
unsafe impl Sync for GrpcSlice {}
136
impl PartialEq<[u8]> for GrpcSlice {
    /// Byte-wise equality against a plain rust slice.
    fn eq(&self, r: &[u8]) -> bool {
        // Technically, the equal function inside vtable should be used.
        // But it's not cheap or safe to create a grpc_slice from rust slice.
        self.as_slice() == r
    }
}
144
impl PartialEq<GrpcSlice> for GrpcSlice {
    fn eq(&self, r: &GrpcSlice) -> bool {
        // Delegate to grpc core's comparison; non-zero means equal.
        unsafe { grpc_slice_eq(self.0, r.0) != 0 }
    }
}
150
/// Destroy callback handed to `grpc_slice_new_with_len`: reclaims a buffer
/// that was leaked from a `Vec<u8>` in `From<Vec<u8>>` below.
///
/// # Safety
///
/// `ptr`/`len` must originate from a forgotten `Vec<u8>` whose length equaled
/// its capacity (guaranteed by the only call site that registers this).
unsafe extern "C" fn drop_vec(ptr: *mut c_void, len: usize) {
    // Rebuild the Vec and let it drop immediately, freeing the allocation.
    Vec::from_raw_parts(ptr as *mut u8, len, len);
}
154
impl From<Vec<u8>> for GrpcSlice {
    /// Converts a `Vec<u8>` into `GrpcSlice`.
    ///
    /// If v can't fit inline, there will be allocations.
    #[inline]
    fn from(mut v: Vec<u8>) -> GrpcSlice {
        if v.is_empty() {
            return GrpcSlice::default();
        }

        // When length equals capacity the allocation can be handed to grpc
        // as-is, to be reclaimed later by `drop_vec` — no copy needed. The
        // len == capacity check is what makes `drop_vec`'s reconstruction
        // with `len` as capacity sound.
        if v.len() == v.capacity() {
            let slice = unsafe {
                grpcio_sys::grpc_slice_new_with_len(v.as_mut_ptr() as _, v.len(), Some(drop_vec))
            };
            // Ownership of the buffer was transferred above; forgetting `v`
            // prevents a double free.
            mem::forget(v);
            return GrpcSlice(slice);
        }

        // Otherwise the bytes are copied (possibly into the inline storage)
        // and `v` drops normally at the end of this function.
        unsafe {
            GrpcSlice(grpcio_sys::grpc_slice_from_copied_buffer(
                v.as_mut_ptr() as _,
                v.len(),
            ))
        }
    }
}
181
182 /// Creates a `GrpcSlice` from rust string.
183 ///
184 /// If the string can't fit inline, there will be allocations.
185 impl From<String> for GrpcSlice {
186 #[inline]
from(s: String) -> GrpcSlice187 fn from(s: String) -> GrpcSlice {
188 GrpcSlice::from(s.into_bytes())
189 }
190 }
191
192 /// Creates a `GrpcSlice` from rust cstring.
193 ///
194 /// If the cstring can't fit inline, there will be allocations.
195 impl From<CString> for GrpcSlice {
196 #[inline]
from(s: CString) -> GrpcSlice197 fn from(s: CString) -> GrpcSlice {
198 GrpcSlice::from(s.into_bytes())
199 }
200 }
201
/// Creates a `GrpcSlice` from rust slice.
///
/// The data inside slice will be cloned. If the data can't fit inline,
/// necessary buffer will be allocated.
impl From<&'_ [u8]> for GrpcSlice {
    #[inline]
    fn from(s: &'_ [u8]) -> GrpcSlice {
        // grpc copies the bytes, so the borrow does not need to outlive
        // this call.
        GrpcSlice(unsafe { grpc_slice_from_copied_buffer(s.as_ptr() as _, s.len()) })
    }
}
212
213 /// Creates a `GrpcSlice` from rust str.
214 ///
215 /// The data inside str will be cloned. If the data can't fit inline,
216 /// necessary buffer will be allocated.
217 impl From<&'_ str> for GrpcSlice {
218 #[inline]
from(s: &'_ str) -> GrpcSlice219 fn from(s: &'_ str) -> GrpcSlice {
220 GrpcSlice::from(s.as_bytes())
221 }
222 }
223
224 /// Creates a `GrpcSlice` from rust `CStr`.
225 ///
226 /// The data inside `CStr` will be cloned. If the data can't fit inline,
227 /// necessary buffer will be allocated.
228 impl From<&'_ CStr> for GrpcSlice {
229 #[inline]
from(s: &'_ CStr) -> GrpcSlice230 fn from(s: &'_ CStr) -> GrpcSlice {
231 GrpcSlice::from(s.to_bytes())
232 }
233 }
234
/// A collection of `GrpcSlice`s (an owned `grpc_byte_buffer`).
///
/// Owns the raw pointer; `Drop` releases it via `grpc_byte_buffer_destroy`.
#[repr(C)]
pub struct GrpcByteBuffer(*mut grpc_byte_buffer);
238
impl GrpcByteBuffer {
    /// Wraps a raw `grpc_byte_buffer` pointer, taking ownership of it.
    ///
    /// # Safety
    ///
    /// `ptr` must be a valid byte buffer that the caller owns; this wrapper
    /// will destroy it on drop, so the caller must not free it again.
    #[inline]
    pub unsafe fn from_raw(ptr: *mut grpc_byte_buffer) -> GrpcByteBuffer {
        GrpcByteBuffer(ptr)
    }
}
245
impl<'a> From<&'a [GrpcSlice]> for GrpcByteBuffer {
    /// Create a buffer from the given slice array.
    ///
    /// A buffer is allocated for the whole slice array, and every slice will
    /// be `Clone::clone` into the buffer.
    fn from(slice: &'a [GrpcSlice]) -> Self {
        let len = slice.len();
        unsafe {
            // `GrpcSlice` is repr(transparent) over `grpc_slice`, so the
            // array pointer can be reinterpreted element-for-element.
            let s = slice.as_ptr() as *const grpc_slice as *const UnsafeCell<grpc_slice>;
            // hack: see From<&GrpcSlice>.
            // NOTE(review): for an empty `slice`, `*s` dereferences a
            // dangling (though aligned) pointer — confirm callers never pass
            // an empty array, or that this stays benign.
            GrpcByteBuffer(grpc_raw_byte_buffer_create((*s).get(), len))
        }
    }
}
260
impl<'a> From<&'a GrpcSlice> for GrpcByteBuffer {
    /// Create a buffer from the given single slice.
    ///
    /// A buffer, which length is 1, is allocated for the slice.
    #[allow(clippy::cast_ref_to_mut)]
    fn from(s: &'a GrpcSlice) -> GrpcByteBuffer {
        unsafe {
            // hack: buffer_create accepts an mutable pointer to indicate it mutate
            // ref count. Ref count is recorded by atomic variable, which is considered
            // `Sync` in rust. This is an interesting difference in what is *mutable*
            // between C++ and rust.
            // Using `UnsafeCell` to avoid raw cast, which is UB.
            let s = &*(s as *const GrpcSlice as *const grpc_slice as *const UnsafeCell<grpc_slice>);
            GrpcByteBuffer(grpc_raw_byte_buffer_create((*s).get(), 1))
        }
    }
}
278
impl Clone for GrpcByteBuffer {
    fn clone(&self) -> Self {
        // `grpc_byte_buffer_copy` produces an independently-owned buffer.
        unsafe { GrpcByteBuffer(grpc_byte_buffer_copy(self.0)) }
    }
}
284
impl Drop for GrpcByteBuffer {
    fn drop(&mut self) {
        // Releases the buffer and its slices' references.
        unsafe { grpc_byte_buffer_destroy(self.0) }
    }
}
290
/// A zero-copy reader for the message payload.
///
/// To achieve zero-copy, use the BufRead API `fill_buf` and `consume`
/// to operate the reader.
#[repr(C)]
pub struct GrpcByteBufferReader {
    reader: grpc_byte_buffer_reader,
    /// Current reading buffer.
    // This is a temporary buffer that may need to be dropped before every
    // iteration. So use `ManuallyDrop` to control the behavior more clean
    // and precisely.
    slice: ManuallyDrop<GrpcSlice>,
    /// The offset of `slice` that has not been read.
    // Invariant (maintained by `consume`): whenever `slice` is non-empty,
    // `offset < slice.len()` — this keeps the `get_unchecked` calls sound.
    offset: usize,
    /// How many bytes pending for reading.
    remain: usize,
}
308
impl GrpcByteBufferReader {
    /// Creates a reader for the `GrpcByteBuffer`.
    ///
    /// `buf` is stored inside the reader, and dropped when the reader is dropped.
    pub fn new(buf: GrpcByteBuffer) -> GrpcByteBufferReader {
        let mut reader = MaybeUninit::uninit();
        let mut s = MaybeUninit::uninit();
        unsafe {
            // Init must succeed (returns 1) before `reader` may be read.
            let code = grpc_byte_buffer_reader_init(reader.as_mut_ptr(), buf.0);
            assert_eq!(code, 1);
            if 0 == grpc_byte_buffer_reader_next(reader.as_mut_ptr(), s.as_mut_ptr()) {
                // No slice available (empty buffer): write a valid empty
                // slice so `assume_init` below is sound.
                s.as_mut_ptr().write(grpc_empty_slice());
            }
            let remain = grpc_byte_buffer_length((*reader.as_mut_ptr()).buffer_out);
            // buf is stored inside `reader` as `buffer_in`, so do not drop it.
            mem::forget(buf);

            GrpcByteBufferReader {
                reader: reader.assume_init(),
                slice: ManuallyDrop::new(GrpcSlice(s.assume_init())),
                offset: 0,
                remain,
            }
        }
    }

    /// Get the next slice from reader.
    fn load_next_slice(&mut self) {
        unsafe {
            // Release the finished slice before the FFI call overwrites the
            // storage in place.
            ManuallyDrop::drop(&mut self.slice);
            if 0 == grpc_byte_buffer_reader_next(&mut self.reader, &mut self.slice.0) {
                // Exhausted: leave a valid empty slice so later accesses
                // (len/as_slice and the final Drop) remain sound.
                self.slice = ManuallyDrop::new(GrpcSlice::default());
            }
        }
        self.offset = 0;
    }

    /// Total unread bytes remaining across all slices.
    #[inline]
    pub fn len(&self) -> usize {
        self.remain
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.remain == 0
    }
}
356
357 impl Read for GrpcByteBufferReader {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>358 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
359 let read = self.fill_buf()?.read(buf)?;
360 self.consume(read);
361 Ok(read)
362 }
363
read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize>364 fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
365 let cap = self.remain;
366 buf.reserve(cap);
367 let old_len = buf.len();
368 while self.remain > 0 {
369 let read = {
370 let s = match self.fill_buf() {
371 Ok(s) => s,
372 Err(e) => {
373 unsafe {
374 buf.set_len(old_len);
375 }
376 return Err(e);
377 }
378 };
379 buf.extend_from_slice(s);
380 s.len()
381 };
382 self.consume(read);
383 }
384 Ok(cap)
385 }
386 }
387
impl BufRead for GrpcByteBufferReader {
    #[inline]
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        if self.slice.is_empty() {
            return Ok(&[]);
        }
        // SAFETY: `consume` maintains `offset < slice.len()` whenever the
        // slice is non-empty, so the range is in bounds.
        Ok(unsafe { self.slice.as_slice().get_unchecked(self.offset..) })
    }

    fn consume(&mut self, mut amt: usize) {
        // Clamp so a caller can never advance past the end of the buffer.
        if amt > self.remain {
            amt = self.remain;
        }
        self.remain -= amt;
        let mut offset = self.offset + amt;
        // Skip every slice that is now fully consumed. The `offset > 0`
        // guard terminates the loop when the final (empty) tail slice is
        // reached, preventing an infinite loop on `slice.len() == 0`.
        while offset >= self.slice.len() && offset > 0 {
            offset -= self.slice.len();
            self.load_next_slice();
        }
        self.offset = offset;
    }
}
410
impl Drop for GrpcByteBufferReader {
    fn drop(&mut self) {
        unsafe {
            // Tear down in order: reader state, the current slice (held in
            // ManuallyDrop), then the byte buffer that `new` forgot — it is
            // still owned via `reader.buffer_in`.
            grpc_byte_buffer_reader_destroy(&mut self.reader);
            ManuallyDrop::drop(&mut self.slice);
            grpc_byte_buffer_destroy(self.reader.buffer_in);
        }
    }
}
420
// NOTE(review): Send is justified by the reader owning its buffer/slice.
// Sync additionally assumes the `&self` methods (`len`, `is_empty`, `chunk`)
// touch no state grpc mutates behind the pointer — confirm against grpc core
// before relying on shared cross-thread access.
unsafe impl Sync for GrpcByteBufferReader {}
unsafe impl Send for GrpcByteBufferReader {}
423
#[cfg(feature = "prost-codec")]
impl bytes::Buf for GrpcByteBufferReader {
    fn remaining(&self) -> usize {
        self.remain
    }

    fn chunk(&self) -> &[u8] {
        // This is similar but not identical to `BuffRead::fill_buf`, since `self`
        // is not mutable, we can only return bytes up to the end of the current
        // slice.

        // Optimization for empty slice
        if self.slice.is_empty() {
            return &[];
        }

        // SAFETY: `consume` keeps `offset` strictly within a non-empty slice.
        unsafe { self.slice.as_slice().get_unchecked(self.offset..) }
    }

    fn advance(&mut self, cnt: usize) {
        self.consume(cnt);
    }
}
447
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a reader over `copy_count` copies of `seed`.
    fn new_message_reader(seed: Vec<u8>, copy_count: usize) -> GrpcByteBufferReader {
        let data = vec![GrpcSlice::from(seed); copy_count];
        let buf = GrpcByteBuffer::from(data.as_slice());
        GrpcByteBufferReader::new(buf)
    }

    #[test]
    fn test_grpc_slice() {
        let empty = GrpcSlice::default();
        assert!(empty.is_empty());
        assert_eq!(empty.len(), 0);
        assert!(empty.as_slice().is_empty());

        // Small enough to be stored inline.
        let a = vec![0, 2, 1, 3, 8];
        let slice = GrpcSlice::from(a.clone());
        assert_eq!(a.as_slice(), slice.as_slice());
        assert_eq!(a.len(), slice.len());
        assert_eq!(&slice, &*a);

        // Too large for inline storage; exercises the refcounted path.
        let a = vec![5; 64];
        let slice = GrpcSlice::from(a.clone());
        assert_eq!(a.as_slice(), slice.as_slice());
        assert_eq!(a.len(), slice.len());
        assert_eq!(&slice, &*a);

        let a = vec![];
        let slice = GrpcSlice::from(a);
        assert_eq!(empty, slice);
    }

    #[test]
    // Old code crashes under a very weird circumstance, due to a typo in `MessageReader::consume`
    fn test_typo_len_offset() {
        let data = vec![1, 2, 3, 4, 5, 6, 7, 8];
        // half of the size of `data`
        let half_size = data.len() / 2;
        let slice = GrpcSlice::from(data.clone());
        let buffer = GrpcByteBuffer::from(&slice);
        let mut reader = GrpcByteBufferReader::new(buffer);
        assert_eq!(reader.len(), data.len());
        // buffer sized for the first half of `data`
        let mut buf = vec![0; half_size];
        reader.read(buf.as_mut_slice()).unwrap();
        assert_eq!(data[..half_size], *buf.as_slice());
        assert_eq!(reader.len(), data.len() - half_size);
        assert!(!reader.is_empty());
        reader.read(&mut buf).unwrap();
        assert_eq!(data[half_size..], *buf.as_slice());
        assert_eq!(reader.len(), 0);
        assert!(reader.is_empty());
    }

    #[test]
    fn test_message_reader() {
        for len in 0..=1024 {
            for n_slice in 1..=4 {
                let source = vec![len as u8; len];
                let expect = vec![len as u8; len * n_slice];
                // Test read.
                let mut reader = new_message_reader(source.clone(), n_slice);
                let mut dest = [0; 7];
                let amt = reader.read(&mut dest).unwrap();

                assert_eq!(dest[..amt], expect[..amt], "len: {len}, nslice: {n_slice}");

                // Read after move.
                let mut box_reader = Box::new(reader);
                let amt = box_reader.read(&mut dest).unwrap();
                assert_eq!(dest[..amt], expect[..amt], "len: {len}, nslice: {n_slice}");

                // Test read_to_end.
                let mut reader = new_message_reader(source.clone(), n_slice);
                let mut dest = vec![];
                reader.read_to_end(&mut dest).unwrap();
                assert_eq!(dest, expect, "len: {len}, nslice: {n_slice}");

                assert_eq!(0, reader.len());
                assert_eq!(0, reader.read(&mut [1]).unwrap());

                // Test arbitrary consuming.
                let mut reader = new_message_reader(source.clone(), n_slice);
                reader.consume(source.len() * (n_slice - 1));
                let mut dest = vec![];
                reader.read_to_end(&mut dest).unwrap();
                assert_eq!(dest.len(), source.len(), "len: {len}, nslice: {n_slice}");
                assert_eq!(
                    *dest,
                    expect[expect.len() - source.len()..],
                    "len: {len}, nslice: {n_slice}"
                );
                assert_eq!(0, reader.len());
                assert_eq!(0, reader.read(&mut [1]).unwrap());
            }
        }
    }

    #[test]
    fn test_converter() {
        let a = vec![1, 2, 3, 0];
        assert_eq!(GrpcSlice::from(a.clone()).as_slice(), a.as_slice());
        assert_eq!(GrpcSlice::from(a.as_slice()).as_slice(), a.as_slice());

        let s = "abcd".to_owned();
        assert_eq!(GrpcSlice::from(s.clone()).as_slice(), s.as_bytes());
        assert_eq!(GrpcSlice::from(s.as_str()).as_slice(), s.as_bytes());

        let cs = CString::new(s.clone()).unwrap();
        assert_eq!(GrpcSlice::from(cs.clone()).as_slice(), s.as_bytes());
        assert_eq!(GrpcSlice::from(cs.as_c_str()).as_slice(), s.as_bytes());
    }

    #[cfg(feature = "prost-codec")]
    #[test]
    fn test_buf_impl() {
        use bytes::Buf;

        for len in 0..1024 + 1 {
            for n_slice in 1..4 {
                let source = vec![len as u8; len];

                let mut reader = new_message_reader(source.clone(), n_slice);

                let mut remaining = len * n_slice;
                // Safety valve: the loop must terminate well before 100
                // iterations for these sizes.
                let mut count = 100;
                while reader.remaining() > 0 {
                    assert_eq!(remaining, reader.remaining());
                    let bytes = Buf::chunk(&reader);
                    bytes.iter().for_each(|b| assert_eq!(*b, len as u8));
                    let mut read = bytes.len();
                    // We don't have to advance by the whole amount we read.
                    if read > 5 && len % 2 == 0 {
                        read -= 5;
                    }
                    reader.advance(read);
                    remaining -= read;
                    count -= 1;
                    assert!(count > 0);
                }

                assert_eq!(0, remaining);
                assert_eq!(0, reader.remaining());
            }
        }
    }
}
597