1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3 use std::future::Future;
4 use std::pin::Pin;
5 use std::ptr;
6 use std::sync::Arc;
7 use std::task::{Context, Poll};
8 use std::time::Duration;
9
10 use crate::grpc_sys;
11 use futures_executor::block_on;
12 use futures_util::future::poll_fn;
13 use futures_util::{ready, Sink, Stream};
14 use parking_lot::Mutex;
15
16 use super::{ShareCall, ShareCallHolder, SinkBase, WriteFlags};
17 use crate::buf::GrpcSlice;
18 use crate::call::{check_run, Call, MessageReader, Method};
19 use crate::channel::Channel;
20 use crate::codec::{DeserializeFn, SerializeFn};
21 use crate::error::{Error, Result};
22 use crate::metadata::{Metadata, UnownedMetadata};
23 use crate::task::{BatchFuture, BatchType};
24
/// Sets (`set == true`) or clears (`set == false`) the bits of `flag` in `res`.
///
/// Bits of `res` that are not part of `flag` are left untouched.
#[inline]
pub fn change_flag(res: &mut u32, flag: u32, set: bool) {
    *res = if set { *res | flag } else { *res & !flag };
}
34
/// Options for calls made by client.
///
/// All options start from their `Default` value and are set through the
/// builder-style methods of this type.
#[derive(Clone, Default)]
pub struct CallOption {
    /// Timeout of the call; `None` until one is set.
    timeout: Option<Duration>,
    /// Write flags handed to gRPC together with the request when a call is started.
    write_flags: WriteFlags,
    /// Bit set of call flags handed to gRPC when a call is started
    /// (for example the wait-for-ready bit).
    call_flags: u32,
    /// Headers to be sent with the call; `None` until some are set.
    headers: Option<Metadata>,
}
43
44 impl CallOption {
45 /// Signal that the call should not return UNAVAILABLE before it has started.
wait_for_ready(mut self, wait_for_ready: bool) -> CallOption46 pub fn wait_for_ready(mut self, wait_for_ready: bool) -> CallOption {
47 change_flag(
48 &mut self.call_flags,
49 grpc_sys::GRPC_INITIAL_METADATA_WAIT_FOR_READY,
50 wait_for_ready,
51 );
52 self
53 }
54
55 /// Set write flags.
write_flags(mut self, write_flags: WriteFlags) -> CallOption56 pub fn write_flags(mut self, write_flags: WriteFlags) -> CallOption {
57 self.write_flags = write_flags;
58 self
59 }
60
61 /// Set a timeout.
timeout(mut self, timeout: Duration) -> CallOption62 pub fn timeout(mut self, timeout: Duration) -> CallOption {
63 self.timeout = Some(timeout);
64 self
65 }
66
67 /// Get the timeout.
get_timeout(&self) -> Option<Duration>68 pub fn get_timeout(&self) -> Option<Duration> {
69 self.timeout
70 }
71
72 /// Set the headers to be sent with the call.
headers(mut self, meta: Metadata) -> CallOption73 pub fn headers(mut self, meta: Metadata) -> CallOption {
74 self.headers = Some(meta);
75 self
76 }
77
78 /// Get headers to be sent with the call.
get_headers(&self) -> Option<&Metadata>79 pub fn get_headers(&self) -> Option<&Metadata> {
80 self.headers.as_ref()
81 }
82 }
83
84 impl Call {
unary_async<Req, Resp>( channel: &Channel, method: &Method<Req, Resp>, req: &Req, mut opt: CallOption, ) -> Result<ClientUnaryReceiver<Resp>>85 pub fn unary_async<Req, Resp>(
86 channel: &Channel,
87 method: &Method<Req, Resp>,
88 req: &Req,
89 mut opt: CallOption,
90 ) -> Result<ClientUnaryReceiver<Resp>> {
91 let call = channel.create_call(method, &opt)?;
92 let mut payload = GrpcSlice::default();
93 (method.req_ser())(req, &mut payload)?;
94 let cq_f = check_run(BatchType::CheckRead, |ctx, tag| unsafe {
95 grpc_sys::grpcwrap_call_start_unary(
96 call.call,
97 ctx,
98 payload.as_mut_ptr(),
99 opt.write_flags.flags,
100 opt.headers
101 .as_mut()
102 .map_or_else(ptr::null_mut, |c| c as *mut _ as _),
103 opt.call_flags,
104 tag,
105 )
106 });
107 Ok(ClientUnaryReceiver::new(call, cq_f, method.resp_de()))
108 }
109
client_streaming<Req, Resp>( channel: &Channel, method: &Method<Req, Resp>, mut opt: CallOption, ) -> Result<(ClientCStreamSender<Req>, ClientCStreamReceiver<Resp>)>110 pub fn client_streaming<Req, Resp>(
111 channel: &Channel,
112 method: &Method<Req, Resp>,
113 mut opt: CallOption,
114 ) -> Result<(ClientCStreamSender<Req>, ClientCStreamReceiver<Resp>)> {
115 let call = channel.create_call(method, &opt)?;
116 let cq_f = check_run(BatchType::CheckRead, |ctx, tag| unsafe {
117 grpc_sys::grpcwrap_call_start_client_streaming(
118 call.call,
119 ctx,
120 opt.headers
121 .as_mut()
122 .map_or_else(ptr::null_mut, |c| c as *mut _ as _),
123 opt.call_flags,
124 tag,
125 )
126 });
127
128 let share_call = Arc::new(Mutex::new(ShareCall::new(call, cq_f)));
129 let sink = ClientCStreamSender::new(share_call.clone(), method.req_ser(), opt.call_flags);
130 let recv = ClientCStreamReceiver::new(share_call, method.resp_de());
131 Ok((sink, recv))
132 }
133
server_streaming<Req, Resp>( channel: &Channel, method: &Method<Req, Resp>, req: &Req, mut opt: CallOption, ) -> Result<ClientSStreamReceiver<Resp>>134 pub fn server_streaming<Req, Resp>(
135 channel: &Channel,
136 method: &Method<Req, Resp>,
137 req: &Req,
138 mut opt: CallOption,
139 ) -> Result<ClientSStreamReceiver<Resp>> {
140 let call = channel.create_call(method, &opt)?;
141 let mut payload = GrpcSlice::default();
142 (method.req_ser())(req, &mut payload)?;
143 let cq_f = check_run(BatchType::Finish, |ctx, tag| unsafe {
144 grpc_sys::grpcwrap_call_start_server_streaming(
145 call.call,
146 ctx,
147 payload.as_mut_ptr(),
148 opt.write_flags.flags,
149 opt.headers
150 .as_mut()
151 .map_or_else(ptr::null_mut, |c| c as *mut _ as _),
152 opt.call_flags,
153 tag,
154 )
155 });
156
157 let headers_f = check_run(BatchType::Finish, |ctx, tag| unsafe {
158 grpc_sys::grpcwrap_call_recv_initial_metadata(call.call, ctx, tag)
159 });
160
161 Ok(ClientSStreamReceiver::new(
162 call,
163 cq_f,
164 method.resp_de(),
165 headers_f,
166 ))
167 }
168
duplex_streaming<Req, Resp>( channel: &Channel, method: &Method<Req, Resp>, mut opt: CallOption, ) -> Result<(ClientDuplexSender<Req>, ClientDuplexReceiver<Resp>)>169 pub fn duplex_streaming<Req, Resp>(
170 channel: &Channel,
171 method: &Method<Req, Resp>,
172 mut opt: CallOption,
173 ) -> Result<(ClientDuplexSender<Req>, ClientDuplexReceiver<Resp>)> {
174 let call = channel.create_call(method, &opt)?;
175 let cq_f = check_run(BatchType::Finish, |ctx, tag| unsafe {
176 grpc_sys::grpcwrap_call_start_duplex_streaming(
177 call.call,
178 ctx,
179 opt.headers
180 .as_mut()
181 .map_or_else(ptr::null_mut, |c| c as *mut _ as _),
182 opt.call_flags,
183 tag,
184 )
185 });
186
187 let headers_f = check_run(BatchType::Finish, |ctx, tag| unsafe {
188 grpc_sys::grpcwrap_call_recv_initial_metadata(call.call, ctx, tag)
189 });
190
191 let share_call = Arc::new(Mutex::new(ShareCall::new(call, cq_f)));
192 let sink = ClientDuplexSender::new(share_call.clone(), method.req_ser(), opt.call_flags);
193 let recv = ClientDuplexReceiver::new(share_call, method.resp_de(), headers_f);
194 Ok((sink, recv))
195 }
196 }
197
/// A receiver for unary request.
///
/// The future is resolved once response is received.
#[must_use = "if unused the ClientUnaryReceiver may immediately cancel the RPC"]
pub struct ClientUnaryReceiver<T> {
    /// The underlying call; kept so the call can be cancelled.
    call: Call,
    /// Batch future that resolves with the metadata and the response message reader.
    resp_f: BatchFuture,
    /// Deserializer turning a message reader into a `T`.
    resp_de: DeserializeFn<T>,
    /// Set once the result of `resp_f` has been consumed.
    finished: bool,
    /// The deserialized response, until it is taken by `message` or by polling the future.
    message: Option<T>,
    /// Initial metadata (headers); empty until the batch has resolved.
    initial_metadata: UnownedMetadata,
    /// Trailing metadata (trailers); empty until the batch has resolved.
    trailing_metadata: UnownedMetadata,
}
211
212 impl<T> ClientUnaryReceiver<T> {
new(call: Call, resp_f: BatchFuture, resp_de: DeserializeFn<T>) -> ClientUnaryReceiver<T>213 fn new(call: Call, resp_f: BatchFuture, resp_de: DeserializeFn<T>) -> ClientUnaryReceiver<T> {
214 ClientUnaryReceiver {
215 call,
216 resp_f,
217 resp_de,
218 finished: false,
219 message: None,
220 initial_metadata: UnownedMetadata::empty(),
221 trailing_metadata: UnownedMetadata::empty(),
222 }
223 }
224
225 /// Cancel the call.
226 #[inline]
cancel(&mut self)227 pub fn cancel(&mut self) {
228 self.call.cancel()
229 }
230
231 #[inline]
resp_de(&self, reader: MessageReader) -> Result<T>232 pub fn resp_de(&self, reader: MessageReader) -> Result<T> {
233 (self.resp_de)(reader)
234 }
235
wait_for_batch_future(&mut self) -> Result<()>236 async fn wait_for_batch_future(&mut self) -> Result<()> {
237 if self.finished {
238 return Ok(());
239 }
240
241 let data = Pin::new(&mut self.resp_f).await?;
242 self.initial_metadata = data.initial_metadata;
243 self.trailing_metadata = data.trailing_metadata;
244 self.message = Some(self.resp_de(data.message_reader.unwrap())?);
245 self.finished = true;
246 Ok(())
247 }
248
message(&mut self) -> Result<T>249 pub async fn message(&mut self) -> Result<T> {
250 self.wait_for_batch_future().await?;
251 Ok(self.message.take().unwrap())
252 }
253
254 /// Get the initial metadata.
headers(&mut self) -> Result<&Metadata>255 pub async fn headers(&mut self) -> Result<&Metadata> {
256 self.wait_for_batch_future().await?;
257 // Because we have a reference to call, so it's safe to read.
258 Ok(unsafe { self.initial_metadata.assume_valid() })
259 }
260
trailers(&mut self) -> Result<&Metadata>261 pub async fn trailers(&mut self) -> Result<&Metadata> {
262 self.wait_for_batch_future().await?;
263 // Because we have a reference to call, so it's safe to read.
264 Ok(unsafe { self.trailing_metadata.assume_valid() })
265 }
266
receive_sync(&mut self) -> Result<(Metadata, T, Metadata)>267 pub fn receive_sync(&mut self) -> Result<(Metadata, T, Metadata)> {
268 block_on(async {
269 let headers = self.headers().await?.clone();
270 let message = self.message().await?;
271 let trailer = self.trailers().await?.clone();
272 Ok::<(Metadata, T, Metadata), Error>((headers, message, trailer))
273 })
274 }
275 }
276
277 impl<T: Unpin> Future for ClientUnaryReceiver<T> {
278 type Output = Result<T>;
279
280 /// Note this method is conflict with method `message`.
poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<T>>281 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<T>> {
282 if self.finished {
283 if let Some(message) = self.message.take() {
284 return Poll::Ready(Ok(message));
285 }
286 panic!("future should not be polled twice.");
287 }
288
289 let data = ready!(Pin::new(&mut self.resp_f).poll(cx)?);
290 self.initial_metadata = data.initial_metadata;
291 self.trailing_metadata = data.trailing_metadata;
292 self.finished = true;
293 Poll::Ready(self.resp_de(data.message_reader.unwrap()))
294 }
295 }
296
/// A receiver for client streaming call.
///
/// If the corresponding sink has dropped or cancelled, this will poll a
/// [`RpcFailure`] error with the [`Cancelled`] status.
///
/// [`RpcFailure`]: ./enum.Error.html#variant.RpcFailure
/// [`Cancelled`]: ./enum.RpcStatusCode.html#variant.Cancelled
#[must_use = "if unused the ClientCStreamReceiver may immediately cancel the RPC"]
pub struct ClientCStreamReceiver<T> {
    /// The call, shared behind a mutex.
    call: Arc<Mutex<ShareCall>>,
    /// Deserializer turning a message reader into a `T`.
    resp_de: DeserializeFn<T>,
    /// Set once the finish result of the call has been consumed.
    finished: bool,
    /// The deserialized response, until it is taken by `message` or by polling the future.
    message: Option<T>,
    /// Initial metadata (headers); empty until the call has finished.
    initial_metadata: UnownedMetadata,
    /// Trailing metadata (trailers); empty until the call has finished.
    trailing_metadata: UnownedMetadata,
}
313
314 impl<T> ClientCStreamReceiver<T> {
315 /// Private constructor to simplify code in `impl Call`
new(call: Arc<Mutex<ShareCall>>, resp_de: DeserializeFn<T>) -> ClientCStreamReceiver<T>316 fn new(call: Arc<Mutex<ShareCall>>, resp_de: DeserializeFn<T>) -> ClientCStreamReceiver<T> {
317 ClientCStreamReceiver {
318 call,
319 resp_de,
320 finished: false,
321 message: None,
322 initial_metadata: UnownedMetadata::empty(),
323 trailing_metadata: UnownedMetadata::empty(),
324 }
325 }
326
327 /// Cancel the call.
cancel(&mut self)328 pub fn cancel(&mut self) {
329 let lock = self.call.lock();
330 lock.call.cancel()
331 }
332
333 #[inline]
resp_de(&self, reader: MessageReader) -> Result<T>334 pub fn resp_de(&self, reader: MessageReader) -> Result<T> {
335 (self.resp_de)(reader)
336 }
337
wait_for_batch_future(&mut self) -> Result<()>338 async fn wait_for_batch_future(&mut self) -> Result<()> {
339 if self.finished {
340 return Ok(());
341 }
342 let data = poll_fn(|cx| {
343 let mut call = self.call.lock();
344 call.poll_finish(cx)
345 })
346 .await?;
347
348 self.message = Some(self.resp_de(data.message_reader.unwrap())?);
349 self.initial_metadata = data.initial_metadata;
350 self.trailing_metadata = data.trailing_metadata;
351 self.finished = true;
352 Ok(())
353 }
354
message(&mut self) -> Result<T>355 pub async fn message(&mut self) -> Result<T> {
356 self.wait_for_batch_future().await?;
357 Ok(self.message.take().unwrap())
358 }
359
360 /// Get the initial metadata.
headers(&mut self) -> Result<&Metadata>361 pub async fn headers(&mut self) -> Result<&Metadata> {
362 self.wait_for_batch_future().await?;
363 // We still have a reference in share call.
364 Ok(unsafe { self.initial_metadata.assume_valid() })
365 }
366
trailers(&mut self) -> Result<&Metadata>367 pub async fn trailers(&mut self) -> Result<&Metadata> {
368 self.wait_for_batch_future().await?;
369 // We still have a reference in share call.
370 Ok(unsafe { self.trailing_metadata.assume_valid() })
371 }
372 }
373
374 impl<T> Drop for ClientCStreamReceiver<T> {
375 /// The corresponding RPC will be canceled if the receiver did not
376 /// finish before dropping.
drop(&mut self)377 fn drop(&mut self) {
378 if !self.finished {
379 self.cancel();
380 }
381 }
382 }
383
384 impl<T: Unpin> Future for ClientCStreamReceiver<T> {
385 type Output = Result<T>;
386
387 /// Note this method is conflict with method `message`.
poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<T>>388 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<T>> {
389 if self.finished {
390 if let Some(message) = self.message.take() {
391 return Poll::Ready(Ok(message));
392 }
393 panic!("future should not be polled twice.");
394 }
395
396 let data = {
397 let mut call = self.call.lock();
398 ready!(call.poll_finish(cx)?)
399 };
400 self.initial_metadata = data.initial_metadata;
401 self.trailing_metadata = data.trailing_metadata;
402 self.finished = true;
403 Poll::Ready((self.resp_de)(data.message_reader.unwrap()))
404 }
405 }
406
/// A sink for client streaming call and duplex streaming call.
/// To close the sink properly, you should call [`close`] before dropping.
///
/// [`close`]: #method.close
#[must_use = "if unused the StreamingCallSink may immediately cancel the RPC"]
pub struct StreamingCallSink<Req> {
    /// The call, shared behind a mutex.
    call: Arc<Mutex<ShareCall>>,
    /// Shared sink machinery that the `Sink` implementation delegates to.
    sink_base: SinkBase,
    /// Future of the close operation; `None` until closing has been started.
    close_f: Option<BatchFuture>,
    /// Serializer for outgoing requests.
    req_ser: SerializeFn<Req>,
    /// Call flags forwarded to `sink_base` when sending and flushing.
    call_flags: u32,
}
419
420 impl<Req> StreamingCallSink<Req> {
new( call: Arc<Mutex<ShareCall>>, req_ser: SerializeFn<Req>, call_flags: u32, ) -> StreamingCallSink<Req>421 fn new(
422 call: Arc<Mutex<ShareCall>>,
423 req_ser: SerializeFn<Req>,
424 call_flags: u32,
425 ) -> StreamingCallSink<Req> {
426 StreamingCallSink {
427 call,
428 sink_base: SinkBase::new(false),
429 close_f: None,
430 req_ser,
431 call_flags,
432 }
433 }
434
435 /// By default it always sends messages with their configured buffer hint. But when the
436 /// `enhance_batch` is enabled, messages will be batched together as many as possible.
437 /// The rules are listed as below:
438 /// - All messages except the last one will be sent with `buffer_hint` set to true.
439 /// - The last message will also be sent with `buffer_hint` set to true unless any message is
440 /// offered with buffer hint set to false.
441 ///
442 /// No matter `enhance_batch` is true or false, it's recommended to follow the contract of
443 /// Sink and call `poll_flush` to ensure messages are handled by gRPC C Core.
enhance_batch(&mut self, flag: bool)444 pub fn enhance_batch(&mut self, flag: bool) {
445 self.sink_base.enhance_buffer_strategy = flag;
446 }
447
cancel(&mut self)448 pub fn cancel(&mut self) {
449 let call = self.call.lock();
450 call.call.cancel()
451 }
452 }
453
454 impl<P> Drop for StreamingCallSink<P> {
455 /// The corresponding RPC will be canceled if the sink did not call
456 /// [`close`] before dropping.
457 ///
458 /// [`close`]: #method.close
drop(&mut self)459 fn drop(&mut self) {
460 if self.close_f.is_none() {
461 self.cancel();
462 }
463 }
464 }
465
/// `Sink` implementation that forwards `(message, write flags)` pairs to the call
/// through `sink_base`.
impl<Req> Sink<(Req, WriteFlags)> for StreamingCallSink<Req> {
    type Error = Error;

