// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::future::Future;
use std::pin::Pin;
use std::ptr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use crate::grpc_sys;
use futures_executor::block_on;
use futures_util::future::poll_fn;
use futures_util::{ready, Sink, Stream};
use parking_lot::Mutex;

use super::{ShareCall, ShareCallHolder, SinkBase, WriteFlags};
use crate::buf::GrpcSlice;
use crate::call::{check_run, Call, MessageReader, Method};
use crate::channel::Channel;
use crate::codec::{DeserializeFn, SerializeFn};
use crate::error::{Error, Result};
use crate::metadata::{Metadata, UnownedMetadata};
use crate::task::{BatchFuture, BatchType};

/// Set or clear the given flag bit in `res`.
#[inline]
pub fn change_flag(res: &mut u32, flag: u32, set: bool) {
    if set {
        *res |= flag;
    } else {
        *res &= !flag;
    }
}

/// Options for calls made by a client.
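///
/// # Examples
///
/// A minimal sketch of chaining options (the values here are illustrative only):
///
/// ```
/// use std::time::Duration;
/// use grpcio::CallOption;
///
/// let opt = CallOption::default()
///     .timeout(Duration::from_secs(3))
///     .wait_for_ready(true);
/// assert_eq!(opt.get_timeout(), Some(Duration::from_secs(3)));
/// ```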
#[derive(Clone, Default)]
pub struct CallOption {
    timeout: Option<Duration>,
    write_flags: WriteFlags,
    call_flags: u32,
    headers: Option<Metadata>,
}

impl CallOption {
    /// Signal that the call should not return UNAVAILABLE before it has started.
    pub fn wait_for_ready(mut self, wait_for_ready: bool) -> CallOption {
        change_flag(
            &mut self.call_flags,
            grpc_sys::GRPC_INITIAL_METADATA_WAIT_FOR_READY,
            wait_for_ready,
        );
        self
    }

    /// Set write flags.
    pub fn write_flags(mut self, write_flags: WriteFlags) -> CallOption {
        self.write_flags = write_flags;
        self
    }

    /// Set a timeout.
    pub fn timeout(mut self, timeout: Duration) -> CallOption {
        self.timeout = Some(timeout);
        self
    }

    /// Get the timeout.
    pub fn get_timeout(&self) -> Option<Duration> {
        self.timeout
    }

    /// Set the headers to be sent with the call.
    pub fn headers(mut self, meta: Metadata) -> CallOption {
        self.headers = Some(meta);
        self
    }

    /// Get headers to be sent with the call.
    pub fn get_headers(&self) -> Option<&Metadata> {
        self.headers.as_ref()
    }
}

impl Call {
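    /// Start an asynchronous unary call: serialize and send the request, and
    /// return a receiver that resolves to the single response.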
    pub fn unary_async<Req, Resp>(
        channel: &Channel,
        method: &Method<Req, Resp>,
        req: &Req,
        mut opt: CallOption,
    ) -> Result<ClientUnaryReceiver<Resp>> {
        let call = channel.create_call(method, &opt)?;
        let mut payload = GrpcSlice::default();
        (method.req_ser())(req, &mut payload)?;
        let cq_f = check_run(BatchType::CheckRead, |ctx, tag| unsafe {
            grpc_sys::grpcwrap_call_start_unary(
                call.call,
                ctx,
                payload.as_mut_ptr(),
                opt.write_flags.flags,
                opt.headers
                    .as_mut()
                    .map_or_else(ptr::null_mut, |c| c as *mut _ as _),
                opt.call_flags,
                tag,
            )
        });
        Ok(ClientUnaryReceiver::new(call, cq_f, method.resp_de()))
    }

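    /// Start a client streaming call: return a sink for the request stream and
    /// a receiver that resolves to the single response.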
    pub fn client_streaming<Req, Resp>(
        channel: &Channel,
        method: &Method<Req, Resp>,
        mut opt: CallOption,
    ) -> Result<(ClientCStreamSender<Req>, ClientCStreamReceiver<Resp>)> {
        let call = channel.create_call(method, &opt)?;
        let cq_f = check_run(BatchType::CheckRead, |ctx, tag| unsafe {
            grpc_sys::grpcwrap_call_start_client_streaming(
                call.call,
                ctx,
                opt.headers
                    .as_mut()
                    .map_or_else(ptr::null_mut, |c| c as *mut _ as _),
                opt.call_flags,
                tag,
            )
        });

        let share_call = Arc::new(Mutex::new(ShareCall::new(call, cq_f)));
        let sink = ClientCStreamSender::new(share_call.clone(), method.req_ser(), opt.call_flags);
        let recv = ClientCStreamReceiver::new(share_call, method.resp_de());
        Ok((sink, recv))
    }

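    /// Start a server streaming call: serialize and send the request, and
    /// return a receiver that yields the stream of responses.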
    pub fn server_streaming<Req, Resp>(
        channel: &Channel,
        method: &Method<Req, Resp>,
        req: &Req,
        mut opt: CallOption,
    ) -> Result<ClientSStreamReceiver<Resp>> {
        let call = channel.create_call(method, &opt)?;
        let mut payload = GrpcSlice::default();
        (method.req_ser())(req, &mut payload)?;
        let cq_f = check_run(BatchType::Finish, |ctx, tag| unsafe {
            grpc_sys::grpcwrap_call_start_server_streaming(
                call.call,
                ctx,
                payload.as_mut_ptr(),
                opt.write_flags.flags,
                opt.headers
                    .as_mut()
                    .map_or_else(ptr::null_mut, |c| c as *mut _ as _),
                opt.call_flags,
                tag,
            )
        });

        let headers_f = check_run(BatchType::Finish, |ctx, tag| unsafe {
            grpc_sys::grpcwrap_call_recv_initial_metadata(call.call, ctx, tag)
        });

        Ok(ClientSStreamReceiver::new(
            call,
            cq_f,
            method.resp_de(),
            headers_f,
        ))
    }

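    /// Start a duplex streaming call: return a sink for the request stream and
    /// a receiver that yields the stream of responses.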
    pub fn duplex_streaming<Req, Resp>(
        channel: &Channel,
        method: &Method<Req, Resp>,
        mut opt: CallOption,
    ) -> Result<(ClientDuplexSender<Req>, ClientDuplexReceiver<Resp>)> {
        let call = channel.create_call(method, &opt)?;
        let cq_f = check_run(BatchType::Finish, |ctx, tag| unsafe {
            grpc_sys::grpcwrap_call_start_duplex_streaming(
                call.call,
                ctx,
                opt.headers
                    .as_mut()
                    .map_or_else(ptr::null_mut, |c| c as *mut _ as _),
                opt.call_flags,
                tag,
            )
        });

        let headers_f = check_run(BatchType::Finish, |ctx, tag| unsafe {
            grpc_sys::grpcwrap_call_recv_initial_metadata(call.call, ctx, tag)
        });

        let share_call = Arc::new(Mutex::new(ShareCall::new(call, cq_f)));
        let sink = ClientDuplexSender::new(share_call.clone(), method.req_ser(), opt.call_flags);
        let recv = ClientDuplexReceiver::new(share_call, method.resp_de(), headers_f);
        Ok((sink, recv))
    }
}

/// A receiver for a unary request.
///
/// The future is resolved once the response is received.
#[must_use = "if unused the ClientUnaryReceiver may immediately cancel the RPC"]
pub struct ClientUnaryReceiver<T> {
    call: Call,
    resp_f: BatchFuture,
    resp_de: DeserializeFn<T>,
    finished: bool,
    message: Option<T>,
    initial_metadata: UnownedMetadata,
    trailing_metadata: UnownedMetadata,
}

impl<T> ClientUnaryReceiver<T> {
    fn new(call: Call, resp_f: BatchFuture, resp_de: DeserializeFn<T>) -> ClientUnaryReceiver<T> {
        ClientUnaryReceiver {
            call,
            resp_f,
            resp_de,
            finished: false,
            message: None,
            initial_metadata: UnownedMetadata::empty(),
            trailing_metadata: UnownedMetadata::empty(),
        }
    }

    /// Cancel the call.
    #[inline]
    pub fn cancel(&mut self) {
        self.call.cancel()
    }

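    /// Deserialize a response message with this call's response deserializer.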
    #[inline]
    pub fn resp_de(&self, reader: MessageReader) -> Result<T> {
        (self.resp_de)(reader)
    }

    async fn wait_for_batch_future(&mut self) -> Result<()> {
        if self.finished {
            return Ok(());
        }

        let data = Pin::new(&mut self.resp_f).await?;
        self.initial_metadata = data.initial_metadata;
        self.trailing_metadata = data.trailing_metadata;
        self.message = Some(self.resp_de(data.message_reader.unwrap())?);
        self.finished = true;
        Ok(())
    }

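    /// Get the response message.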
    pub async fn message(&mut self) -> Result<T> {
        self.wait_for_batch_future().await?;
        Ok(self.message.take().unwrap())
    }

    /// Get the initial metadata.
    pub async fn headers(&mut self) -> Result<&Metadata> {
        self.wait_for_batch_future().await?;
        // We still hold a reference to the call, so the metadata is safe to read.
        Ok(unsafe { self.initial_metadata.assume_valid() })
    }

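    /// Get the trailing metadata.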
    pub async fn trailers(&mut self) -> Result<&Metadata> {
        self.wait_for_batch_future().await?;
        // We still hold a reference to the call, so the metadata is safe to read.
        Ok(unsafe { self.trailing_metadata.assume_valid() })
    }

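    /// Block until the initial metadata, the response message and the trailing
    /// metadata have all been received.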
    pub fn receive_sync(&mut self) -> Result<(Metadata, T, Metadata)> {
        block_on(async {
            let headers = self.headers().await?.clone();
            let message = self.message().await?;
            let trailer = self.trailers().await?.clone();
            Ok::<(Metadata, T, Metadata), Error>((headers, message, trailer))
        })
    }
}

impl<T: Unpin> Future for ClientUnaryReceiver<T> {
    type Output = Result<T>;

    /// Note that polling this future conflicts with calling the `message` method.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<T>> {
        if self.finished {
            if let Some(message) = self.message.take() {
                return Poll::Ready(Ok(message));
            }
            panic!("future should not be polled twice.");
        }

        let data = ready!(Pin::new(&mut self.resp_f).poll(cx)?);
        self.initial_metadata = data.initial_metadata;
        self.trailing_metadata = data.trailing_metadata;
        self.finished = true;
        Poll::Ready(self.resp_de(data.message_reader.unwrap()))
    }
}

/// A receiver for a client streaming call.
///
/// If the corresponding sink has been dropped or cancelled, polling this
/// receiver yields an [`RpcFailure`] error with the [`Cancelled`] status.
///
/// [`RpcFailure`]: ./enum.Error.html#variant.RpcFailure
/// [`Cancelled`]: ./enum.RpcStatusCode.html#variant.Cancelled
#[must_use = "if unused the ClientCStreamReceiver may immediately cancel the RPC"]
pub struct ClientCStreamReceiver<T> {
    call: Arc<Mutex<ShareCall>>,
    resp_de: DeserializeFn<T>,
    finished: bool,
    message: Option<T>,
    initial_metadata: UnownedMetadata,
    trailing_metadata: UnownedMetadata,
}

impl<T> ClientCStreamReceiver<T> {
    /// Private constructor to simplify code in `impl Call`.
    fn new(call: Arc<Mutex<ShareCall>>, resp_de: DeserializeFn<T>) -> ClientCStreamReceiver<T> {
        ClientCStreamReceiver {
            call,
            resp_de,
            finished: false,
            message: None,
            initial_metadata: UnownedMetadata::empty(),
            trailing_metadata: UnownedMetadata::empty(),
        }
    }

    /// Cancel the call.
    pub fn cancel(&mut self) {
        let lock = self.call.lock();
        lock.call.cancel()
    }

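    /// Deserialize a response message with this call's response deserializer.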
    #[inline]
    pub fn resp_de(&self, reader: MessageReader) -> Result<T> {
        (self.resp_de)(reader)
    }

    async fn wait_for_batch_future(&mut self) -> Result<()> {
        if self.finished {
            return Ok(());
        }
        let data = poll_fn(|cx| {
            let mut call = self.call.lock();
            call.poll_finish(cx)
        })
        .await?;

        self.message = Some(self.resp_de(data.message_reader.unwrap())?);
        self.initial_metadata = data.initial_metadata;
        self.trailing_metadata = data.trailing_metadata;
        self.finished = true;
        Ok(())
    }

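    /// Get the response message.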
    pub async fn message(&mut self) -> Result<T> {
        self.wait_for_batch_future().await?;
        Ok(self.message.take().unwrap())
    }

    /// Get the initial metadata.
    pub async fn headers(&mut self) -> Result<&Metadata> {
        self.wait_for_batch_future().await?;
        // The shared call still holds a reference, so the metadata is safe to read.
        Ok(unsafe { self.initial_metadata.assume_valid() })
    }

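    /// Get the trailing metadata.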
    pub async fn trailers(&mut self) -> Result<&Metadata> {
        self.wait_for_batch_future().await?;
        // The shared call still holds a reference, so the metadata is safe to read.
        Ok(unsafe { self.trailing_metadata.assume_valid() })
    }
}

impl<T> Drop for ClientCStreamReceiver<T> {
    /// The corresponding RPC will be canceled if the receiver has not
    /// finished before it is dropped.
    fn drop(&mut self) {
        if !self.finished {
            self.cancel();
        }
    }
}

impl<T: Unpin> Future for ClientCStreamReceiver<T> {
    type Output = Result<T>;

    /// Note that polling this future conflicts with calling the `message` method.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<T>> {
        if self.finished {
            if let Some(message) = self.message.take() {
                return Poll::Ready(Ok(message));
            }
            panic!("future should not be polled twice.");
        }

        let data = {
            let mut call = self.call.lock();
            ready!(call.poll_finish(cx)?)
        };
        self.initial_metadata = data.initial_metadata;
        self.trailing_metadata = data.trailing_metadata;
        self.finished = true;
        Poll::Ready((self.resp_de)(data.message_reader.unwrap()))
    }
}

/// A sink for client streaming calls and duplex streaming calls.
///
/// To close the sink properly, call [`close`] before dropping it.
///
/// [`close`]: #method.close
#[must_use = "if unused the StreamingCallSink may immediately cancel the RPC"]
pub struct StreamingCallSink<Req> {
    call: Arc<Mutex<ShareCall>>,
    sink_base: SinkBase,
    close_f: Option<BatchFuture>,
    req_ser: SerializeFn<Req>,
    call_flags: u32,
}

impl<Req> StreamingCallSink<Req> {
    fn new(
        call: Arc<Mutex<ShareCall>>,
        req_ser: SerializeFn<Req>,
        call_flags: u32,
    ) -> StreamingCallSink<Req> {
        StreamingCallSink {
            call,
            sink_base: SinkBase::new(false),
            close_f: None,
            req_ser,
            call_flags,
        }
    }

    /// By default every message is sent with its configured buffer hint. When
    /// `enhance_batch` is enabled, messages are batched together as much as possible,
    /// following these rules:
    /// - Every message except the last one is sent with `buffer_hint` set to true.
    /// - The last message is also sent with `buffer_hint` set to true unless any message
    ///   was offered with its buffer hint set to false.
    ///
    /// Whether or not `enhance_batch` is enabled, it is recommended to follow the `Sink`
    /// contract and call `poll_flush` to make sure messages are handled by the gRPC C core.
    pub fn enhance_batch(&mut self, flag: bool) {
        self.sink_base.enhance_buffer_strategy = flag;
    }

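    /// Cancel the call.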
    pub fn cancel(&mut self) {
        let call = self.call.lock();
        call.call.cancel()
    }
}

impl<P> Drop for StreamingCallSink<P> {
    /// The corresponding RPC will be canceled if [`close`] was not called
    /// before the sink is dropped.
    ///
    /// [`close`]: #method.close
    fn drop(&mut self) {
        if self.close_f.is_none() {
            self.cancel();
        }
    }
}

impl<Req> Sink<(Req, WriteFlags)> for StreamingCallSink<Req> {
    type Error = Error;

    #[inline]
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
        Pin::new(&mut self.sink_base).poll_ready(cx)
    }

    #[inline]
    fn start_send(mut self: Pin<&mut Self>, (msg, flags): (Req, WriteFlags)) -> Result<()> {
        {
            let mut call = self.call.lock();
            call.check_alive()?;
        }
        let t = &mut *self;
        Pin::new(&mut t.sink_base).start_send(&mut t.call, &msg, flags, t.req_ser, t.call_flags)
    }

    #[inline]
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
        {
            let mut call = self.call.lock();
            call.check_alive()?;
        }
        let t = &mut *self;
        Pin::new(&mut t.sink_base).poll_flush(cx, &mut t.call, t.call_flags)
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
        let t = &mut *self;
        let mut call = t.call.lock();
        if t.close_f.is_none() {
            ready!(Pin::new(&mut t.sink_base).poll_ready(cx)?);

            let close_f = call.call.start_send_close_client()?;
            t.close_f = Some(close_f);
        }

        if Pin::new(t.close_f.as_mut().unwrap()).poll(cx)?.is_pending() {
            // If the call has already finished, `check_alive` returns the error early.
            call.check_alive()?;
            return Poll::Pending;
        }
        Poll::Ready(Ok(()))
    }
}

/// A sink for a client streaming call.
///
/// To close the sink properly, call [`close`] before dropping it.
///
/// [`close`]: #method.close
pub type ClientCStreamSender<T> = StreamingCallSink<T>;
/// A sink for a duplex streaming call.
///
/// To close the sink properly, call [`close`] before dropping it.
///
/// [`close`]: #method.close
pub type ClientDuplexSender<T> = StreamingCallSink<T>;

enum FutureOrValue<F, V> {
    Future(F),
    Value(V),
}

struct ResponseStreamImpl<H, T> {
    call: H,
    msg_f: Option<BatchFuture>,
    read_done: bool,
    finished: bool,
    resp_de: DeserializeFn<T>,
    headers_f: FutureOrValue<BatchFuture, UnownedMetadata>,
}

impl<H: ShareCallHolder + Unpin, T> ResponseStreamImpl<H, T> {
    fn new(call: H, resp_de: DeserializeFn<T>, headers_f: BatchFuture) -> ResponseStreamImpl<H, T> {
        ResponseStreamImpl {
            call,
            msg_f: None,
            read_done: false,
            finished: false,
            resp_de,
            headers_f: FutureOrValue::Future(headers_f),
        }
    }

    fn cancel(&mut self) {
        self.call.call(|c| c.call.cancel())
    }

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T>>> {
        if !self.finished {
            let t = &mut *self;
            let finished = &mut t.finished;
            let _ = t.call.call(|c| {
                let res = c.poll_finish(cx);
                *finished = c.finished;
                res
            })?;
        }

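        // Each iteration polls the pending message batch (if any), then either
        // reports end of stream, returns Pending, or queues the next
        // `start_recv_message` batch before yielding the message just read.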
        let mut bytes = None;
        loop {
            if !self.read_done {
                if let Some(msg_f) = &mut self.msg_f {
                    let batch_result = ready!(Pin::new(msg_f).poll(cx)?);
                    bytes = batch_result.message_reader;
                    if bytes.is_none() {
                        self.read_done = true;
                    }
                }
            }

            if self.read_done {
                if self.finished {
                    return Poll::Ready(None);
                }
                return Poll::Pending;
            }

            // At this point `msg_f` is either stale or not initialised yet, so start
            // receiving the next message.
            self.msg_f.take();
            let msg_f = self.call.call(|c| c.call.start_recv_message())?;
            self.msg_f = Some(msg_f);
            if let Some(data) = bytes {
                let msg = (self.resp_de)(data)?;
                return Poll::Ready(Some(Ok(msg)));
            }
        }
    }

    // Cancel the call if there may still be messages to read or the status
    // code has not been received yet.
    fn on_drop(&mut self) {
        if !self.read_done || !self.finished {
            self.cancel();
        }
    }

    async fn headers(&mut self) -> Result<&Metadata> {
        if let FutureOrValue::Future(f) = &mut self.headers_f {
            self.headers_f = FutureOrValue::Value(Pin::new(f).await?.initial_metadata);
        }
        match &self.headers_f {
            // We still hold a reference to the call, so the metadata is safe to read.
            FutureOrValue::Value(v) => Ok(unsafe { v.assume_valid() }),
            _ => unreachable!(),
        }
    }
}

/// A receiver for a server streaming call.
#[must_use = "if unused the ClientSStreamReceiver may immediately cancel the RPC"]
pub struct ClientSStreamReceiver<Resp> {
    imp: ResponseStreamImpl<ShareCall, Resp>,
}

impl<Resp> ClientSStreamReceiver<Resp> {
    fn new(
        call: Call,
        finish_f: BatchFuture,
        de: DeserializeFn<Resp>,
        headers_f: BatchFuture,
    ) -> ClientSStreamReceiver<Resp> {
        let share_call = ShareCall::new(call, finish_f);
        ClientSStreamReceiver {
            imp: ResponseStreamImpl::new(share_call, de, headers_f),
        }
    }

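    /// Cancel the call.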
    pub fn cancel(&mut self) {
        self.imp.cancel()
    }

    /// Get the initial metadata.
    #[inline]
    pub async fn headers(&mut self) -> Result<&Metadata> {
        self.imp.headers().await
    }
}

impl<Resp> Stream for ClientSStreamReceiver<Resp> {
    type Item = Result<Resp>;

    #[inline]
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.imp).poll(cx)
    }
}

/// A response receiver for a duplex call.
///
/// If the corresponding sink has been dropped or cancelled, polling this
/// receiver yields an [`RpcFailure`] error with the [`Cancelled`] status.
///
/// [`RpcFailure`]: ./enum.Error.html#variant.RpcFailure
/// [`Cancelled`]: ./enum.RpcStatusCode.html#variant.Cancelled
#[must_use = "if unused the ClientDuplexReceiver may immediately cancel the RPC"]
pub struct ClientDuplexReceiver<Resp> {
    imp: ResponseStreamImpl<Arc<Mutex<ShareCall>>, Resp>,
}

impl<Resp> ClientDuplexReceiver<Resp> {
    fn new(
        call: Arc<Mutex<ShareCall>>,
        de: DeserializeFn<Resp>,
        headers_f: BatchFuture,
    ) -> ClientDuplexReceiver<Resp> {
        ClientDuplexReceiver {
            imp: ResponseStreamImpl::new(call, de, headers_f),
        }
    }

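    /// Cancel the call.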
    pub fn cancel(&mut self) {
        self.imp.cancel()
    }

    /// Get the initial metadata.
    #[inline]
    pub async fn headers(&mut self) -> Result<&Metadata> {
        self.imp.headers().await
    }
}

impl<Resp> Drop for ClientDuplexReceiver<Resp> {
    /// The corresponding RPC will be canceled if the receiver has not
    /// finished before it is dropped.
    fn drop(&mut self) {
        self.imp.on_drop()
    }
}

impl<Resp> Stream for ClientDuplexReceiver<Resp> {
    type Item = Result<Resp>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.imp).poll(cx)
    }
}

#[cfg(test)]
mod tests {
    #[test]
    fn test_change_flag() {
        let mut flag = 2 | 4;
        super::change_flag(&mut flag, 8, true);
        assert_eq!(flag, 2 | 4 | 8);
        super::change_flag(&mut flag, 4, false);
        assert_eq!(flag, 2 | 8);
    }
}