1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 use std::borrow::Cow;
4 use std::collections::hash_map::Entry;
5 use std::collections::HashMap;
6 use std::ffi::{CStr, CString};
7 use std::future::Future;
8 use std::sync::Arc;
9 use std::time::Duration;
10 use std::{cmp, i32, ptr};
11 
12 use crate::{
13     grpc_sys::{self, gpr_timespec, grpc_arg_pointer_vtable, grpc_channel, grpc_channel_args},
14     Deadline,
15 };
16 use libc::{self, c_char, c_int};
17 
18 use crate::call::{Call, Method};
19 use crate::cq::CompletionQueue;
20 use crate::env::Environment;
21 use crate::error::Result;
22 use crate::task::CallTag;
23 use crate::task::Kicker;
24 use crate::{CallOption, ChannelCredentials};
25 use crate::{ResourceQuota, RpcStatusCode};
26 
27 pub use crate::grpc_sys::{
28     grpc_compression_algorithm as CompressionAlgorithms,
29     grpc_compression_level as CompressionLevel, grpc_connectivity_state as ConnectivityState,
30 };
31 
32 /// Ref: http://www.grpc.io/docs/guides/wire.html#user-agents
format_user_agent_string(agent: &str) -> CString33 fn format_user_agent_string(agent: &str) -> CString {
34     let version = env!("CARGO_PKG_VERSION");
35     let trimed_agent = agent.trim();
36     let val = if trimed_agent.is_empty() {
37         format!("grpc-rust/{version}")
38     } else {
39         format!("{trimed_agent} grpc-rust/{version}")
40     };
41     CString::new(val).unwrap()
42 }
43 
dur_to_ms(dur: Duration) -> i3244 fn dur_to_ms(dur: Duration) -> i32 {
45     let millis = dur.as_secs() * 1000 + dur.subsec_nanos() as u64 / 1_000_000;
46     cmp::min(i32::MAX as u64, millis) as i32
47 }
48 
49 enum Options {
50     Integer(i32),
51     String(CString),
52     Pointer(ResourceQuota, *const grpc_arg_pointer_vtable),
53 }
54 
55 /// The optimization target for a [`Channel`].
56 #[derive(Clone, Copy)]
57 pub enum OptTarget {
58     /// Minimize latency at the cost of throughput.
59     Latency,
60     /// Balance latency and throughput.
61     Blend,
62     /// Maximize throughput at the expense of latency.
63     Throughput,
64 }
65 
66 #[derive(Clone, Copy)]
67 pub enum LbPolicy {
68     PickFirst,
69     RoundRobin,
70 }
71 
72 /// [`Channel`] factory in order to configure the properties.
73 pub struct ChannelBuilder {
74     env: Arc<Environment>,
75     options: HashMap<Cow<'static, [u8]>, Options>,
76     credentials: Option<ChannelCredentials>,
77 }
78 
79 impl ChannelBuilder {
80     /// Initialize a new [`ChannelBuilder`].
new(env: Arc<Environment>) -> ChannelBuilder81     pub fn new(env: Arc<Environment>) -> ChannelBuilder {
82         ChannelBuilder {
83             env,
84             options: HashMap::new(),
85             credentials: None,
86         }
87     }
88 
89     /// Set default authority to pass if none specified on call construction.
default_authority<S: Into<Vec<u8>>>(mut self, authority: S) -> ChannelBuilder90     pub fn default_authority<S: Into<Vec<u8>>>(mut self, authority: S) -> ChannelBuilder {
91         let authority = CString::new(authority).unwrap();
92         self.options.insert(
93             Cow::Borrowed(grpcio_sys::GRPC_ARG_DEFAULT_AUTHORITY),
94             Options::String(authority),
95         );
96         self
97     }
98 
99     /// Set resource quota by consuming a ResourceQuota
set_resource_quota(mut self, quota: ResourceQuota) -> ChannelBuilder100     pub fn set_resource_quota(mut self, quota: ResourceQuota) -> ChannelBuilder {
101         unsafe {
102             self.options.insert(
103                 Cow::Borrowed(grpcio_sys::GRPC_ARG_RESOURCE_QUOTA),
104                 Options::Pointer(quota, grpc_sys::grpc_resource_quota_arg_vtable()),
105             );
106         }
107         self
108     }
109 
110     /// Set maximum number of concurrent incoming streams to allow on a HTTP/2 connection.
max_concurrent_stream(mut self, num: i32) -> ChannelBuilder111     pub fn max_concurrent_stream(mut self, num: i32) -> ChannelBuilder {
112         self.options.insert(
113             Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_CONCURRENT_STREAMS),
114             Options::Integer(num),
115         );
116         self
117     }
118 
119     /// Set maximum message length that the channel can receive. `-1` means unlimited.
max_receive_message_len(mut self, len: i32) -> ChannelBuilder120     pub fn max_receive_message_len(mut self, len: i32) -> ChannelBuilder {
121         self.options.insert(
122             Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH),
123             Options::Integer(len),
124         );
125         self
126     }
127 
128     /// Set maximum message length that the channel can send. `-1` means unlimited.
max_send_message_len(mut self, len: i32) -> ChannelBuilder129     pub fn max_send_message_len(mut self, len: i32) -> ChannelBuilder {
130         self.options.insert(
131             Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_SEND_MESSAGE_LENGTH),
132             Options::Integer(len),
133         );
134         self
135     }
136 
137     /// Set maximum time between subsequent connection attempts.
max_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder138     pub fn max_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder {
139         self.options.insert(
140             Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_RECONNECT_BACKOFF_MS),
141             Options::Integer(dur_to_ms(backoff)),
142         );
143         self
144     }
145 
146     /// Set time between the first and second connection attempts.
initial_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder147     pub fn initial_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder {
148         self.options.insert(
149             Cow::Borrowed(grpcio_sys::GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS),
150             Options::Integer(dur_to_ms(backoff)),
151         );
152         self
153     }
154 
155     /// Set initial sequence number for HTTP/2 transports.
https_initial_seq_number(mut self, number: i32) -> ChannelBuilder156     pub fn https_initial_seq_number(mut self, number: i32) -> ChannelBuilder {
157         self.options.insert(
158             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER),
159             Options::Integer(number),
160         );
161         self
162     }
163 
164     /// Set amount to read ahead on individual streams. Defaults to 64KB. Larger
165     /// values help throughput on high-latency connections.
stream_initial_window_size(mut self, window_size: i32) -> ChannelBuilder166     pub fn stream_initial_window_size(mut self, window_size: i32) -> ChannelBuilder {
167         self.options.insert(
168             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES),
169             Options::Integer(window_size),
170         );
171         self
172     }
173 
174     /// Set primary user agent, which goes at the start of the user-agent metadata sent on
175     /// each request.
primary_user_agent(mut self, agent: &str) -> ChannelBuilder176     pub fn primary_user_agent(mut self, agent: &str) -> ChannelBuilder {
177         let agent_string = format_user_agent_string(agent);
178         self.options.insert(
179             Cow::Borrowed(grpcio_sys::GRPC_ARG_PRIMARY_USER_AGENT_STRING),
180             Options::String(agent_string),
181         );
182         self
183     }
184 
185     /// Set whether to allow the use of `SO_REUSEPORT` if available. Defaults to `true`.
reuse_port(mut self, reuse: bool) -> ChannelBuilder186     pub fn reuse_port(mut self, reuse: bool) -> ChannelBuilder {
187         self.options.insert(
188             Cow::Borrowed(grpcio_sys::GRPC_ARG_ALLOW_REUSEPORT),
189             Options::Integer(reuse as i32),
190         );
191         self
192     }
193 
194     /// Set the size of slice to try and read from the wire each time.
tcp_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder195     pub fn tcp_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
196         self.options.insert(
197             Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_READ_CHUNK_SIZE),
198             Options::Integer(bytes),
199         );
200         self
201     }
202 
203     /// Set the minimum size of slice to try and read from the wire each time.
tcp_min_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder204     pub fn tcp_min_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
205         self.options.insert(
206             Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE),
207             Options::Integer(bytes),
208         );
209         self
210     }
211 
212     /// Set the maximum size of slice to try and read from the wire each time.
tcp_max_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder213     pub fn tcp_max_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
214         self.options.insert(
215             Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE),
216             Options::Integer(bytes),
217         );
218         self
219     }
220 
221     /// How much data are we willing to queue up per stream if
222     /// write_buffer_hint is set. This is an upper bound.
http2_write_buffer_size(mut self, size: i32) -> ChannelBuilder223     pub fn http2_write_buffer_size(mut self, size: i32) -> ChannelBuilder {
224         self.options.insert(
225             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE),
226             Options::Integer(size),
227         );
228         self
229     }
230 
231     /// How big a frame are we willing to receive via HTTP/2.
232     /// Min 16384, max 16777215.
233     /// Larger values give lower CPU usage for large messages, but more head of line
234     /// blocking for small messages.
http2_max_frame_size(mut self, size: i32) -> ChannelBuilder235     pub fn http2_max_frame_size(mut self, size: i32) -> ChannelBuilder {
236         self.options.insert(
237             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_FRAME_SIZE),
238             Options::Integer(size),
239         );
240         self
241     }
242 
243     /// Set whether to enable BDP probing.
http2_bdp_probe(mut self, enable: bool) -> ChannelBuilder244     pub fn http2_bdp_probe(mut self, enable: bool) -> ChannelBuilder {
245         self.options.insert(
246             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_BDP_PROBE),
247             Options::Integer(enable as i32),
248         );
249         self
250     }
251 
252     /// Minimum time between sending successive ping frames without receiving any
253     /// data frame.
http2_min_sent_ping_interval_without_data( mut self, interval: Duration, ) -> ChannelBuilder254     pub fn http2_min_sent_ping_interval_without_data(
255         mut self,
256         interval: Duration,
257     ) -> ChannelBuilder {
258         self.options.insert(
259             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS),
260             Options::Integer(dur_to_ms(interval)),
261         );
262         self
263     }
264 
265     /// Minimum allowed time between receiving successive ping frames without
266     /// sending any data frame.
http2_min_recv_ping_interval_without_data( mut self, interval: Duration, ) -> ChannelBuilder267     pub fn http2_min_recv_ping_interval_without_data(
268         mut self,
269         interval: Duration,
270     ) -> ChannelBuilder {
271         self.options.insert(
272             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
273             Options::Integer(dur_to_ms(interval)),
274         );
275         self
276     }
277 
278     /// How many pings can we send before needing to send a data frame or header
279     /// frame? (0 indicates that an infinite number of pings can be sent without
280     /// sending a data frame or header frame)
http2_max_pings_without_data(mut self, num: i32) -> ChannelBuilder281     pub fn http2_max_pings_without_data(mut self, num: i32) -> ChannelBuilder {
282         self.options.insert(
283             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA),
284             Options::Integer(num),
285         );
286         self
287     }
288 
289     /// How many misbehaving pings the server can bear before sending goaway and
290     /// closing the transport? (0 indicates that the server can bear an infinite
291     /// number of misbehaving pings)
http2_max_ping_strikes(mut self, num: i32) -> ChannelBuilder292     pub fn http2_max_ping_strikes(mut self, num: i32) -> ChannelBuilder {
293         self.options.insert(
294             Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_PING_STRIKES),
295             Options::Integer(num),
296         );
297         self
298     }
299 
300     /// If set to zero, disables use of http proxies.
enable_http_proxy(mut self, num: bool) -> ChannelBuilder301     pub fn enable_http_proxy(mut self, num: bool) -> ChannelBuilder {
302         self.options.insert(
303             Cow::Borrowed(grpcio_sys::GRPC_ARG_ENABLE_HTTP_PROXY),
304             Options::Integer(num as i32),
305         );
306         self
307     }
308 
309     /// Set default compression algorithm for the channel.
default_compression_algorithm(mut self, algo: CompressionAlgorithms) -> ChannelBuilder310     pub fn default_compression_algorithm(mut self, algo: CompressionAlgorithms) -> ChannelBuilder {
311         self.options.insert(
312             Cow::Borrowed(grpcio_sys::GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM),
313             Options::Integer(algo as i32),
314         );
315         self
316     }
317 
318     /// Set default gzip compression level.
319     #[cfg(feature = "nightly")]
default_gzip_compression_level(mut self, level: usize) -> ChannelBuilder320     pub fn default_gzip_compression_level(mut self, level: usize) -> ChannelBuilder {
321         self.options.insert(
322             Cow::Borrowed(grpcio_sys::GRPC_ARG_GZIP_COMPRESSION_LEVEL),
323             Options::Integer(level as i32),
324         );
325         self
326     }
327 
328     /// Set default grpc min message size to compression.
329     #[cfg(feature = "nightly")]
default_grpc_min_message_size_to_compress( mut self, lower_bound: usize, ) -> ChannelBuilder330     pub fn default_grpc_min_message_size_to_compress(
331         mut self,
332         lower_bound: usize,
333     ) -> ChannelBuilder {
334         self.options.insert(
335             Cow::Borrowed(grpcio_sys::GRPC_ARG_MIN_MESSAGE_SIZE_TO_COMPRESS),
336             Options::Integer(lower_bound as i32),
337         );
338         self
339     }
340 
341     /// Set default compression level for the channel.
default_compression_level(mut self, level: CompressionLevel) -> ChannelBuilder342     pub fn default_compression_level(mut self, level: CompressionLevel) -> ChannelBuilder {
343         self.options.insert(
344             Cow::Borrowed(grpcio_sys::GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL),
345             Options::Integer(level as i32),
346         );
347         self
348     }
349 
350     /// After a duration of this time the client/server pings its peer to see
351     /// if the transport is still alive.
keepalive_time(mut self, timeout: Duration) -> ChannelBuilder352     pub fn keepalive_time(mut self, timeout: Duration) -> ChannelBuilder {
353         self.options.insert(
354             Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_TIME_MS),
355             Options::Integer(dur_to_ms(timeout)),
356         );
357         self
358     }
359 
360     /// After waiting for a duration of this time, if the keepalive ping sender does
361     /// not receive the ping ack, it will close the transport.
keepalive_timeout(mut self, timeout: Duration) -> ChannelBuilder362     pub fn keepalive_timeout(mut self, timeout: Duration) -> ChannelBuilder {
363         self.options.insert(
364             Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_TIMEOUT_MS),
365             Options::Integer(dur_to_ms(timeout)),
366         );
367         self
368     }
369 
370     /// Is it permissible to send keepalive pings without any outstanding streams.
keepalive_permit_without_calls(mut self, allow: bool) -> ChannelBuilder371     pub fn keepalive_permit_without_calls(mut self, allow: bool) -> ChannelBuilder {
372         self.options.insert(
373             Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS),
374             Options::Integer(allow as i32),
375         );
376         self
377     }
378 
379     /// Set optimization target for the channel. See [`OptTarget`] for all available
380     /// optimization targets. Defaults to `OptTarget::Blend`.
optimize_for(mut self, target: OptTarget) -> ChannelBuilder381     pub fn optimize_for(mut self, target: OptTarget) -> ChannelBuilder {
382         let val = match target {
383             OptTarget::Latency => CString::new("latency"),
384             OptTarget::Blend => CString::new("blend"),
385             OptTarget::Throughput => CString::new("throughput"),
386         };
387         self.options.insert(
388             Cow::Borrowed(grpcio_sys::GRPC_ARG_OPTIMIZATION_TARGET),
389             Options::String(val.unwrap()),
390         );
391         self
392     }
393 
394     /// Set LbPolicy for channel
395     ///
396     /// This method allows one to set the load-balancing policy for a given channel.
load_balancing_policy(mut self, lb_policy: LbPolicy) -> ChannelBuilder397     pub fn load_balancing_policy(mut self, lb_policy: LbPolicy) -> ChannelBuilder {
398         let val = match lb_policy {
399             LbPolicy::PickFirst => CString::new("pick_first"),
400             LbPolicy::RoundRobin => CString::new("round_robin"),
401         };
402         self.options.insert(
403             Cow::Borrowed(grpcio_sys::GRPC_ARG_LB_POLICY_NAME),
404             Options::String(val.unwrap()),
405         );
406         self
407     }
408 
409     /// Set use local subchannel pool
410     ///
411     /// This method allows channel use it's owned subchannel pool.
use_local_subchannel_pool(mut self, enable: bool) -> ChannelBuilder412     pub fn use_local_subchannel_pool(mut self, enable: bool) -> ChannelBuilder {
413         self.options.insert(
414             Cow::Borrowed(grpcio_sys::GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL),
415             Options::Integer(enable as i32),
416         );
417         self
418     }
419 
420     /// Enables retry functionality.  Defaults to true.  When enabled, transparent
421     /// retries will be performed as appropriate, and configurable retries are
422     /// enabled when they are configured via the service config. For details, see:
423     ///   https://github.com/grpc/proposal/blob/master/A6-client-retries.md
424     /// NOTE: Hedging functionality is not yet implemented.
enable_retry(mut self, enable: bool) -> ChannelBuilder425     pub fn enable_retry(mut self, enable: bool) -> ChannelBuilder {
426         self.options.insert(
427             Cow::Borrowed(grpcio_sys::GRPC_ARG_ENABLE_RETRIES),
428             Options::Integer(enable as i32),
429         );
430         self
431     }
432 
433     /// Set a raw integer configuration.
434     ///
435     /// This method is only for bench usage, users should use the encapsulated API instead.
436     #[doc(hidden)]
raw_cfg_int(mut self, key: CString, val: i32) -> ChannelBuilder437     pub fn raw_cfg_int(mut self, key: CString, val: i32) -> ChannelBuilder {
438         self.options
439             .insert(Cow::Owned(key.into_bytes_with_nul()), Options::Integer(val));
440         self
441     }
442 
443     /// Set a raw string configuration.
444     ///
445     /// This method is only for bench usage, users should use the encapsulated API instead.
446     #[doc(hidden)]
raw_cfg_string(mut self, key: CString, val: CString) -> ChannelBuilder447     pub fn raw_cfg_string(mut self, key: CString, val: CString) -> ChannelBuilder {
448         self.options
449             .insert(Cow::Owned(key.into_bytes_with_nul()), Options::String(val));
450         self
451     }
452 
453     /// Build `ChannelArgs` from the current configuration.
454     #[allow(clippy::useless_conversion)]
455     #[allow(clippy::cmp_owned)]
build_args(&self) -> ChannelArgs456     pub fn build_args(&self) -> ChannelArgs {
457         let args = unsafe { grpc_sys::grpcwrap_channel_args_create(self.options.len()) };
458         for (i, (k, v)) in self.options.iter().enumerate() {
459             let key = k.as_ptr() as *const c_char;
460             match *v {
461                 Options::Integer(val) => unsafe {
462                     // On most modern compiler and architect, c_int is the same as i32,
463                     // panic directly to simplify signature.
464                     assert!(
465                         val <= i32::from(libc::INT_MAX) && val >= i32::from(libc::INT_MIN),
466                         "{} is out of range for {:?}",
467                         val,
468                         CStr::from_bytes_with_nul(k).unwrap()
469                     );
470                     grpc_sys::grpcwrap_channel_args_set_integer(args, i, key, val as c_int)
471                 },
472                 Options::String(ref val) => unsafe {
473                     grpc_sys::grpcwrap_channel_args_set_string(args, i, key, val.as_ptr())
474                 },
475                 Options::Pointer(ref quota, vtable) => unsafe {
476                     grpc_sys::grpcwrap_channel_args_set_pointer_vtable(
477                         args,
478                         i,
479                         key,
480                         quota.get_ptr() as _,
481                         vtable,
482                     )
483                 },
484             }
485         }
486         ChannelArgs { args }
487     }
488 
prepare_connect_args(&mut self) -> ChannelArgs489     fn prepare_connect_args(&mut self) -> ChannelArgs {
490         if let Entry::Vacant(e) = self.options.entry(Cow::Borrowed(
491             grpcio_sys::GRPC_ARG_PRIMARY_USER_AGENT_STRING,
492         )) {
493             e.insert(Options::String(format_user_agent_string("")));
494         }
495         self.build_args()
496     }
497 
498     /// Build an [`Channel`] that connects to a specific address.
connect(mut self, addr: &str) -> Channel499     pub fn connect(mut self, addr: &str) -> Channel {
500         let args = self.prepare_connect_args();
501         let addr = CString::new(addr).unwrap();
502         let addr_ptr = addr.as_ptr();
503         let mut creds = self
504             .credentials
505             .unwrap_or_else(ChannelCredentials::insecure);
506         let channel =
507             unsafe { grpcio_sys::grpc_channel_create(addr_ptr, creds.as_mut_ptr(), args.args) };
508 
509         unsafe { Channel::new(self.env.pick_cq(), self.env, channel) }
510     }
511 
512     /// Build an [`Channel`] taking over an established connection from
513     /// a file descriptor. The target string given is purely informative to
514     /// describe the endpoint of the connection. Takes ownership of the given
515     /// file descriptor and will close it when the connection is closed.
516     ///
517     /// This function is available on posix systems only.
518     ///
519     /// # Safety
520     ///
521     /// The file descriptor must correspond to a connected stream socket. After
522     /// this call, the socket must not be accessed (read / written / closed)
523     /// by other code.
524     #[cfg(unix)]
connect_from_fd(mut self, target: &str, fd: ::std::os::raw::c_int) -> Channel525     pub unsafe fn connect_from_fd(mut self, target: &str, fd: ::std::os::raw::c_int) -> Channel {
526         let args = self.prepare_connect_args();
527         let target = CString::new(target).unwrap();
528         let target_ptr = target.as_ptr();
529         // Actually only insecure credentials are supported currently.
530         let mut creds = self
531             .credentials
532             .unwrap_or_else(ChannelCredentials::insecure);
533         let channel =
534             grpcio_sys::grpc_channel_create_from_fd(target_ptr, fd, creds.as_mut_ptr(), args.args);
535 
536         Channel::new(self.env.pick_cq(), self.env, channel)
537     }
538 }
539 
540 #[cfg(feature = "_secure")]
541 mod secure_channel {
542     use std::borrow::Cow;
543     use std::ffi::CString;
544 
545     use crate::ChannelCredentials;
546 
547     use super::{ChannelBuilder, Options};
548 
549     const OPT_SSL_TARGET_NAME_OVERRIDE: &[u8] = b"grpc.ssl_target_name_override\0";
550 
551     impl ChannelBuilder {
552         /// The caller of the secure_channel_create functions may override the target name used
553         /// for SSL host name checking using this channel argument.
554         ///
555         /// This *should* be used for testing only.
556         #[doc(hidden)]
override_ssl_target<S: Into<Vec<u8>>>(mut self, target: S) -> ChannelBuilder557         pub fn override_ssl_target<S: Into<Vec<u8>>>(mut self, target: S) -> ChannelBuilder {
558             let target = CString::new(target).unwrap();
559             self.options.insert(
560                 Cow::Borrowed(OPT_SSL_TARGET_NAME_OVERRIDE),
561                 Options::String(target),
562             );
563             self
564         }
565 
566         /// Set the credentials used to build the connection.
set_credentials(mut self, creds: ChannelCredentials) -> ChannelBuilder567         pub fn set_credentials(mut self, creds: ChannelCredentials) -> ChannelBuilder {
568             self.credentials = Some(creds);
569             self
570         }
571     }
572 }
573 
574 pub struct ChannelArgs {
575     args: *mut grpc_channel_args,
576 }
577 
578 impl ChannelArgs {
as_ptr(&self) -> *const grpc_channel_args579     pub fn as_ptr(&self) -> *const grpc_channel_args {
580         self.args
581     }
582 }
583 
584 impl Drop for ChannelArgs {
drop(&mut self)585     fn drop(&mut self) {
586         unsafe { grpc_sys::grpcwrap_channel_args_destroy(self.args) }
587     }
588 }
589 
590 struct ChannelInner {
591     _env: Arc<Environment>,
592     channel: *mut grpc_channel,
593 }
594 
595 impl ChannelInner {
596     // If try_to_connect is true, the channel will try to establish a connection, potentially
597     // changing the state.
check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState598     fn check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState {
599         unsafe {
600             grpc_sys::grpc_channel_check_connectivity_state(self.channel, try_to_connect as _)
601         }
602     }
603 }
604 
605 impl Drop for ChannelInner {
drop(&mut self)606     fn drop(&mut self) {
607         unsafe {
608             grpc_sys::grpc_channel_destroy(self.channel);
609         }
610     }
611 }
612 
613 /// A gRPC channel.
614 ///
615 /// Channels are an abstraction of long-lived connections to remote servers. More client objects
616 /// can reuse the same channel.
617 ///
618 /// Use [`ChannelBuilder`] to build a [`Channel`].
619 #[derive(Clone)]
620 pub struct Channel {
621     inner: Arc<ChannelInner>,
622     cq: CompletionQueue,
623 }
624 
625 #[allow(clippy::non_send_fields_in_send_ty)]
626 unsafe impl Send for Channel {}
627 unsafe impl Sync for Channel {}
628 
629 impl Channel {
630     /// Create a new channel. Avoid using this directly and use
631     /// [`ChannelBuilder`] to build a [`Channel`] instead.
632     ///
633     /// # Safety
634     ///
635     /// The given grpc_channel must correspond to an instantiated grpc core
636     /// channel. Takes exclusive ownership of the channel and will close it after
637     /// use.
new( cq: CompletionQueue, env: Arc<Environment>, channel: *mut grpc_channel, ) -> Channel638     pub unsafe fn new(
639         cq: CompletionQueue,
640         env: Arc<Environment>,
641         channel: *mut grpc_channel,
642     ) -> Channel {
643         Channel {
644             inner: Arc::new(ChannelInner { _env: env, channel }),
645             cq,
646         }
647     }
648 
649     /// Create a lame channel that will fail all its operations.
lame(env: Arc<Environment>, target: &str) -> Channel650     pub fn lame(env: Arc<Environment>, target: &str) -> Channel {
651         unsafe {
652             let target = CString::new(target).unwrap();
653             let ch = grpc_sys::grpc_lame_client_channel_create(
654                 target.as_ptr(),
655                 RpcStatusCode::UNAVAILABLE.into(),
656                 b"call on lame client\0".as_ptr() as _,
657             );
658             Self::new(env.pick_cq(), env, ch)
659         }
660     }
661 
662     /// If try_to_connect is true, the channel will try to establish a connection, potentially
663     /// changing the state.
check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState664     pub fn check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState {
665         self.inner.check_connectivity_state(try_to_connect)
666     }
667 
668     /// Blocking wait for channel state change or deadline expiration.
669     ///
670     /// `check_connectivity_state` needs to be called to get the current state. Returns false
671     /// means deadline excceeds before observing any state changes.
wait_for_state_change( &self, last_observed: ConnectivityState, deadline: impl Into<Deadline>, ) -> impl Future<Output = bool>672     pub fn wait_for_state_change(
673         &self,
674         last_observed: ConnectivityState,
675         deadline: impl Into<Deadline>,
676     ) -> impl Future<Output = bool> {
677         let (cq_f, prom) = CallTag::action_pair();
678         let prom_box = Box::new(prom);
679         let tag = Box::into_raw(prom_box);
680         let should_wait = if let Ok(cq_ref) = self.cq.borrow() {
681             unsafe {
682                 grpcio_sys::grpc_channel_watch_connectivity_state(
683                     self.inner.channel,
684                     last_observed,
685                     deadline.into().spec(),
686                     cq_ref.as_ptr(),
687                     tag as *mut _,
688                 )
689             }
690             true
691         } else {
692             // It's already shutdown.
693             false
694         };
695         async move { should_wait && cq_f.await.unwrap() }
696     }
697 
698     /// Wait for this channel to be connected.
699     ///
700     /// Returns false means deadline excceeds before connection is connected.
wait_for_connected(&self, deadline: impl Into<Deadline>) -> bool701     pub async fn wait_for_connected(&self, deadline: impl Into<Deadline>) -> bool {
702         // Fast path, it's probably connected.
703         let mut state = self.check_connectivity_state(true);
704         if ConnectivityState::GRPC_CHANNEL_READY == state {
705             return true;
706         }
707         let deadline = deadline.into();
708         loop {
709             if self.wait_for_state_change(state, deadline).await {
710                 state = self.check_connectivity_state(true);
711                 match state {
712                     ConnectivityState::GRPC_CHANNEL_READY => return true,
713                     ConnectivityState::GRPC_CHANNEL_SHUTDOWN => return false,
714                     _ => (),
715                 }
716                 continue;
717             }
718             return false;
719         }
720     }
721 
722     /// Create a Kicker.
create_kicker(&self) -> Result<Kicker>723     pub(crate) fn create_kicker(&self) -> Result<Kicker> {
724         let cq_ref = self.cq.borrow()?;
725         let raw_call = unsafe {
726             let ch = self.inner.channel;
727             let cq = cq_ref.as_ptr();
728             // Do not timeout.
729             let timeout = gpr_timespec::inf_future();
730             grpc_sys::grpcwrap_channel_create_call(
731                 ch,
732                 ptr::null_mut(),
733                 0,
734                 cq,
735                 ptr::null(),
736                 0,
737                 ptr::null(),
738                 0,
739                 timeout,
740             )
741         };
742         let call = unsafe { Call::from_raw(raw_call, self.cq.clone()) };
743         Ok(Kicker::from_call(call))
744     }
745 
746     /// Create a call using the method and option.
create_call<Req, Resp>( &self, method: &Method<Req, Resp>, opt: &CallOption, ) -> Result<Call>747     pub(crate) fn create_call<Req, Resp>(
748         &self,
749         method: &Method<Req, Resp>,
750         opt: &CallOption,
751     ) -> Result<Call> {
752         let cq_ref = self.cq.borrow()?;
753         let raw_call = unsafe {
754             let ch = self.inner.channel;
755             let cq = cq_ref.as_ptr();
756             let method_ptr = method.name.as_ptr();
757             let method_len = method.name.len();
758             let timeout = opt
759                 .get_timeout()
760                 .map_or_else(gpr_timespec::inf_future, gpr_timespec::from);
761             grpc_sys::grpcwrap_channel_create_call(
762                 ch,
763                 ptr::null_mut(),
764                 0,
765                 cq,
766                 method_ptr as *const _,
767                 method_len,
768                 ptr::null(),
769                 0,
770                 timeout,
771             )
772         };
773 
774         unsafe { Ok(Call::from_raw(raw_call, self.cq.clone())) }
775     }
776 
cq(&self) -> &CompletionQueue777     pub(crate) fn cq(&self) -> &CompletionQueue {
778         &self.cq
779     }
780 }
781 
782 #[cfg(test)]
783 #[cfg(feature = "nightly")]
784 mod tests {
785     use crate::env::Environment;
786     use crate::ChannelBuilder;
787     use std::sync::Arc;
788 
789     #[test]
790     #[cfg(feature = "nightly")]
test_grpc_min_message_size_to_compress()791     fn test_grpc_min_message_size_to_compress() {
792         let env = Arc::new(Environment::new(1));
793         let cb = ChannelBuilder::new(env);
794         cb.default_grpc_min_message_size_to_compress(1);
795     }
796     #[test]
797     #[cfg(feature = "nightly")]
test_gzip_compression_level()798     fn test_gzip_compression_level() {
799         let env = Arc::new(Environment::new(1));
800         let cb = ChannelBuilder::new(env);
801         cb.default_gzip_compression_level(1);
802     }
803 }
804