1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3 use std::borrow::Cow;
4 use std::collections::hash_map::Entry;
5 use std::collections::HashMap;
6 use std::ffi::{CStr, CString};
7 use std::future::Future;
8 use std::sync::Arc;
9 use std::time::Duration;
10 use std::{cmp, i32, ptr};
11
12 use crate::{
13 grpc_sys::{self, gpr_timespec, grpc_arg_pointer_vtable, grpc_channel, grpc_channel_args},
14 Deadline,
15 };
16 use libc::{self, c_char, c_int};
17
18 use crate::call::{Call, Method};
19 use crate::cq::CompletionQueue;
20 use crate::env::Environment;
21 use crate::error::Result;
22 use crate::task::CallTag;
23 use crate::task::Kicker;
24 use crate::{CallOption, ChannelCredentials};
25 use crate::{ResourceQuota, RpcStatusCode};
26
27 pub use crate::grpc_sys::{
28 grpc_compression_algorithm as CompressionAlgorithms,
29 grpc_compression_level as CompressionLevel, grpc_connectivity_state as ConnectivityState,
30 };
31
32 /// Ref: http://www.grpc.io/docs/guides/wire.html#user-agents
format_user_agent_string(agent: &str) -> CString33 fn format_user_agent_string(agent: &str) -> CString {
34 let version = env!("CARGO_PKG_VERSION");
35 let trimed_agent = agent.trim();
36 let val = if trimed_agent.is_empty() {
37 format!("grpc-rust/{version}")
38 } else {
39 format!("{trimed_agent} grpc-rust/{version}")
40 };
41 CString::new(val).unwrap()
42 }
43
/// Converts `dur` to whole milliseconds, saturating at `i32::MAX`.
///
/// Sub-millisecond remainders are truncated. The conversion goes through
/// `Duration::as_millis` (a `u128`), so very large durations clamp to
/// `i32::MAX` instead of overflowing the intermediate multiplication.
fn dur_to_ms(dur: Duration) -> i32 {
    let millis = dur.as_millis();
    // The clamp guarantees the value fits, so the narrowing cast is lossless.
    cmp::min(millis, i32::MAX as u128) as i32
}
48
/// A single channel argument value, stored by [`ChannelBuilder`] until
/// `build_args` copies it into a `grpc_channel_args` structure.
enum Options {
    /// An integer-valued argument.
    Integer(i32),
    /// A string-valued argument, kept as a NUL-terminated C string.
    String(CString),
    /// A pointer-valued argument: the resource quota together with the vtable
    /// that is passed alongside its raw pointer.
    Pointer(ResourceQuota, *const grpc_arg_pointer_vtable),
}
54
/// The optimization target for a [`Channel`].
///
/// Selected through [`ChannelBuilder::optimize_for`].
#[derive(Clone, Copy)]
pub enum OptTarget {
    /// Minimize latency at the cost of throughput.
    Latency,
    /// Balance latency and throughput.
    Blend,
    /// Maximize throughput at the expense of latency.
    Throughput,
}
65
/// The load-balancing policy for a [`Channel`].
///
/// Selected through [`ChannelBuilder::load_balancing_policy`].
#[derive(Clone, Copy)]
pub enum LbPolicy {
    /// Passed to gRPC core as the policy name `pick_first`.
    PickFirst,
    /// Passed to gRPC core as the policy name `round_robin`.
    RoundRobin,
}
71
/// [`Channel`] factory in order to configure the properties.
pub struct ChannelBuilder {
    // Environment whose completion queue the built channel will use.
    env: Arc<Environment>,
    // Channel arguments keyed by their NUL-terminated argument name.
    options: HashMap<Cow<'static, [u8]>, Options>,
    // Credentials for the connection; when `None`, insecure credentials are used.
    credentials: Option<ChannelCredentials>,
}
78
79 impl ChannelBuilder {
80 /// Initialize a new [`ChannelBuilder`].
new(env: Arc<Environment>) -> ChannelBuilder81 pub fn new(env: Arc<Environment>) -> ChannelBuilder {
82 ChannelBuilder {
83 env,
84 options: HashMap::new(),
85 credentials: None,
86 }
87 }
88
89 /// Set default authority to pass if none specified on call construction.
default_authority<S: Into<Vec<u8>>>(mut self, authority: S) -> ChannelBuilder90 pub fn default_authority<S: Into<Vec<u8>>>(mut self, authority: S) -> ChannelBuilder {
91 let authority = CString::new(authority).unwrap();
92 self.options.insert(
93 Cow::Borrowed(grpcio_sys::GRPC_ARG_DEFAULT_AUTHORITY),
94 Options::String(authority),
95 );
96 self
97 }
98
99 /// Set resource quota by consuming a ResourceQuota
set_resource_quota(mut self, quota: ResourceQuota) -> ChannelBuilder100 pub fn set_resource_quota(mut self, quota: ResourceQuota) -> ChannelBuilder {
101 unsafe {
102 self.options.insert(
103 Cow::Borrowed(grpcio_sys::GRPC_ARG_RESOURCE_QUOTA),
104 Options::Pointer(quota, grpc_sys::grpc_resource_quota_arg_vtable()),
105 );
106 }
107 self
108 }
109
110 /// Set maximum number of concurrent incoming streams to allow on a HTTP/2 connection.
max_concurrent_stream(mut self, num: i32) -> ChannelBuilder111 pub fn max_concurrent_stream(mut self, num: i32) -> ChannelBuilder {
112 self.options.insert(
113 Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_CONCURRENT_STREAMS),
114 Options::Integer(num),
115 );
116 self
117 }
118
119 /// Set maximum message length that the channel can receive. `-1` means unlimited.
max_receive_message_len(mut self, len: i32) -> ChannelBuilder120 pub fn max_receive_message_len(mut self, len: i32) -> ChannelBuilder {
121 self.options.insert(
122 Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH),
123 Options::Integer(len),
124 );
125 self
126 }
127
128 /// Set maximum message length that the channel can send. `-1` means unlimited.
max_send_message_len(mut self, len: i32) -> ChannelBuilder129 pub fn max_send_message_len(mut self, len: i32) -> ChannelBuilder {
130 self.options.insert(
131 Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_SEND_MESSAGE_LENGTH),
132 Options::Integer(len),
133 );
134 self
135 }
136
137 /// Set maximum time between subsequent connection attempts.
max_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder138 pub fn max_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder {
139 self.options.insert(
140 Cow::Borrowed(grpcio_sys::GRPC_ARG_MAX_RECONNECT_BACKOFF_MS),
141 Options::Integer(dur_to_ms(backoff)),
142 );
143 self
144 }
145
146 /// Set time between the first and second connection attempts.
initial_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder147 pub fn initial_reconnect_backoff(mut self, backoff: Duration) -> ChannelBuilder {
148 self.options.insert(
149 Cow::Borrowed(grpcio_sys::GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS),
150 Options::Integer(dur_to_ms(backoff)),
151 );
152 self
153 }
154
155 /// Set initial sequence number for HTTP/2 transports.
https_initial_seq_number(mut self, number: i32) -> ChannelBuilder156 pub fn https_initial_seq_number(mut self, number: i32) -> ChannelBuilder {
157 self.options.insert(
158 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER),
159 Options::Integer(number),
160 );
161 self
162 }
163
164 /// Set amount to read ahead on individual streams. Defaults to 64KB. Larger
165 /// values help throughput on high-latency connections.
stream_initial_window_size(mut self, window_size: i32) -> ChannelBuilder166 pub fn stream_initial_window_size(mut self, window_size: i32) -> ChannelBuilder {
167 self.options.insert(
168 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES),
169 Options::Integer(window_size),
170 );
171 self
172 }
173
174 /// Set primary user agent, which goes at the start of the user-agent metadata sent on
175 /// each request.
primary_user_agent(mut self, agent: &str) -> ChannelBuilder176 pub fn primary_user_agent(mut self, agent: &str) -> ChannelBuilder {
177 let agent_string = format_user_agent_string(agent);
178 self.options.insert(
179 Cow::Borrowed(grpcio_sys::GRPC_ARG_PRIMARY_USER_AGENT_STRING),
180 Options::String(agent_string),
181 );
182 self
183 }
184
185 /// Set whether to allow the use of `SO_REUSEPORT` if available. Defaults to `true`.
reuse_port(mut self, reuse: bool) -> ChannelBuilder186 pub fn reuse_port(mut self, reuse: bool) -> ChannelBuilder {
187 self.options.insert(
188 Cow::Borrowed(grpcio_sys::GRPC_ARG_ALLOW_REUSEPORT),
189 Options::Integer(reuse as i32),
190 );
191 self
192 }
193
194 /// Set the size of slice to try and read from the wire each time.
tcp_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder195 pub fn tcp_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
196 self.options.insert(
197 Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_READ_CHUNK_SIZE),
198 Options::Integer(bytes),
199 );
200 self
201 }
202
203 /// Set the minimum size of slice to try and read from the wire each time.
tcp_min_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder204 pub fn tcp_min_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
205 self.options.insert(
206 Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_MIN_READ_CHUNK_SIZE),
207 Options::Integer(bytes),
208 );
209 self
210 }
211
212 /// Set the maximum size of slice to try and read from the wire each time.
tcp_max_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder213 pub fn tcp_max_read_chunk_size(mut self, bytes: i32) -> ChannelBuilder {
214 self.options.insert(
215 Cow::Borrowed(grpcio_sys::GRPC_ARG_TCP_MAX_READ_CHUNK_SIZE),
216 Options::Integer(bytes),
217 );
218 self
219 }
220
221 /// How much data are we willing to queue up per stream if
222 /// write_buffer_hint is set. This is an upper bound.
http2_write_buffer_size(mut self, size: i32) -> ChannelBuilder223 pub fn http2_write_buffer_size(mut self, size: i32) -> ChannelBuilder {
224 self.options.insert(
225 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE),
226 Options::Integer(size),
227 );
228 self
229 }
230
231 /// How big a frame are we willing to receive via HTTP/2.
232 /// Min 16384, max 16777215.
233 /// Larger values give lower CPU usage for large messages, but more head of line
234 /// blocking for small messages.
http2_max_frame_size(mut self, size: i32) -> ChannelBuilder235 pub fn http2_max_frame_size(mut self, size: i32) -> ChannelBuilder {
236 self.options.insert(
237 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_FRAME_SIZE),
238 Options::Integer(size),
239 );
240 self
241 }
242
243 /// Set whether to enable BDP probing.
http2_bdp_probe(mut self, enable: bool) -> ChannelBuilder244 pub fn http2_bdp_probe(mut self, enable: bool) -> ChannelBuilder {
245 self.options.insert(
246 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_BDP_PROBE),
247 Options::Integer(enable as i32),
248 );
249 self
250 }
251
252 /// Minimum time between sending successive ping frames without receiving any
253 /// data frame.
http2_min_sent_ping_interval_without_data( mut self, interval: Duration, ) -> ChannelBuilder254 pub fn http2_min_sent_ping_interval_without_data(
255 mut self,
256 interval: Duration,
257 ) -> ChannelBuilder {
258 self.options.insert(
259 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS),
260 Options::Integer(dur_to_ms(interval)),
261 );
262 self
263 }
264
265 /// Minimum allowed time between receiving successive ping frames without
266 /// sending any data frame.
http2_min_recv_ping_interval_without_data( mut self, interval: Duration, ) -> ChannelBuilder267 pub fn http2_min_recv_ping_interval_without_data(
268 mut self,
269 interval: Duration,
270 ) -> ChannelBuilder {
271 self.options.insert(
272 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS),
273 Options::Integer(dur_to_ms(interval)),
274 );
275 self
276 }
277
278 /// How many pings can we send before needing to send a data frame or header
279 /// frame? (0 indicates that an infinite number of pings can be sent without
280 /// sending a data frame or header frame)
http2_max_pings_without_data(mut self, num: i32) -> ChannelBuilder281 pub fn http2_max_pings_without_data(mut self, num: i32) -> ChannelBuilder {
282 self.options.insert(
283 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA),
284 Options::Integer(num),
285 );
286 self
287 }
288
289 /// How many misbehaving pings the server can bear before sending goaway and
290 /// closing the transport? (0 indicates that the server can bear an infinite
291 /// number of misbehaving pings)
http2_max_ping_strikes(mut self, num: i32) -> ChannelBuilder292 pub fn http2_max_ping_strikes(mut self, num: i32) -> ChannelBuilder {
293 self.options.insert(
294 Cow::Borrowed(grpcio_sys::GRPC_ARG_HTTP2_MAX_PING_STRIKES),
295 Options::Integer(num),
296 );
297 self
298 }
299
300 /// If set to zero, disables use of http proxies.
enable_http_proxy(mut self, num: bool) -> ChannelBuilder301 pub fn enable_http_proxy(mut self, num: bool) -> ChannelBuilder {
302 self.options.insert(
303 Cow::Borrowed(grpcio_sys::GRPC_ARG_ENABLE_HTTP_PROXY),
304 Options::Integer(num as i32),
305 );
306 self
307 }
308
309 /// Set default compression algorithm for the channel.
default_compression_algorithm(mut self, algo: CompressionAlgorithms) -> ChannelBuilder310 pub fn default_compression_algorithm(mut self, algo: CompressionAlgorithms) -> ChannelBuilder {
311 self.options.insert(
312 Cow::Borrowed(grpcio_sys::GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM),
313 Options::Integer(algo as i32),
314 );
315 self
316 }
317
318 /// Set default gzip compression level.
319 #[cfg(feature = "nightly")]
default_gzip_compression_level(mut self, level: usize) -> ChannelBuilder320 pub fn default_gzip_compression_level(mut self, level: usize) -> ChannelBuilder {
321 self.options.insert(
322 Cow::Borrowed(grpcio_sys::GRPC_ARG_GZIP_COMPRESSION_LEVEL),
323 Options::Integer(level as i32),
324 );
325 self
326 }
327
328 /// Set default grpc min message size to compression.
329 #[cfg(feature = "nightly")]
default_grpc_min_message_size_to_compress( mut self, lower_bound: usize, ) -> ChannelBuilder330 pub fn default_grpc_min_message_size_to_compress(
331 mut self,
332 lower_bound: usize,
333 ) -> ChannelBuilder {
334 self.options.insert(
335 Cow::Borrowed(grpcio_sys::GRPC_ARG_MIN_MESSAGE_SIZE_TO_COMPRESS),
336 Options::Integer(lower_bound as i32),
337 );
338 self
339 }
340
341 /// Set default compression level for the channel.
default_compression_level(mut self, level: CompressionLevel) -> ChannelBuilder342 pub fn default_compression_level(mut self, level: CompressionLevel) -> ChannelBuilder {
343 self.options.insert(
344 Cow::Borrowed(grpcio_sys::GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL),
345 Options::Integer(level as i32),
346 );
347 self
348 }
349
350 /// After a duration of this time the client/server pings its peer to see
351 /// if the transport is still alive.
keepalive_time(mut self, timeout: Duration) -> ChannelBuilder352 pub fn keepalive_time(mut self, timeout: Duration) -> ChannelBuilder {
353 self.options.insert(
354 Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_TIME_MS),
355 Options::Integer(dur_to_ms(timeout)),
356 );
357 self
358 }
359
360 /// After waiting for a duration of this time, if the keepalive ping sender does
361 /// not receive the ping ack, it will close the transport.
keepalive_timeout(mut self, timeout: Duration) -> ChannelBuilder362 pub fn keepalive_timeout(mut self, timeout: Duration) -> ChannelBuilder {
363 self.options.insert(
364 Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_TIMEOUT_MS),
365 Options::Integer(dur_to_ms(timeout)),
366 );
367 self
368 }
369
370 /// Is it permissible to send keepalive pings without any outstanding streams.
keepalive_permit_without_calls(mut self, allow: bool) -> ChannelBuilder371 pub fn keepalive_permit_without_calls(mut self, allow: bool) -> ChannelBuilder {
372 self.options.insert(
373 Cow::Borrowed(grpcio_sys::GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS),
374 Options::Integer(allow as i32),
375 );
376 self
377 }
378
379 /// Set optimization target for the channel. See [`OptTarget`] for all available
380 /// optimization targets. Defaults to `OptTarget::Blend`.
optimize_for(mut self, target: OptTarget) -> ChannelBuilder381 pub fn optimize_for(mut self, target: OptTarget) -> ChannelBuilder {
382 let val = match target {
383 OptTarget::Latency => CString::new("latency"),
384 OptTarget::Blend => CString::new("blend"),
385 OptTarget::Throughput => CString::new("throughput"),
386 };
387 self.options.insert(
388 Cow::Borrowed(grpcio_sys::GRPC_ARG_OPTIMIZATION_TARGET),
389 Options::String(val.unwrap()),
390 );
391 self
392 }
393
394 /// Set LbPolicy for channel
395 ///
396 /// This method allows one to set the load-balancing policy for a given channel.
load_balancing_policy(mut self, lb_policy: LbPolicy) -> ChannelBuilder397 pub fn load_balancing_policy(mut self, lb_policy: LbPolicy) -> ChannelBuilder {
398 let val = match lb_policy {
399 LbPolicy::PickFirst => CString::new("pick_first"),
400 LbPolicy::RoundRobin => CString::new("round_robin"),
401 };
402 self.options.insert(
403 Cow::Borrowed(grpcio_sys::GRPC_ARG_LB_POLICY_NAME),
404 Options::String(val.unwrap()),
405 );
406 self
407 }
408
409 /// Set use local subchannel pool
410 ///
411 /// This method allows channel use it's owned subchannel pool.
use_local_subchannel_pool(mut self, enable: bool) -> ChannelBuilder412 pub fn use_local_subchannel_pool(mut self, enable: bool) -> ChannelBuilder {
413 self.options.insert(
414 Cow::Borrowed(grpcio_sys::GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL),
415 Options::Integer(enable as i32),
416 );
417 self
418 }
419
420 /// Enables retry functionality. Defaults to true. When enabled, transparent
421 /// retries will be performed as appropriate, and configurable retries are
422 /// enabled when they are configured via the service config. For details, see:
423 /// https://github.com/grpc/proposal/blob/master/A6-client-retries.md
424 /// NOTE: Hedging functionality is not yet implemented.
enable_retry(mut self, enable: bool) -> ChannelBuilder425 pub fn enable_retry(mut self, enable: bool) -> ChannelBuilder {
426 self.options.insert(
427 Cow::Borrowed(grpcio_sys::GRPC_ARG_ENABLE_RETRIES),
428 Options::Integer(enable as i32),
429 );
430 self
431 }
432
433 /// Set a raw integer configuration.
434 ///
435 /// This method is only for bench usage, users should use the encapsulated API instead.
436 #[doc(hidden)]
raw_cfg_int(mut self, key: CString, val: i32) -> ChannelBuilder437 pub fn raw_cfg_int(mut self, key: CString, val: i32) -> ChannelBuilder {
438 self.options
439 .insert(Cow::Owned(key.into_bytes_with_nul()), Options::Integer(val));
440 self
441 }
442
443 /// Set a raw string configuration.
444 ///
445 /// This method is only for bench usage, users should use the encapsulated API instead.
446 #[doc(hidden)]
raw_cfg_string(mut self, key: CString, val: CString) -> ChannelBuilder447 pub fn raw_cfg_string(mut self, key: CString, val: CString) -> ChannelBuilder {
448 self.options
449 .insert(Cow::Owned(key.into_bytes_with_nul()), Options::String(val));
450 self
451 }
452
453 /// Build `ChannelArgs` from the current configuration.
454 #[allow(clippy::useless_conversion)]
455 #[allow(clippy::cmp_owned)]
build_args(&self) -> ChannelArgs456 pub fn build_args(&self) -> ChannelArgs {
457 let args = unsafe { grpc_sys::grpcwrap_channel_args_create(self.options.len()) };
458 for (i, (k, v)) in self.options.iter().enumerate() {
459 let key = k.as_ptr() as *const c_char;
460 match *v {
461 Options::Integer(val) => unsafe {
462 // On most modern compiler and architect, c_int is the same as i32,
463 // panic directly to simplify signature.
464 assert!(
465 val <= i32::from(libc::INT_MAX) && val >= i32::from(libc::INT_MIN),
466 "{} is out of range for {:?}",
467 val,
468 CStr::from_bytes_with_nul(k).unwrap()
469 );
470 grpc_sys::grpcwrap_channel_args_set_integer(args, i, key, val as c_int)
471 },
472 Options::String(ref val) => unsafe {
473 grpc_sys::grpcwrap_channel_args_set_string(args, i, key, val.as_ptr())
474 },
475 Options::Pointer(ref quota, vtable) => unsafe {
476 grpc_sys::grpcwrap_channel_args_set_pointer_vtable(
477 args,
478 i,
479 key,
480 quota.get_ptr() as _,
481 vtable,
482 )
483 },
484 }
485 }
486 ChannelArgs { args }
487 }
488
prepare_connect_args(&mut self) -> ChannelArgs489 fn prepare_connect_args(&mut self) -> ChannelArgs {
490 if let Entry::Vacant(e) = self.options.entry(Cow::Borrowed(
491 grpcio_sys::GRPC_ARG_PRIMARY_USER_AGENT_STRING,
492 )) {
493 e.insert(Options::String(format_user_agent_string("")));
494 }
495 self.build_args()
496 }
497
498 /// Build an [`Channel`] that connects to a specific address.
connect(mut self, addr: &str) -> Channel499 pub fn connect(mut self, addr: &str) -> Channel {
500 let args = self.prepare_connect_args();
501 let addr = CString::new(addr).unwrap();
502 let addr_ptr = addr.as_ptr();
503 let mut creds = self
504 .credentials
505 .unwrap_or_else(ChannelCredentials::insecure);
506 let channel =
507 unsafe { grpcio_sys::grpc_channel_create(addr_ptr, creds.as_mut_ptr(), args.args) };
508
509 unsafe { Channel::new(self.env.pick_cq(), self.env, channel) }
510 }
511
512 /// Build an [`Channel`] taking over an established connection from
513 /// a file descriptor. The target string given is purely informative to
514 /// describe the endpoint of the connection. Takes ownership of the given
515 /// file descriptor and will close it when the connection is closed.
516 ///
517 /// This function is available on posix systems only.
518 ///
519 /// # Safety
520 ///
521 /// The file descriptor must correspond to a connected stream socket. After
522 /// this call, the socket must not be accessed (read / written / closed)
523 /// by other code.
524 #[cfg(unix)]
connect_from_fd(mut self, target: &str, fd: ::std::os::raw::c_int) -> Channel525 pub unsafe fn connect_from_fd(mut self, target: &str, fd: ::std::os::raw::c_int) -> Channel {
526 let args = self.prepare_connect_args();
527 let target = CString::new(target).unwrap();
528 let target_ptr = target.as_ptr();
529 // Actually only insecure credentials are supported currently.
530 let mut creds = self
531 .credentials
532 .unwrap_or_else(ChannelCredentials::insecure);
533 let channel =
534 grpcio_sys::grpc_channel_create_from_fd(target_ptr, fd, creds.as_mut_ptr(), args.args);
535
536 Channel::new(self.env.pick_cq(), self.env, channel)
537 }
538 }
539
#[cfg(feature = "_secure")]
mod secure_channel {
    use std::borrow::Cow;
    use std::ffi::CString;

