1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3 use std::ffi::CStr;
4 use std::future::Future;
5 use std::pin::Pin;
6 use std::sync::Arc;
7 use std::task::{Context, Poll};
8 use std::time::Duration;
9 use std::{result, slice};
10
11 use crate::grpc_sys::{
12 self, gpr_clock_type, gpr_timespec, grpc_call_error, grpcwrap_request_call_context,
13 };
14 use futures_util::ready;
15 use futures_util::{Sink, Stream};
16 use parking_lot::Mutex;
17
18 use super::{RpcStatus, ShareCall, ShareCallHolder, WriteFlags};
19 use crate::buf::GrpcSlice;
20 use crate::call::{
21 BatchContext, Call, MessageReader, MethodType, RpcStatusCode, SinkBase, StreamingBase,
22 };
23 use crate::codec::{DeserializeFn, SerializeFn};
24 use crate::cq::CompletionQueue;
25 use crate::error::{Error, Result};
26 use crate::metadata::Metadata;
27 use crate::server::ServerChecker;
28 use crate::server::{BoxHandler, RequestCallContext};
29 use crate::task::{BatchFuture, CallTag, Executor, Kicker};
30 use crate::CheckResult;
31
32 /// A time point that an rpc or operation should finished before it.
33 #[derive(Clone, Copy)]
34 pub struct Deadline {
35 pub(crate) spec: gpr_timespec,
36 }
37
38 impl Deadline {
new(spec: gpr_timespec) -> Deadline39 fn new(spec: gpr_timespec) -> Deadline {
40 let realtime_spec =
41 unsafe { grpc_sys::gpr_convert_clock_type(spec, gpr_clock_type::GPR_CLOCK_REALTIME) };
42
43 Deadline {
44 spec: realtime_spec,
45 }
46 }
47
48 /// Checks if the deadline is exceeded.
exceeded(self) -> bool49 pub fn exceeded(self) -> bool {
50 unsafe {
51 let now = grpc_sys::gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME);
52 grpc_sys::gpr_time_cmp(now, self.spec) >= 0
53 }
54 }
55
spec(self) -> gpr_timespec56 pub(crate) fn spec(self) -> gpr_timespec {
57 self.spec
58 }
59 }
60
61 impl From<Duration> for Deadline {
62 /// Build a deadline from given duration.
63 ///
64 /// The deadline will be `now + duration`.
65 #[inline]
from(dur: Duration) -> Deadline66 fn from(dur: Duration) -> Deadline {
67 Deadline::new(dur.into())
68 }
69 }
70
71 /// Context for accepting a request.
72 pub struct RequestContext {
73 ctx: *mut grpcwrap_request_call_context,
74 request_call: Option<RequestCallContext>,
75 }
76
77 impl RequestContext {
new(rc: RequestCallContext) -> RequestContext78 pub fn new(rc: RequestCallContext) -> RequestContext {
79 let ctx = unsafe { grpc_sys::grpcwrap_request_call_context_create() };
80
81 RequestContext {
82 ctx,
83 request_call: Some(rc),
84 }
85 }
86
87 /// Try to accept a client side streaming request.
88 ///
89 /// Return error if the request is a client side unary request.
handle_stream_req( self, cq: &CompletionQueue, rc: &mut RequestCallContext, ) -> result::Result<(), Self>90 pub fn handle_stream_req(
91 self,
92 cq: &CompletionQueue,
93 rc: &mut RequestCallContext,
94 ) -> result::Result<(), Self> {
95 let checker = rc.get_checker();
96 let handler = unsafe { rc.get_handler(self.method()) };
97 match handler {
98 Some(handler) => match handler.method_type() {
99 MethodType::Unary | MethodType::ServerStreaming => Err(self),
100 _ => {
101 execute(self, cq, None, handler, checker);
102 Ok(())
103 }
104 },
105 None => {
106 execute_unimplemented(self, cq.clone());
107 Ok(())
108 }
109 }
110 }
111
112 /// Accept a client side unary request.
113 ///
114 /// This method should be called after `handle_stream_req`. When handling
115 /// client side unary request, handler will only be called after the unary
116 /// request is received.
handle_unary_req(self, rc: RequestCallContext, _: &CompletionQueue)117 pub fn handle_unary_req(self, rc: RequestCallContext, _: &CompletionQueue) {
118 // fetch message before calling callback.
119 let tag = Box::new(CallTag::unary_request(self, rc));
120 let batch_ctx = tag.batch_ctx().unwrap().as_ptr();
121 let request_ctx = tag.request_ctx().unwrap().as_ptr();
122 let tag_ptr = Box::into_raw(tag);
123 unsafe {
124 let call = grpc_sys::grpcwrap_request_call_context_get_call(request_ctx);
125 let code = grpc_sys::grpcwrap_call_recv_message(call, batch_ctx, tag_ptr as _);
126 if code != grpc_call_error::GRPC_CALL_OK {
127 drop(Box::from_raw(tag_ptr));
128 // it should not failed.
129 panic!("try to receive message fail: {:?}", code);
130 }
131 }
132 }
133
take_request_call_context(&mut self) -> Option<RequestCallContext>134 pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> {
135 self.request_call.take()
136 }
137
as_ptr(&self) -> *mut grpcwrap_request_call_context138 pub fn as_ptr(&self) -> *mut grpcwrap_request_call_context {
139 self.ctx
140 }
141
call(&self, cq: CompletionQueue) -> Call142 fn call(&self, cq: CompletionQueue) -> Call {
143 unsafe {
144 // It is okay to use a mutable pointer on a immutable reference, `self`,
145 // because grpcwrap_request_call_context_ref_call is thread-safe.
146 let call = grpc_sys::grpcwrap_request_call_context_ref_call(self.ctx);
147 assert!(!call.is_null());
148 Call::from_raw(call, cq)
149 }
150 }
151
method(&self) -> &[u8]152 pub fn method(&self) -> &[u8] {
153 let mut len = 0;
154 let method = unsafe { grpc_sys::grpcwrap_request_call_context_method(self.ctx, &mut len) };
155
156 unsafe { slice::from_raw_parts(method as _, len) }
157 }
158
host(&self) -> &[u8]159 fn host(&self) -> &[u8] {
160 let mut len = 0;
161 let host = unsafe { grpc_sys::grpcwrap_request_call_context_host(self.ctx, &mut len) };
162
163 unsafe { slice::from_raw_parts(host as _, len) }
164 }
165
deadline(&self) -> Deadline166 fn deadline(&self) -> Deadline {
167 let t = unsafe { grpc_sys::grpcwrap_request_call_context_deadline(self.ctx) };
168
169 Deadline::new(t)
170 }
171
metadata(&self) -> &Metadata172 fn metadata(&self) -> &Metadata {
173 unsafe {
174 let ptr = grpc_sys::grpcwrap_request_call_context_metadata_array(self.ctx);
175 let arr_ptr: *const Metadata = ptr as _;
176 &*arr_ptr
177 }
178 }
179
peer(&self) -> String180 fn peer(&self) -> String {
181 unsafe {
182 // RequestContext always holds a reference of the call.
183 let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx);
184 let p = grpc_sys::grpc_call_get_peer(call);
185 let peer = CStr::from_ptr(p)
186 .to_str()
187 .expect("valid UTF-8 data")
188 .to_owned();
189 grpc_sys::gpr_free(p as _);
190 peer
191 }
192 }
193
194 /// If the server binds in non-secure mode, this will return None
195 #[cfg(feature = "_secure")]
auth_context(&self) -> Option<crate::AuthContext>196 fn auth_context(&self) -> Option<crate::AuthContext> {
197 unsafe {
198 let call = grpc_sys::grpcwrap_request_call_context_get_call(self.ctx);
199 crate::AuthContext::from_call_ptr(call)
200 }
201 }
202 }
203
204 impl Drop for RequestContext {
drop(&mut self)205 fn drop(&mut self) {
206 unsafe { grpc_sys::grpcwrap_request_call_context_destroy(self.ctx) }
207 }
208 }
209
210 /// A context for handling client side unary request.
211 pub struct UnaryRequestContext {
212 request: RequestContext,
213 request_call: Option<RequestCallContext>,
214 batch: BatchContext,
215 }
216
217 impl UnaryRequestContext {
new(ctx: RequestContext, rc: RequestCallContext) -> UnaryRequestContext218 pub fn new(ctx: RequestContext, rc: RequestCallContext) -> UnaryRequestContext {
219 UnaryRequestContext {
220 request: ctx,
221 request_call: Some(rc),
222 batch: BatchContext::new(),
223 }
224 }
225
batch_ctx(&self) -> &BatchContext226 pub fn batch_ctx(&self) -> &BatchContext {
227 &self.batch
228 }
229
batch_ctx_mut(&mut self) -> &mut BatchContext230 pub fn batch_ctx_mut(&mut self) -> &mut BatchContext {
231 &mut self.batch
232 }
233
request_ctx(&self) -> &RequestContext234 pub fn request_ctx(&self) -> &RequestContext {
235 &self.request
236 }
237
take_request_call_context(&mut self) -> Option<RequestCallContext>238 pub fn take_request_call_context(&mut self) -> Option<RequestCallContext> {
239 self.request_call.take()
240 }
241
handle( self, rc: &mut RequestCallContext, cq: &CompletionQueue, reader: Option<MessageReader>, )242 pub fn handle(
243 self,
244 rc: &mut RequestCallContext,
245 cq: &CompletionQueue,
246 reader: Option<MessageReader>,
247 ) {
248 let checker = rc.get_checker();
249 let handler = unsafe { rc.get_handler(self.request.method()).unwrap() };
250 if reader.is_some() {
251 return execute(self.request, cq, reader, handler, checker);
252 }
253
254 let status = RpcStatus::with_message(RpcStatusCode::INTERNAL, "No payload".to_owned());
255 self.request.call(cq.clone()).abort(&status)
256 }
257 }
258
259 /// A stream for client a streaming call and a duplex streaming call.
260 ///
261 /// The corresponding RPC will be canceled if the stream did not
262 /// finish before dropping.
263 #[must_use = "if unused the RequestStream may immediately cancel the RPC"]
264 pub struct RequestStream<T> {
265 call: Arc<Mutex<ShareCall>>,
266 base: StreamingBase,
267 de: DeserializeFn<T>,
268 }
269
270 impl<T> RequestStream<T> {
new(call: Arc<Mutex<ShareCall>>, de: DeserializeFn<T>) -> RequestStream<T>271 fn new(call: Arc<Mutex<ShareCall>>, de: DeserializeFn<T>) -> RequestStream<T> {
272 RequestStream {
273 call,
274 base: StreamingBase::new(None),
275 de,
276 }
277 }
278 }
279
280 impl<T> Stream for RequestStream<T> {
281 type Item = Result<T>;
282
poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T>>>283 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T>>> {
284 {
285 let mut call = self.call.lock();
286 call.check_alive()?;
287 }
288
289 let t = &mut *self;
290 match ready!(t.base.poll(cx, &mut t.call, false)?) {
291 None => Poll::Ready(None),
292 Some(data) => Poll::Ready(Some((t.de)(data))),
293 }
294 }
295 }
296
297 impl<T> Drop for RequestStream<T> {
298 /// The corresponding RPC will be canceled if the stream did not
299 /// finish before dropping.
drop(&mut self)300 fn drop(&mut self) {
301 self.base.on_drop(&mut self.call);
302 }
303 }
304
305 /// A helper macro used to implement server side unary sink.
306 /// Not using generic here because we don't need to expose
307 /// `CallHolder` or `Call` to caller.
308 // TODO: Use type alias to be friendly for documentation.
309 macro_rules! impl_unary_sink {
310 ($(#[$attr:meta])* $t:ident, $rt:ident, $holder:ty) => {
311 pub struct $rt {
312 call: $holder,
313 cq_f: Option<BatchFuture>,
314 err: Option<Error>,
315 }
316
317 impl Future for $rt {
318 type Output = Result<()>;
319
320 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
321 if let Some(e) = self.err.take() {
322 return Poll::Ready(Err(e));
323 }
324
325 if self.cq_f.is_some() {
326 ready!(Pin::new(self.cq_f.as_mut().unwrap()).poll(cx)?);
327 self.cq_f.take();
328 }
329
330 ready!(self.call.call(|c| c.poll_finish(cx))?);
331 Poll::Ready(Ok(()))
332 }
333 }
334
335 $(#[$attr])*
336 pub struct $t<T> {
337 call: Option<$holder>,
338 write_flags: u32,
339 ser: SerializeFn<T>,
340 headers: Option<Metadata>,
341 call_flags: u32,
342 }
343
344 impl<T> $t<T> {
345 fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
346 $t {
347 call: Some(call),
348 write_flags: 0,
349 ser,
350 headers: None,
351 call_flags: 0,
352 }
353 }
354
355 #[inline]
356 pub fn set_headers(&mut self, meta: Metadata) {
357 self.headers = Some(meta);
358 }
359
360 #[inline]
361 pub fn set_call_flags(&mut self, flags: u32) {
362 // TODO: implement a server-side call flags interface similar to the client-side .CallOption.
363 self.call_flags = flags;
364 }
365
366 pub fn success(self, t: T) -> $rt {
367 self.complete(RpcStatus::ok(), Some(t))
368 }
369
370 pub fn fail(self, status: RpcStatus) -> $rt {
371 self.complete(status, None)
372 }
373
374 fn complete(mut self, status: RpcStatus, t: Option<T>) -> $rt {
375 let mut data = match t {
376 Some(t) => {
377 let mut buf = GrpcSlice::default();
378 if let Err(e) = (self.ser)(&t, &mut buf) {
379 return $rt {
380 call: self.call.take().unwrap(),
381 cq_f: None,
382 err: Some(e),
383 };
384 }
385 Some(buf)
386 }
387 None => None,
388 };
389
390 let headers = &mut self.headers;
391 let call_flags = self.call_flags;
392 let write_flags = self.write_flags;
393
394 let res = self.call.as_mut().unwrap().call(|c| {
395 c.call
396 .start_send_status_from_server(&status, headers, call_flags, true, &mut data, write_flags)
397 });
398
399 let (cq_f, err) = match res {
400 Ok(f) => (Some(f), None),
401 Err(e) => (None, Some(e)),
402 };
403
404 $rt {
405 call: self.call.take().unwrap(),
406 cq_f,
407 err,
408 }
409 }
410 }
411
412 impl<T> Drop for $t<T> {
413 /// The corresponding RPC will be canceled if the sink did not
414 /// send a response before dropping.
415 fn drop(&mut self) {
416 self.call
417 .as_mut()
418 .map(|call| call.call(|c| c.call.cancel()));
419 }
420 }
421 };
422 }
423
424 impl_unary_sink!(
425 /// A sink for unary call.
426 ///
427 /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
428 ///
429 /// [`success`]: #method.success
430 /// [`fail`]: #method.fail
431 #[must_use = "if unused the sink may immediately cancel the RPC"]
432 UnarySink,
433 UnarySinkResult,
434 ShareCall
435 );
436 impl_unary_sink!(
437 /// A sink for client streaming call.
438 ///
439 /// To close the sink properly, you should call [`success`] or [`fail`] before dropping.
440 ///
441 /// [`success`]: #method.success
442 /// [`fail`]: #method.fail
443 #[must_use = "if unused the sink may immediately cancel the RPC"]
444 ClientStreamingSink,
445 ClientStreamingSinkResult,
446 Arc<Mutex<ShareCall>>
447 );
448
449 // A macro helper to implement server side streaming sink.
450 macro_rules! impl_stream_sink {
451 ($(#[$attr:meta])* $t:ident, $ft:ident, $holder:ty) => {
452 $(#[$attr])*
453 pub struct $t<T> {
454 call: Option<$holder>,
455 base: SinkBase,
456 flush_f: Option<BatchFuture>,
457 status: RpcStatus,
458 flushed: bool,
459 closed: bool,
460 ser: SerializeFn<T>,
461 }
462
463 impl<T> $t<T> {
464 fn new(call: $holder, ser: SerializeFn<T>) -> $t<T> {
465 $t {
466 call: Some(call),
467 base: SinkBase::new(true),
468 flush_f: None,
469 status: RpcStatus::ok(),
470 flushed: false,
471 closed: false,
472 ser,
473 }
474 }
475
476 pub fn set_headers(&mut self, meta: Metadata) {
477 self.base.headers = meta;
478 }
479
480 /// By default it always sends messages with their configured buffer hint. But when the
481 /// `enhance_batch` is enabled, messages will be batched together as many as possible.
482 /// The rules are listed as below:
483 /// - All messages except the last one will be sent with `buffer_hint` set to true.
484 /// - The last message will also be sent with `buffer_hint` set to true unless any message is
485 /// offered with buffer hint set to false.
486 ///
487 /// No matter `enhance_batch` is true or false, it's recommended to follow the contract of
488 /// Sink and call `poll_flush` to ensure messages are handled by gRPC C Core.
489 pub fn enhance_batch(&mut self, flag: bool) {
490 self.base.enhance_buffer_strategy = flag;
491 }
492
493 pub fn set_status(&mut self, status: RpcStatus) {
494 assert!(self.flush_f.is_none());
495 self.status = status;
496 }
497
498 pub fn fail(mut self, status: RpcStatus) -> $ft {
499 assert!(self.flush_f.is_none());
500 let send_metadata = self.base.send_metadata;
501 let res = self.call.as_mut().unwrap().call(|c| {
502 c.call
503 .start_send_status_from_server(&status, &mut None, 0, send_metadata, &mut None, 0)
504 });
505
506 let (fail_f, err) = match res {
507 Ok(f) => (Some(f), None),
508 Err(e) => (None, Some(e)),
509 };
510
511 $ft {
512 call: self.call.take().unwrap(),
513 fail_f,
514 err,
515 }
516 }
517 }
518
519 impl<T> Drop for $t<T> {
520 /// The corresponding RPC will be canceled if the sink did not call
521 /// [`close`] or [`fail`] before dropping.
522 ///
523 /// [`close`]: #method.close
524 /// [`fail`]: #method.fail
525 fn drop(&mut self) {
526 // We did not close it explicitly and it was not dropped in the `fail`.
527 if !self.closed && self.call.is_some() {
528 let mut call = self.call.take().unwrap();
529 call.call(|c| c.call.cancel());
530 }
531 }
532 }
533
534 impl<T> Sink<(T, WriteFlags)> for $t<T> {
535 type Error = Error;
536
537 #[inline]
538 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
539 if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
540 return Poll::Ready(Err(Error::RemoteStopped));
541 }
542 Pin::new(&mut self.base).poll_ready(cx)
543 }
544
545 #[inline]
546 fn start_send(mut self: Pin<&mut Self>, (msg, flags): (T, WriteFlags)) -> Result<()> {
547 let t = &mut *self;
548 t.base.start_send(t.call.as_mut().unwrap(), &msg, flags, t.ser, 0)
549 }
550
551 #[inline]
552 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
553 if let Poll::Ready(_) = self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))? {
554 return Poll::Ready(Err(Error::RemoteStopped));
555 }
556 let t = &mut *self;
557 Pin::new(&mut t.base).poll_flush(cx, t.call.as_mut().unwrap(), 0)
558 }
559
560 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
561 if self.flush_f.is_none() {
562 ready!(Pin::new(&mut self.base).poll_ready(cx)?);
563
564 let send_metadata = self.base.send_metadata;
565 let t = &mut *self;
566 let status = &t.status;
567 let flush_f = t.call.as_mut().unwrap().call(|c| {
568 c.call
569 .start_send_status_from_server(status, &mut None, 0, send_metadata, &mut None, 0)
570 })?;
571 t.flush_f = Some(flush_f);
572 }
573
574 if !self.flushed {
575 ready!(Pin::new(self.flush_f.as_mut().unwrap()).poll(cx)?);
576 self.flushed = true;
577 }
578
579 ready!(self.call.as_mut().unwrap().call(|c| c.poll_finish(cx))?);
580 self.closed = true;
581 Poll::Ready(Ok(()))
582 }
583 }
584
585 #[must_use = "if unused the sink failure may immediately cancel the RPC"]
586 pub struct $ft {
587 call: $holder,
588 fail_f: Option<BatchFuture>,
589 err: Option<Error>,
590 }
591
592 impl Future for $ft {
593 type Output = Result<()>;
594
595 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
596 if let Some(e) = self.err.take() {
597 return Poll::Ready(Err(e));
598 }
599
600 let readiness = self.call.call(|c| {
601 if c.finished {
602 return Poll::Ready(Ok(()));
603 }
604
605 c.poll_finish(cx).map(|r| r.map(|_| ()))
606 })?;
607
608 if let Some(ref mut f) = self.fail_f {
609 ready!(Pin::new(f).poll(cx)?);
610 }
611
612 self.fail_f.take();
613 readiness.map(Ok)
614 }
615 }
616 };
617 }
618
619 impl_stream_sink!(
620 /// A sink for server streaming call.
621 ///
622 /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
623 ///
624 /// [`close`]: #method.close
625 /// [`fail`]: #method.fail
626 #[must_use = "if unused the sink may immediately cancel the RPC"]
627 ServerStreamingSink,
628 ServerStreamingSinkFailure,
629 ShareCall
630 );
631 impl_stream_sink!(
632 /// A sink for duplex streaming call.
633 ///
634 /// To close the sink properly, you should call [`close`] or [`fail`] before dropping.
635 ///
636 /// [`close`]: #method.close
637 /// [`fail`]: #method.fail
638 #[must_use = "if unused the sink may immediately cancel the RPC"]
639 DuplexSink,
640 DuplexSinkFailure,
641 Arc<Mutex<ShareCall>>
642 );
643
644 /// A context for rpc handling.
645 pub struct RpcContext<'a> {
646 ctx: RequestContext,
647 executor: Executor<'a>,
648 deadline: Deadline,
649 }
650
651 impl<'a> RpcContext<'a> {
new(ctx: RequestContext, cq: &CompletionQueue) -> RpcContext<'_>652 fn new(ctx: RequestContext, cq: &CompletionQueue) -> RpcContext<'_> {
653 RpcContext {
654 deadline: ctx.deadline(),
655 ctx,
656 executor: Executor::new(cq),
657 }
658 }
659
kicker(&self) -> Kicker660 fn kicker(&self) -> Kicker {
661 let call = self.call();
662 Kicker::from_call(call)
663 }
664
call(&self) -> Call665 pub(crate) fn call(&self) -> Call {
666 self.ctx.call(self.executor.cq().clone())
667 }
668
method(&self) -> &[u8]669 pub fn method(&self) -> &[u8] {
670 self.ctx.method()
671 }
672
host(&self) -> &[u8]673 pub fn host(&self) -> &[u8] {
674 self.ctx.host()
675 }
676
deadline(&self) -> Deadline677 pub fn deadline(&self) -> Deadline {
678 self.deadline
679 }
680
681 /// Get the initial metadata sent by client.
request_headers(&self) -> &Metadata682 pub fn request_headers(&self) -> &Metadata {
683 self.ctx.metadata()
684 }
685
peer(&self) -> String686 pub fn peer(&self) -> String {
687 self.ctx.peer()
688 }
689
690 /// Wrapper around the gRPC Core AuthContext
691 ///
692 /// If the server binds in non-secure mode, this will return None
693 #[cfg(feature = "_secure")]
auth_context(&self) -> Option<crate::AuthContext>694 pub fn auth_context(&self) -> Option<crate::AuthContext> {
695 self.ctx.auth_context()
696 }
697
698 /// Spawn the future into current gRPC poll thread.
699 ///
700 /// This can reduce a lot of context switching, but please make
701 /// sure there is no heavy work in the future.
spawn<F>(&self, f: F) where F: Future<Output = ()> + Send + 'static,702 pub fn spawn<F>(&self, f: F)
703 where
704 F: Future<Output = ()> + Send + 'static,
705 {
706 self.executor.spawn(f, self.kicker())
707 }
708 }
709
710 // Following four helper functions are used to create a callback closure.
711
712 macro_rules! accept_call {
713 ($call:expr) => {
714 match $call.start_server_side() {
715 Err(Error::QueueShutdown) => return,
716 Err(e) => panic!("unexpected error when trying to accept request: {:?}", e),
717 Ok(f) => f,
718 }
719 };
720 }
721
722 // Helper function to call a unary handler.
execute_unary<P, Q, F>( ctx: RpcContext<'_>, ser: SerializeFn<Q>, de: DeserializeFn<P>, payload: MessageReader, f: &mut F, ) where F: FnMut(RpcContext<'_>, P, UnarySink<Q>),723 pub fn execute_unary<P, Q, F>(
724 ctx: RpcContext<'_>,
725 ser: SerializeFn<Q>,
726 de: DeserializeFn<P>,
727 payload: MessageReader,
728 f: &mut F,
729 ) where
730 F: FnMut(RpcContext<'_>, P, UnarySink<Q>),
731 {
732 let mut call = ctx.call();
733 let close_f = accept_call!(call);
734 let request = match de(payload) {
735 Ok(f) => f,
736 Err(e) => {
737 let status = RpcStatus::with_message(
738 RpcStatusCode::INTERNAL,
739 format!("Failed to deserialize response message: {e:?}"),
740 );
741 call.abort(&status);
742 return;
743 }
744 };
745 let sink = UnarySink::new(ShareCall::new(call, close_f), ser);
746 f(ctx, request, sink)
747 }
748
749 // Helper function to call client streaming handler.
execute_client_streaming<P, Q, F>( ctx: RpcContext<'_>, ser: SerializeFn<Q>, de: DeserializeFn<P>, f: &mut F, ) where F: FnMut(RpcContext<'_>, RequestStream<P>, ClientStreamingSink<Q>),750 pub fn execute_client_streaming<P, Q, F>(
751 ctx: RpcContext<'_>,
752 ser: SerializeFn<Q>,
753 de: DeserializeFn<P>,
754 f: &mut F,
755 ) where
756 F: FnMut(RpcContext<'_>, RequestStream<P>, ClientStreamingSink<Q>),
757 {
758 let mut call = ctx.call();
759 let close_f = accept_call!(call);
760 let call = Arc::new(Mutex::new(ShareCall::new(call, close_f)));
761
762 let req_s = RequestStream::new(call.clone(), de);
763 let sink = ClientStreamingSink::new(call, ser);
764 f(ctx, req_s, sink)
765 }
766
767 // Helper function to call server streaming handler.
execute_server_streaming<P, Q, F>( ctx: RpcContext<'_>, ser: SerializeFn<Q>, de: DeserializeFn<P>, payload: MessageReader, f: &mut F, ) where F: FnMut(RpcContext<'_>, P, ServerStreamingSink<Q>),768 pub fn execute_server_streaming<P, Q, F>(
769 ctx: RpcContext<'_>,
770 ser: SerializeFn<Q>,
771 de: DeserializeFn<P>,
772 payload: MessageReader,
773 f: &mut F,
774 ) where
775 F: FnMut(RpcContext<'_>, P, ServerStreamingSink<Q>),
776 {
777 let mut call = ctx.call();
778 let close_f = accept_call!(call);
779
780 let request = match de(payload) {
781 Ok(t) => t,
782 Err(e) => {
783 let status = RpcStatus::with_message(
784 RpcStatusCode::INTERNAL,
785 format!("Failed to deserialize response message: {e:?}"),
786 );
787 call.abort(&status);
788 return;
789 }
790 };
791
792 let sink = ServerStreamingSink::new(ShareCall::new(call, close_f), ser);
793 f(ctx, request, sink)
794 }
795
796 // Helper function to call duplex streaming handler.
execute_duplex_streaming<P, Q, F>( ctx: RpcContext<'_>, ser: SerializeFn<Q>, de: DeserializeFn<P>, f: &mut F, ) where F: FnMut(RpcContext<'_>, RequestStream<P>, DuplexSink<Q>),797 pub fn execute_duplex_streaming<P, Q, F>(
798 ctx: RpcContext<'_>,
799 ser: SerializeFn<Q>,
800 de: DeserializeFn<P>,
801 f: &mut F,
802 ) where
803 F: FnMut(RpcContext<'_>, RequestStream<P>, DuplexSink<Q>),
804 {
805 let mut call = ctx.call();
806 let close_f = accept_call!(call);
807 let call = Arc::new(Mutex::new(ShareCall::new(call, close_f)));
808
809 let req_s = RequestStream::new(call.clone(), de);
810 let sink = DuplexSink::new(call, ser);
811 f(ctx, req_s, sink)
812 }
813
814 // A helper function used to handle all undefined rpc calls.
execute_unimplemented(ctx: RequestContext, cq: CompletionQueue)815 pub fn execute_unimplemented(ctx: RequestContext, cq: CompletionQueue) {
816 // Suppress needless-pass-by-value.
817 let ctx = ctx;
818 let mut call = ctx.call(cq);
819 accept_call!(call);
820 call.abort(&RpcStatus::new(RpcStatusCode::UNIMPLEMENTED))
821 }
822
823 // Helper function to call handler.
824 //
825 // Invoked after a request is ready to be handled.
execute( ctx: RequestContext, cq: &CompletionQueue, payload: Option<MessageReader>, f: &mut BoxHandler, mut checkers: Vec<Box<dyn ServerChecker>>, )826 fn execute(
827 ctx: RequestContext,
828 cq: &CompletionQueue,
829 payload: Option<MessageReader>,
830 f: &mut BoxHandler,
831 mut checkers: Vec<Box<dyn ServerChecker>>,
832 ) {
833 let rpc_ctx = RpcContext::new(ctx, cq);
834
835 for handler in checkers.iter_mut() {
836 match handler.check(&rpc_ctx) {
837 CheckResult::Continue => {}
838 CheckResult::Abort(status) => {
839 rpc_ctx.call().abort(&status);
840 return;
841 }
842 }
843 }
844
845 f.handle(rpc_ctx, payload)
846 }
847