    /// Delegates readiness to `sink_base`.
    #[inline]
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
        Pin::new(&mut self.sink_base).poll_ready(cx)
    }

    /// Hands `msg` to `sink_base` for sending, failing early if the call is no longer alive.
    #[inline]
    fn start_send(mut self: Pin<&mut Self>, (msg, flags): (Req, WriteFlags)) -> Result<()> {
        {
            // Scoped so the lock is released before `sink_base` is handed the shared call.
            let mut call = self.call.lock();
            call.check_alive()?;
        }
        // Reborrow through the pin once so disjoint fields can be borrowed at the same time.
        let t = &mut *self;
        Pin::new(&mut t.sink_base).start_send(&mut t.call, &msg, flags, t.req_ser, t.call_flags)
    }

    /// Flushes through `sink_base`, failing early if the call is no longer alive.
    #[inline]
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
        {
            // Scoped so the lock is released before `sink_base` is handed the shared call.
            let mut call = self.call.lock();
            call.check_alive()?;
        }
        // Reborrow through the pin once so disjoint fields can be borrowed at the same time.
        let t = &mut *self;
        Pin::new(&mut t.sink_base).poll_flush(cx, &mut t.call, t.call_flags)
    }

    /// Starts the client-side close on first use and then drives it to completion.
    ///
    /// The call stays locked for the whole method.
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<()>> {
        let t = &mut *self;
        let mut call = t.call.lock();
        if t.close_f.is_none() {
            // The close is only started once `sink_base` reports that it is ready.
            ready!(Pin::new(&mut t.sink_base).poll_ready(cx)?);

            let close_f = call.call.start_send_close_client()?;
            // Storing the future also marks the sink as closing (see the `Drop` impl).
            t.close_f = Some(close_f);
        }

        if Pin::new(t.close_f.as_mut().unwrap()).poll(cx)?.is_pending() {
            // if call is finished, can return early here.
            call.check_alive()?;
            return Poll::Pending;
        }
        Poll::Ready(Ok(()))
    }
}
512
/// A sink for client streaming call.
///
/// This is an alias of [`StreamingCallSink`].
///
/// To close the sink properly, you should call [`close`] before dropping.
///
/// [`close`]: #method.close
pub type ClientCStreamSender<T> = StreamingCallSink<T>;
/// A sink for duplex streaming call.
///
/// This is an alias of [`StreamingCallSink`].
///
/// To close the sink properly, you should call [`close`] before dropping.
///
/// [`close`]: #method.close
pub type ClientDuplexSender<T> = StreamingCallSink<T>;
525
/// Holds either a future or a value.
///
/// `ResponseStreamImpl` uses it for the initial metadata: it starts out as the future
/// and is replaced by the value once that future has been awaited.
enum FutureOrValue<F, V> {
    /// The future that has not been resolved yet.
    Future(F),
    /// The stored value.
    Value(V),
}
530
/// Shared implementation of the response streams of server streaming and duplex calls.
///
/// `H` is the holder through which the underlying call is accessed.
struct ResponseStreamImpl<H, T> {
    /// Holder of the underlying call.
    call: H,
    /// Future of the message read currently in flight; `None` before the first read.
    msg_f: Option<BatchFuture>,
    /// Set once a read completed without a message, i.e. there is nothing more to read.
    read_done: bool,
    /// Mirrors the `finished` flag of the call as observed by the last finish poll.
    finished: bool,
    /// Deserializer turning a message reader into a `T`.
    resp_de: DeserializeFn<T>,
    /// The initial metadata, or the future that will produce it.
    headers_f: FutureOrValue<BatchFuture, UnownedMetadata>,
}
539
impl<H: ShareCallHolder + Unpin, T> ResponseStreamImpl<H, T> {
    /// Creates a stream that has not started reading and whose headers are still pending.
    fn new(call: H, resp_de: DeserializeFn<T>, headers_f: BatchFuture) -> ResponseStreamImpl<H, T> {
        ResponseStreamImpl {
            call,
            msg_f: None,
            read_done: false,
            finished: false,
            resp_de,
            headers_f: FutureOrValue::Future(headers_f),
        }
    }