    use crate::ChannelCredentials;

    use super::{ChannelBuilder, Options};

    // NUL-terminated name of the channel argument overriding the SSL target name.
    const OPT_SSL_TARGET_NAME_OVERRIDE: &[u8] = b"grpc.ssl_target_name_override\0";

    impl ChannelBuilder {
        /// Overrides the target name used for SSL host name checking.
        ///
        /// This *should* be used for testing only.
        ///
        /// # Panics
        ///
        /// Panics if `target` contains an interior NUL byte.
        #[doc(hidden)]
        pub fn override_ssl_target<S: Into<Vec<u8>>>(mut self, target: S) -> ChannelBuilder {
            let name = Options::String(CString::new(target).unwrap());
            let key: Cow<'static, [u8]> = Cow::Borrowed(OPT_SSL_TARGET_NAME_OVERRIDE);
            self.options.insert(key, name);
            self
        }

        /// Set the credentials used to build the connection, replacing any
        /// credentials set earlier.
        pub fn set_credentials(mut self, creds: ChannelCredentials) -> ChannelBuilder {
            let replacement = Some(creds);
            self.credentials = replacement;
            self
        }
    }
}
573
/// Owning wrapper around a raw `grpc_channel_args` pointer.
///
/// The pointer is passed to `grpcwrap_channel_args_destroy` when the wrapper
/// is dropped.
pub struct ChannelArgs {
    // Raw argument structure produced by `ChannelBuilder::build_args`.
    args: *mut grpc_channel_args,
}
577
578 impl ChannelArgs {
as_ptr(&self) -> *const grpc_channel_args579 pub fn as_ptr(&self) -> *const grpc_channel_args {
580 self.args
581 }
582 }
583
584 impl Drop for ChannelArgs {
drop(&mut self)585 fn drop(&mut self) {
586 unsafe { grpc_sys::grpcwrap_channel_args_destroy(self.args) }
587 }
588 }
589
/// Owner of the raw core channel; the channel is destroyed when this is dropped.
struct ChannelInner {
    // Never read (hence the underscore); held so the environment is kept
    // alive for as long as this value exists.
    _env: Arc<Environment>,
    // Raw core channel handle, passed to `grpc_channel_destroy` on drop.
    channel: *mut grpc_channel,
}
594
595 impl ChannelInner {
596 // If try_to_connect is true, the channel will try to establish a connection, potentially
597 // changing the state.
check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState598 fn check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState {
599 unsafe {
600 grpc_sys::grpc_channel_check_connectivity_state(self.channel, try_to_connect as _)
601 }
602 }
603 }
604
605 impl Drop for ChannelInner {
drop(&mut self)606 fn drop(&mut self) {
607 unsafe {
608 grpc_sys::grpc_channel_destroy(self.channel);
609 }
610 }
611 }
612
/// A gRPC channel.
///
/// Channels are an abstraction of long-lived connections to remote servers. More client objects
/// can reuse the same channel.
///
/// Cloning is cheap with respect to the connection: clones share the same
/// underlying core channel through an [`Arc`].
///
/// Use [`ChannelBuilder`] to build a [`Channel`].
#[derive(Clone)]
pub struct Channel {
    // Shared owner of the raw core channel.
    inner: Arc<ChannelInner>,
    // Completion queue used for calls and connectivity watches on this channel.
    cq: CompletionQueue,
}

