// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. pub mod client; pub mod server; use std::ffi::CStr; use std::fmt::{self, Debug, Display}; use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use std::{ptr, slice}; use crate::grpc_sys::{self, grpc_call, grpc_call_error, grpcwrap_batch_context}; use crate::metadata::UnownedMetadata; use crate::{cq::CompletionQueue, Metadata, MetadataBuilder}; use futures_util::ready; use libc::c_void; use parking_lot::Mutex; use crate::buf::{GrpcByteBuffer, GrpcByteBufferReader, GrpcSlice}; use crate::codec::{DeserializeFn, Marshaller, SerializeFn}; use crate::error::{Error, Result}; use crate::grpc_sys::grpc_status_code::*; use crate::task::{self, BatchFuture, BatchResult, BatchType, CallTag}; /// An gRPC status code structure. /// This type contains constants for all gRPC status codes. #[derive(PartialEq, Eq, Clone, Copy)] pub struct RpcStatusCode(i32); impl From for RpcStatusCode { fn from(code: i32) -> RpcStatusCode { RpcStatusCode(code) } } impl From for i32 { fn from(code: RpcStatusCode) -> i32 { code.0 } } impl Display for RpcStatusCode { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { Debug::fmt(self, f) } } macro_rules! status_codes { ( $( ($num:path, $konst:ident); )+ ) => { impl RpcStatusCode { $( pub const $konst: RpcStatusCode = RpcStatusCode($num); )+ } impl Debug for RpcStatusCode { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, "{}-{}", self.0, match self { $(RpcStatusCode($num) => stringify!($konst),)+ RpcStatusCode(_) => "INVALID_STATUS_CODE", } ) } } } } status_codes! { (GRPC_STATUS_OK, OK); (GRPC_STATUS_CANCELLED, CANCELLED); (GRPC_STATUS_UNKNOWN, UNKNOWN); (GRPC_STATUS_INVALID_ARGUMENT, INVALID_ARGUMENT); (GRPC_STATUS_DEADLINE_EXCEEDED, DEADLINE_EXCEEDED); (GRPC_STATUS_NOT_FOUND, NOT_FOUND); (GRPC_STATUS_ALREADY_EXISTS, ALREADY_EXISTS); (GRPC_STATUS_PERMISSION_DENIED, PERMISSION_DENIED); (GRPC_STATUS_RESOURCE_EXHAUSTED, RESOURCE_EXHAUSTED); (GRPC_STATUS_FAILED_PRECONDITION, FAILED_PRECONDITION); (GRPC_STATUS_ABORTED, ABORTED); (GRPC_STATUS_OUT_OF_RANGE, OUT_OF_RANGE); (GRPC_STATUS_UNIMPLEMENTED, UNIMPLEMENTED); (GRPC_STATUS_INTERNAL, INTERNAL); (GRPC_STATUS_UNAVAILABLE, UNAVAILABLE); (GRPC_STATUS_DATA_LOSS, DATA_LOSS); (GRPC_STATUS_UNAUTHENTICATED, UNAUTHENTICATED); (GRPC_STATUS__DO_NOT_USE, DO_NOT_USE); } /// Method types supported by gRPC. #[derive(Clone, Copy)] pub enum MethodType { /// Single request sent from client, single response received from server. Unary, /// Stream of requests sent from client, single response received from server. ClientStreaming, /// Single request sent from client, stream of responses received from server. ServerStreaming, /// Both server and client can stream arbitrary number of requests and responses simultaneously. Duplex, } /// A description of a remote method. // TODO: add serializer and deserializer. pub struct Method { /// Type of method. pub ty: MethodType, /// Full qualified name of the method. pub name: &'static str, /// The marshaller used for request messages. pub req_mar: Marshaller, /// The marshaller used for response messages. pub resp_mar: Marshaller, } impl Method { /// Get the request serializer. #[inline] pub fn req_ser(&self) -> SerializeFn { self.req_mar.ser } /// Get the request deserializer. #[inline] pub fn req_de(&self) -> DeserializeFn { self.req_mar.de } /// Get the response serializer. #[inline] pub fn resp_ser(&self) -> SerializeFn { self.resp_mar.ser } /// Get the response deserializer. #[inline] pub fn resp_de(&self) -> DeserializeFn { self.resp_mar.de } } /// RPC result returned from the server. #[derive(Debug, Clone)] pub struct RpcStatus { /// gRPC status code. `Ok` indicates success, all other values indicate an error. code: RpcStatusCode, /// error message. message: String, /// Additional details for rich error model. /// /// See also https://grpc.io/docs/guides/error/#richer-error-model. details: Vec, /// Debug error string debug_error_string: String, } impl Display for RpcStatus { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { Debug::fmt(self, fmt) } } impl RpcStatus { /// Create a new [`RpcStatus`]. pub fn new>(code: T) -> RpcStatus { RpcStatus::with_message(code, String::new()) } /// Create a new [`RpcStatus`] with given message. pub fn with_message>(code: T, message: String) -> RpcStatus { RpcStatus::with_details(code, message, vec![]) } /// Create a new [`RpcStats`] with code, message and details. /// /// If using rich error model, `details` should be binary message that sets `code` and /// `message` to the same value. Or you can use `into` method to do automatic /// transformation if using `grpcio_proto::google::rpc::Status`. pub fn with_details>( code: T, message: String, details: Vec, ) -> RpcStatus { RpcStatus::with_details_and_error_string(code, message, details, String::new()) } /// Create a new [`RpcStats`] with code, message, details and debug error string. /// /// If using rich error model, `details` should be binary message that sets `code` and /// `message` to the same value. Or you can use `into` method to do automatic /// transformation if using `grpcio_proto::google::rpc::Status`. pub fn with_details_and_error_string>( code: T, message: String, details: Vec, debug_error_string: String, ) -> RpcStatus { RpcStatus { code: code.into(), message, details, debug_error_string, } } /// Create a new [`RpcStatus`] that status code is Ok. pub fn ok() -> RpcStatus { RpcStatus::new(RpcStatusCode::OK) } /// Return the instance's error code. #[inline] pub fn code(&self) -> RpcStatusCode { self.code } /// Return the instance's error message. #[inline] pub fn message(&self) -> &str { &self.message } /// Return the (binary) error details. /// /// Usually it contains a serialized `google.rpc.Status` proto. pub fn details(&self) -> &[u8] { &self.details } /// Return the debug error string. /// /// This will return a detailed string of the gRPC Core error that led to the failure. /// It shouldn't be relied upon for anything other than gaining more debug data in /// failure cases. pub fn debug_error_string(&self) -> &str { &self.debug_error_string } } pub type MessageReader = GrpcByteBufferReader; /// Context for batch request. pub struct BatchContext { ctx: *mut grpcwrap_batch_context, } impl BatchContext { pub fn new() -> BatchContext { BatchContext { ctx: unsafe { grpc_sys::grpcwrap_batch_context_create() }, } } pub fn as_ptr(&self) -> *mut grpcwrap_batch_context { self.ctx } pub fn take_recv_message(&self) -> Option { let ptr = unsafe { grpc_sys::grpcwrap_batch_context_take_recv_message(self.ctx) }; if ptr.is_null() { None } else { Some(unsafe { GrpcByteBuffer::from_raw(ptr) }) } } /// Get the status of the rpc call. pub fn rpc_status(&self) -> RpcStatus { let status = RpcStatusCode(unsafe { grpc_sys::grpcwrap_batch_context_recv_status_on_client_status(self.ctx) }); if status == RpcStatusCode::OK { RpcStatus::ok() } else { unsafe { let mut msg_len = 0; let details_ptr = grpc_sys::grpcwrap_batch_context_recv_status_on_client_details( self.ctx, &mut msg_len, ); let msg_slice = slice::from_raw_parts(details_ptr as *const _, msg_len); let message = String::from_utf8_lossy(msg_slice).into_owned(); let m_ptr = grpc_sys::grpcwrap_batch_context_recv_status_on_client_trailing_metadata( self.ctx, ); let metadata = &*(m_ptr as *const Metadata); let details = metadata.search_binary_error_details().to_vec(); let error_string_ptr = grpc_sys::grpcwrap_batch_context_recv_status_on_client_error_string(self.ctx); let error_string = if error_string_ptr.is_null() { String::new() } else { CStr::from_ptr(error_string_ptr) .to_string_lossy() .into_owned() }; RpcStatus::with_details_and_error_string(status, message, details, error_string) } } } /// Fetch the response bytes of the rpc call. pub fn recv_message(&mut self) -> Option { let buf = self.take_recv_message()?; Some(GrpcByteBufferReader::new(buf)) } /// Get the initial metadata from response. /// /// If initial metadata is not fetched or the method has been called, empty metadata will be /// returned. pub fn take_initial_metadata(&mut self) -> UnownedMetadata { let mut res = UnownedMetadata::empty(); unsafe { grpcio_sys::grpcwrap_batch_context_take_recv_initial_metadata( self.ctx, res.as_mut_ptr(), ); } res } /// Get the trailing metadata from response. /// /// If trailing metadata is not fetched or the method has been called, empty metadata will be /// returned. pub fn take_trailing_metadata(&mut self) -> UnownedMetadata { let mut res = UnownedMetadata::empty(); unsafe { grpc_sys::grpcwrap_batch_context_take_recv_status_on_client_trailing_metadata( self.ctx, res.as_mut_ptr(), ); } res } } impl Drop for BatchContext { fn drop(&mut self) { unsafe { grpc_sys::grpcwrap_batch_context_destroy(self.ctx) } } } #[inline] fn box_batch_tag(tag: CallTag) -> (*mut grpcwrap_batch_context, *mut CallTag) { let tag_box = Box::new(tag); ( tag_box.batch_ctx().unwrap().as_ptr(), Box::into_raw(tag_box), ) } /// A helper function that runs the batch call and checks the result. fn check_run(bt: BatchType, f: F) -> BatchFuture where F: FnOnce(*mut grpcwrap_batch_context, *mut c_void) -> grpc_call_error, { let (cq_f, tag) = CallTag::batch_pair(bt); let (batch_ptr, tag_ptr) = box_batch_tag(tag); let code = f(batch_ptr, tag_ptr as *mut c_void); if code != grpc_call_error::GRPC_CALL_OK { unsafe { drop(Box::from_raw(tag_ptr)); } panic!("create call fail: {:?}", code); } cq_f } /// A Call represents an RPC. /// /// When created, it is in a configuration state allowing properties to be /// set until it is invoked. After invoke, the Call can have messages /// written to it and read from it. pub struct Call { pub call: *mut grpc_call, pub cq: CompletionQueue, } unsafe impl Send for Call {} impl Call { pub unsafe fn from_raw(call: *mut grpc_sys::grpc_call, cq: CompletionQueue) -> Call { assert!(!call.is_null()); Call { call, cq } } /// Send a message asynchronously. pub fn start_send_message( &mut self, msg: &mut GrpcSlice, write_flags: u32, initial_metadata: Option<&mut Metadata>, call_flags: u32, ) -> Result { let _cq_ref = self.cq.borrow()?; let f = check_run(BatchType::Finish, |ctx, tag| unsafe { grpc_sys::grpcwrap_call_send_message( self.call, ctx, msg.as_mut_ptr(), write_flags, initial_metadata.map_or_else(ptr::null_mut, |m| m as *mut _ as _), call_flags, tag, ) }); Ok(f) } /// Finish the rpc call from client. pub fn start_send_close_client(&mut self) -> Result { let _cq_ref = self.cq.borrow()?; let f = check_run(BatchType::Finish, |_, tag| unsafe { grpc_sys::grpcwrap_call_send_close_from_client(self.call, tag) }); Ok(f) } /// Receive a message asynchronously. pub fn start_recv_message(&mut self) -> Result { let _cq_ref = self.cq.borrow()?; let f = check_run(BatchType::Read, |ctx, tag| unsafe { grpc_sys::grpcwrap_call_recv_message(self.call, ctx, tag) }); Ok(f) } /// Start handling from server side. /// /// Future will finish once close is received by the server. pub fn start_server_side(&mut self) -> Result { let _cq_ref = self.cq.borrow()?; let f = check_run(BatchType::Finish, |ctx, tag| unsafe { grpc_sys::grpcwrap_call_start_serverside(self.call, ctx, tag) }); Ok(f) } /// Send a status from server. pub fn start_send_status_from_server( &mut self, status: &RpcStatus, initial_metadata: &mut Option, call_flags: u32, send_empty_metadata: bool, payload: &mut Option, write_flags: u32, ) -> Result { let _cq_ref = self.cq.borrow()?; if initial_metadata.is_none() && send_empty_metadata { initial_metadata.replace(MetadataBuilder::new().build()); } let f = check_run(BatchType::Finish, |ctx, tag| unsafe { let (msg_ptr, msg_len) = if status.code() == RpcStatusCode::OK { (ptr::null(), 0) } else { (status.message.as_ptr(), status.message.len()) }; let payload_p = match payload { Some(p) => p.as_mut_ptr(), None => ptr::null_mut(), }; let mut trailing_metadata: Option = if status.details.is_empty() { None } else { let mut builder = MetadataBuilder::new(); builder.set_binary_error_details(&status.details); Some(builder.build()) }; grpc_sys::grpcwrap_call_send_status_from_server( self.call, ctx, status.code().into(), msg_ptr as _, msg_len, initial_metadata .as_mut() .map_or_else(ptr::null_mut, |m| m as *mut _ as _), call_flags, trailing_metadata .as_mut() .map_or_else(ptr::null_mut, |m| m as *mut _ as _), payload_p, write_flags, tag, ) }); Ok(f) } /// Abort an rpc call before handler is called. pub fn abort(self, status: &RpcStatus) { match self.cq.borrow() { // Queue is shutdown, ignore. Err(Error::QueueShutdown) => return, Err(e) => panic!("unexpected error when aborting call: {:?}", e), _ => {} } let call_ptr = self.call; let tag = CallTag::abort(self); let (batch_ptr, tag_ptr) = box_batch_tag(tag); let code = unsafe { let (msg_ptr, msg_len) = if status.code() == RpcStatusCode::OK { (ptr::null(), 0) } else { (status.message.as_ptr(), status.message.len()) }; grpc_sys::grpcwrap_call_send_status_from_server( call_ptr, batch_ptr, status.code().into(), msg_ptr as _, msg_len, (&mut MetadataBuilder::new().build()) as *mut _ as _, 0, ptr::null_mut(), ptr::null_mut(), 0, tag_ptr as *mut c_void, ) }; if code != grpc_call_error::GRPC_CALL_OK { unsafe { drop(Box::from_raw(tag_ptr)); } panic!("create call fail: {:?}", code); } } /// Cancel the rpc call by client. fn cancel(&self) { match self.cq.borrow() { // Queue is shutdown, ignore. Err(Error::QueueShutdown) => return, Err(e) => panic!("unexpected error when canceling call: {:?}", e), _ => {} } unsafe { grpc_sys::grpc_call_cancel(self.call, ptr::null_mut()); } } } impl Drop for Call { fn drop(&mut self) { unsafe { grpc_sys::grpc_call_unref(self.call) } } } /// A share object for client streaming and duplex streaming call. /// /// In both cases, receiver and sender can be polled in the same time, /// hence we need to share the call in the both sides and abort the sink /// once the call is canceled or finished early. struct ShareCall { call: Call, close_f: BatchFuture, finished: bool, status: Option, } impl ShareCall { fn new(call: Call, close_f: BatchFuture) -> ShareCall { ShareCall { call, close_f, finished: false, status: None, } } /// Poll if the call is still alive. /// /// If the call is still running, will register a notification for its completion. fn poll_finish(&mut self, cx: &mut Context) -> Poll> { let res = match Pin::new(&mut self.close_f).poll(cx) { Poll::Ready(Ok(reader)) => { self.status = Some(RpcStatus::ok()); Poll::Ready(Ok(reader)) } Poll::Pending => return Poll::Pending, Poll::Ready(Err(Error::RpcFailure(status))) => { self.status = Some(status.clone()); Poll::Ready(Err(Error::RpcFailure(status))) } res => res, }; self.finished = true; res } /// Check if the call is finished. fn check_alive(&mut self) -> Result<()> { if self.finished { // maybe can just take here. return Err(Error::RpcFinished(self.status.clone())); } task::check_alive(&self.close_f) } } /// A helper trait that allows executing function on the internal `ShareCall` struct. trait ShareCallHolder { fn call R>(&mut self, f: F) -> R; } impl ShareCallHolder for ShareCall { fn call R>(&mut self, f: F) -> R { f(self) } } impl ShareCallHolder for Arc> { fn call R>(&mut self, f: F) -> R { let mut call = self.lock(); f(&mut call) } } /// A helper struct for constructing Stream object for batch requests. struct StreamingBase { close_f: Option, msg_f: Option, read_done: bool, } impl StreamingBase { fn new(close_f: Option) -> StreamingBase { StreamingBase { close_f, msg_f: None, read_done: false, } } fn poll( &mut self, cx: &mut Context, call: &mut C, skip_finish_check: bool, ) -> Poll>> { if !skip_finish_check { let mut finished = false; if let Some(close_f) = &mut self.close_f { if Pin::new(close_f).poll(cx)?.is_ready() { // Don't return immediately, there may be pending data. finished = true; } } if finished { self.close_f.take(); } } let mut bytes = None; if !self.read_done { if let Some(msg_f) = &mut self.msg_f { bytes = ready!(Pin::new(msg_f).poll(cx)?).message_reader; if bytes.is_none() { self.read_done = true; } } } if self.read_done { if self.close_f.is_none() { return Poll::Ready(None); } return Poll::Pending; } // so msg_f must be either stale or not initialized yet. self.msg_f.take(); let msg_f = call.call(|c| c.call.start_recv_message())?; self.msg_f = Some(msg_f); if bytes.is_none() { self.poll(cx, call, true) } else { Poll::Ready(bytes.map(Ok)) } } // Cancel the call if we still have some messages or did not // receive status code. fn on_drop(&self, call: &mut C) { if !self.read_done || self.close_f.is_some() { call.call(|c| c.call.cancel()); } } } /// Flags for write operations. #[derive(Default, Clone, Copy)] pub struct WriteFlags { flags: u32, } impl WriteFlags { /// Hint that the write may be buffered and need not go out on the wire immediately. /// /// gRPC is free to buffer the message until the next non-buffered write, or until write stream /// completion, but it need not buffer completely or at all. pub fn buffer_hint(mut self, need_buffered: bool) -> WriteFlags { client::change_flag( &mut self.flags, grpc_sys::GRPC_WRITE_BUFFER_HINT, need_buffered, ); self } /// Force compression to be disabled. pub fn force_no_compress(mut self, no_compress: bool) -> WriteFlags { client::change_flag( &mut self.flags, grpc_sys::GRPC_WRITE_NO_COMPRESS, no_compress, ); self } /// Get whether buffer hint is enabled. pub fn get_buffer_hint(self) -> bool { (self.flags & grpc_sys::GRPC_WRITE_BUFFER_HINT) != 0 } /// Get whether compression is disabled. pub fn get_force_no_compress(self) -> bool { (self.flags & grpc_sys::GRPC_WRITE_NO_COMPRESS) != 0 } } /// A helper struct for constructing Sink object for batch requests. struct SinkBase { // Batch job to be executed in `poll_ready`. batch_f: Option, headers: Metadata, send_metadata: bool, // Flag to indicate if enhance batch strategy. This behavior will modify the `buffer_hint` to batch // messages as much as possible. enhance_buffer_strategy: bool, // Buffer used to store the data to be sent, send out the last data in this round of `start_send`. buffer: GrpcSlice, // Write flags used to control the data to be sent in `buffer`. buf_flags: Option, // Used to records whether a message in which `buffer_hint` is false exists. // Note: only used in enhanced buffer strategy. last_buf_hint: bool, } impl SinkBase { fn new(send_metadata: bool) -> SinkBase { SinkBase { batch_f: None, headers: MetadataBuilder::new().build(), send_metadata, enhance_buffer_strategy: false, buffer: GrpcSlice::default(), buf_flags: None, last_buf_hint: true, } } fn start_send( &mut self, call: &mut C, t: &T, flags: WriteFlags, ser: SerializeFn, call_flags: u32, ) -> Result<()> { // temporary fix: buffer hint with send meta will not send out any metadata. // note: only the first message can enter this code block. if self.send_metadata { ser(t, &mut self.buffer)?; self.buf_flags = Some(flags); self.start_send_buffer_message(false, call, call_flags)?; self.send_metadata = false; return Ok(()); } // If there is already a buffered message waiting to be sent, set `buffer_hint` to true to indicate // that this is not the last message. if self.buf_flags.is_some() { self.start_send_buffer_message(true, call, call_flags)?; } ser(t, &mut self.buffer)?; let hint = flags.get_buffer_hint(); self.last_buf_hint &= hint; self.buf_flags = Some(flags); // If sink disable batch, start sending the message in buffer immediately. if !self.enhance_buffer_strategy { self.start_send_buffer_message(hint, call, call_flags)?; } Ok(()) } #[inline] fn poll_ready(&mut self, cx: &mut Context) -> Poll> { match &mut self.batch_f { None => return Poll::Ready(Ok(())), Some(f) => { ready!(Pin::new(f).poll(cx)?); } } self.batch_f.take(); Poll::Ready(Ok(())) } #[inline] fn poll_flush( &mut self, cx: &mut Context, call: &mut C, call_flags: u32, ) -> Poll> { if self.batch_f.is_some() { ready!(self.poll_ready(cx)?); } if self.buf_flags.is_some() { self.start_send_buffer_message(self.last_buf_hint, call, call_flags)?; ready!(self.poll_ready(cx)?); } self.last_buf_hint = true; Poll::Ready(Ok(())) } #[inline] fn start_send_buffer_message( &mut self, buffer_hint: bool, call: &mut C, call_flags: u32, ) -> Result<()> { // `start_send` is supposed to be called after `poll_ready` returns ready. assert!(self.batch_f.is_none()); let buffer = &mut self.buffer; let mut flags = self.buf_flags.unwrap(); flags = flags.buffer_hint(buffer_hint); let headers = if self.send_metadata { Some(&mut self.headers) } else { None }; let write_f = call.call(|c| { c.call .start_send_message(buffer, flags.flags, headers, call_flags) })?; self.batch_f = Some(write_f); if !self.buffer.is_inline() { self.buffer = GrpcSlice::default(); } self.buf_flags.take(); Ok(()) } }