    /// Cancel the call.
    fn cancel(&mut self) {
        self.call.call(|c| c.call.cancel())
    }

    /// Polls for the next response message.
    ///
    /// Yields `Some(Ok(msg))` for each message, `Some(Err(_))` (through `?`) on failure and
    /// `None` once reading is done and the call has finished.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<T>>> {
        if !self.finished {
            let t = &mut *self;
            let finished = &mut t.finished;
            // Poll the finish state first: an error is propagated right away, otherwise only
            // the `finished` flag of the call is recorded and the result is discarded.
            let _ = t.call.call(|c| {
                let res = c.poll_finish(cx);
                *finished = c.finished;
                res
            })?;
        }

        // Reader of the message produced by the read that just completed, if any.
        let mut bytes = None;
        loop {
            if !self.read_done {
                if let Some(msg_f) = &mut self.msg_f {
                    let batch_result = ready!(Pin::new(msg_f).poll(cx)?);
                    bytes = batch_result.message_reader;
                    // A completed read without a message means there is nothing more to read.
                    if bytes.is_none() {
                        self.read_done = true;
                    }
                }
            }

            if self.read_done {
                if self.finished {
                    return Poll::Ready(None);
                }
                // Reading is done but the call has not been observed as finished yet.
                return Poll::Pending;
            }

            // so msg_f must be either stale or not initialised yet.
            self.msg_f.take();
            // Start the next read before handing out the message that was just read.
            let msg_f = self.call.call(|c| c.call.start_recv_message())?;
            self.msg_f = Some(msg_f);
            if let Some(data) = bytes {
                let msg = (self.resp_de)(data)?;
                return Poll::Ready(Some(Ok(msg)));
            }
            // No message yet (this was the very first read): loop to poll the new read.
        }
    }

    // Cancel the call if we still have some messages or did not
    // receive status code.
    fn on_drop(&mut self) {
        if !self.read_done || !self.finished {
            self.cancel();
        }
    }

    /// Returns the initial metadata, awaiting it first if it has not been received yet.
    async fn headers(&mut self) -> Result<&Metadata> {
        if let FutureOrValue::Future(f) = &mut self.headers_f {
            // Replace the future by its value so it is awaited only once.
            self.headers_f = FutureOrValue::Value(Pin::new(f).await?.initial_metadata);
        }
        match &self.headers_f {
            // We still have reference to call.
            FutureOrValue::Value(v) => Ok(unsafe { v.assume_valid() }),
            // The `Future` variant was replaced above (or the function returned early).
            _ => unreachable!(),
        }
    }
}
616
/// A receiver for server streaming call.
///
/// Responses are obtained through the `Stream` implementation.
#[must_use = "if unused the ClientSStreamReceiver may immediately cancel the RPC"]
pub struct ClientSStreamReceiver<Resp> {
    /// Stream implementation over a `ShareCall` owned by this receiver.
    imp: ResponseStreamImpl<ShareCall, Resp>,
}
622
623 impl<Resp> ClientSStreamReceiver<Resp> {
new( call: Call, finish_f: BatchFuture, de: DeserializeFn<Resp>, headers_f: BatchFuture, ) -> ClientSStreamReceiver<Resp>624 fn new(
625 call: Call,
626 finish_f: BatchFuture,
627 de: DeserializeFn<Resp>,
628 headers_f: BatchFuture,
629 ) -> ClientSStreamReceiver<Resp> {
630 let share_call = ShareCall::new(call, finish_f);
631 ClientSStreamReceiver {
632 imp: ResponseStreamImpl::new(share_call, de, headers_f),
633 }
634 }
635
cancel(&mut self)636 pub fn cancel(&mut self) {
637 self.imp.cancel()
638 }
639
640 /// Get the initial metadata.
641 #[inline]
headers(&mut self) -> Result<&Metadata>642 pub async fn headers(&mut self) -> Result<&Metadata> {
643 self.imp.headers().await
644 }
645 }
646
647 impl<Resp> Stream for ClientSStreamReceiver<Resp> {
648 type Item = Result<Resp>;
649
650 #[inline]
poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>651 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
652 Pin::new(&mut self.imp).poll(cx)
653 }
654 }
655
/// A response receiver for duplex call.
///
/// If the corresponding sink has dropped or cancelled, this will poll a
/// [`RpcFailure`] error with the [`Cancelled`] status.
///
/// [`RpcFailure`]: ./enum.Error.html#variant.RpcFailure
/// [`Cancelled`]: ./enum.RpcStatusCode.html#variant.Cancelled
#[must_use = "if unused the ClientDuplexReceiver may immediately cancel the RPC"]
pub struct ClientDuplexReceiver<Resp> {
    /// Stream implementation over the call, which is shared behind a mutex.
    imp: ResponseStreamImpl<Arc<Mutex<ShareCall>>, Resp>,
}
667
668 impl<Resp> ClientDuplexReceiver<Resp> {
new( call: Arc<Mutex<ShareCall>>, de: DeserializeFn<Resp>, headers_f: BatchFuture, ) -> ClientDuplexReceiver<Resp>669 fn new(
670 call: Arc<Mutex<ShareCall>>,
671 de: DeserializeFn<Resp>,
672 headers_f: BatchFuture,
673 ) -> ClientDuplexReceiver<Resp> {
674 ClientDuplexReceiver {
675 imp: ResponseStreamImpl::new(call, de, headers_f),
676 }
677 }
678
cancel(&mut self)679 pub fn cancel(&mut self) {
680 self.imp.cancel()
681 }
682
683 /// Get the initial metadata.
684 #[inline]
headers(&mut self) -> Result<&Metadata>685 pub async fn headers(&mut self) -> Result<&Metadata> {
686 self.imp.headers().await
687 }
688 }
689
690 impl<Resp> Drop for ClientDuplexReceiver<Resp> {
691 /// The corresponding RPC will be canceled if the receiver did not
692 /// finish before dropping.
drop(&mut self)693 fn drop(&mut self) {
694 self.imp.on_drop()
695 }
696 }
697
698 impl<Resp> Stream for ClientDuplexReceiver<Resp> {
699 type Item = Result<Resp>;
700
poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>701 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
702 Pin::new(&mut self.imp).poll(cx)
703 }
704 }
705
#[cfg(test)]
mod tests {
    /// Setting a flag adds its bit; clearing a flag removes only that bit.
    #[test]
    fn test_change_flag() {
        let mut flags: u32 = 2 | 4;

        super::change_flag(&mut flags, 8, true);
        assert_eq!(flags, 2 | 4 | 8);

        super::change_flag(&mut flags, 4, false);
        assert_eq!(flags, 2 | 8);
    }
}
717