// NOTE(review): these impls are needed because `ChannelInner` holds a raw
// pointer; presumably they rely on the core channel being usable from multiple
// threads — confirm against the gRPC core documentation.
#[allow(clippy::non_send_fields_in_send_ty)]
unsafe impl Send for Channel {}
unsafe impl Sync for Channel {}
628
629 impl Channel {
630 /// Create a new channel. Avoid using this directly and use
631 /// [`ChannelBuilder`] to build a [`Channel`] instead.
632 ///
633 /// # Safety
634 ///
635 /// The given grpc_channel must correspond to an instantiated grpc core
636 /// channel. Takes exclusive ownership of the channel and will close it after
637 /// use.
new( cq: CompletionQueue, env: Arc<Environment>, channel: *mut grpc_channel, ) -> Channel638 pub unsafe fn new(
639 cq: CompletionQueue,
640 env: Arc<Environment>,
641 channel: *mut grpc_channel,
642 ) -> Channel {
643 Channel {
644 inner: Arc::new(ChannelInner { _env: env, channel }),
645 cq,
646 }
647 }
648
649 /// Create a lame channel that will fail all its operations.
lame(env: Arc<Environment>, target: &str) -> Channel650 pub fn lame(env: Arc<Environment>, target: &str) -> Channel {
651 unsafe {
652 let target = CString::new(target).unwrap();
653 let ch = grpc_sys::grpc_lame_client_channel_create(
654 target.as_ptr(),
655 RpcStatusCode::UNAVAILABLE.into(),
656 b"call on lame client\0".as_ptr() as _,
657 );
658 Self::new(env.pick_cq(), env, ch)
659 }
660 }
661
662 /// If try_to_connect is true, the channel will try to establish a connection, potentially
663 /// changing the state.
check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState664 pub fn check_connectivity_state(&self, try_to_connect: bool) -> ConnectivityState {
665 self.inner.check_connectivity_state(try_to_connect)
666 }
667
668 /// Blocking wait for channel state change or deadline expiration.
669 ///
670 /// `check_connectivity_state` needs to be called to get the current state. Returns false
671 /// means deadline excceeds before observing any state changes.
wait_for_state_change( &self, last_observed: ConnectivityState, deadline: impl Into<Deadline>, ) -> impl Future<Output = bool>672 pub fn wait_for_state_change(
673 &self,
674 last_observed: ConnectivityState,
675 deadline: impl Into<Deadline>,
676 ) -> impl Future<Output = bool> {
677 let (cq_f, prom) = CallTag::action_pair();
678 let prom_box = Box::new(prom);
679 let tag = Box::into_raw(prom_box);
680 let should_wait = if let Ok(cq_ref) = self.cq.borrow() {
681 unsafe {
682 grpcio_sys::grpc_channel_watch_connectivity_state(
683 self.inner.channel,
684 last_observed,
685 deadline.into().spec(),
686 cq_ref.as_ptr(),
687 tag as *mut _,
688 )
689 }
690 true
691 } else {
692 // It's already shutdown.
693 false
694 };
695 async move { should_wait && cq_f.await.unwrap() }
696 }
697
698 /// Wait for this channel to be connected.
699 ///
700 /// Returns false means deadline excceeds before connection is connected.
wait_for_connected(&self, deadline: impl Into<Deadline>) -> bool701 pub async fn wait_for_connected(&self, deadline: impl Into<Deadline>) -> bool {
702 // Fast path, it's probably connected.
703 let mut state = self.check_connectivity_state(true);
704 if ConnectivityState::GRPC_CHANNEL_READY == state {
705 return true;
706 }
707 let deadline = deadline.into();
708 loop {
709 if self.wait_for_state_change(state, deadline).await {
710 state = self.check_connectivity_state(true);
711 match state {
712 ConnectivityState::GRPC_CHANNEL_READY => return true,
713 ConnectivityState::GRPC_CHANNEL_SHUTDOWN => return false,
714 _ => (),
715 }
716 continue;
717 }
718 return false;
719 }
720 }
721
722 /// Create a Kicker.
create_kicker(&self) -> Result<Kicker>723 pub(crate) fn create_kicker(&self) -> Result<Kicker> {
724 let cq_ref = self.cq.borrow()?;
725 let raw_call = unsafe {
726 let ch = self.inner.channel;
727 let cq = cq_ref.as_ptr();
728 // Do not timeout.
729 let timeout = gpr_timespec::inf_future();
730 grpc_sys::grpcwrap_channel_create_call(
731 ch,
732 ptr::null_mut(),
733 0,
734 cq,
735 ptr::null(),
736 0,
737 ptr::null(),
738 0,
739 timeout,
740 )
741 };
742 let call = unsafe { Call::from_raw(raw_call, self.cq.clone()) };
743 Ok(Kicker::from_call(call))
744 }
745
746 /// Create a call using the method and option.
create_call<Req, Resp>( &self, method: &Method<Req, Resp>, opt: &CallOption, ) -> Result<Call>747 pub(crate) fn create_call<Req, Resp>(
748 &self,
749 method: &Method<Req, Resp>,
750 opt: &CallOption,
751 ) -> Result<Call> {
752 let cq_ref = self.cq.borrow()?;
753 let raw_call = unsafe {
754 let ch = self.inner.channel;
755 let cq = cq_ref.as_ptr();
756 let method_ptr = method.name.as_ptr();
757 let method_len = method.name.len();
758 let timeout = opt
759 .get_timeout()
760 .map_or_else(gpr_timespec::inf_future, gpr_timespec::from);
761 grpc_sys::grpcwrap_channel_create_call(
762 ch,
763 ptr::null_mut(),
764 0,
765 cq,
766 method_ptr as *const _,
767 method_len,
768 ptr::null(),
769 0,
770 timeout,
771 )
772 };
773
774 unsafe { Ok(Call::from_raw(raw_call, self.cq.clone())) }
775 }
776
cq(&self) -> &CompletionQueue777 pub(crate) fn cq(&self) -> &CompletionQueue {
778 &self.cq
779 }
780 }
781
#[cfg(test)]
#[cfg(feature = "nightly")]
mod tests {
    use std::sync::Arc;

    use crate::env::Environment;
    use crate::ChannelBuilder;

    /// Creates a builder on top of a fresh `Environment::new(1)`.
    fn new_builder() -> ChannelBuilder {
        ChannelBuilder::new(Arc::new(Environment::new(1)))
    }

    /// Smoke test: setting the minimum message size to compress must not panic.
    #[test]
    #[cfg(feature = "nightly")]
    fn test_grpc_min_message_size_to_compress() {
        let builder = new_builder();
        builder.default_grpc_min_message_size_to_compress(1);
    }

    /// Smoke test: setting the gzip compression level must not panic.
    #[test]
    #[cfg(feature = "nightly")]
    fn test_gzip_compression_level() {
        let builder = new_builder();
        builder.default_gzip_compression_level(1);
    }
}
804