//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/client_channel/client_channel_filter.h"
20
21 #include <inttypes.h>
22 #include <limits.h>
23
24 #include <algorithm>
25 #include <functional>
26 #include <new>
27 #include <set>
28 #include <type_traits>
29 #include <utility>
30 #include <vector>
31
32 #include "absl/cleanup/cleanup.h"
33 #include "absl/status/status.h"
34 #include "absl/status/statusor.h"
35 #include "absl/strings/cord.h"
36 #include "absl/strings/numbers.h"
37 #include "absl/strings/str_cat.h"
38 #include "absl/strings/str_join.h"
39 #include "absl/strings/string_view.h"
40 #include "absl/types/optional.h"
41 #include "absl/types/variant.h"
42
43 #include <grpc/event_engine/event_engine.h>
44 #include <grpc/impl/channel_arg_names.h>
45 #include <grpc/slice.h>
46 #include <grpc/status.h>
47 #include <grpc/support/json.h>
48 #include <grpc/support/log.h>
49 #include <grpc/support/string_util.h>
50 #include <grpc/support/time.h>
51
52 #include "src/core/client_channel/backup_poller.h"
53 #include "src/core/client_channel/client_channel_channelz.h"
54 #include "src/core/client_channel/client_channel_internal.h"
55 #include "src/core/client_channel/client_channel_service_config.h"
56 #include "src/core/client_channel/config_selector.h"
57 #include "src/core/client_channel/dynamic_filters.h"
58 #include "src/core/client_channel/global_subchannel_pool.h"
59 #include "src/core/client_channel/local_subchannel_pool.h"
60 #include "src/core/client_channel/retry_filter.h"
61 #include "src/core/client_channel/subchannel.h"
62 #include "src/core/client_channel/subchannel_interface_internal.h"
63 #include "src/core/ext/filters/deadline/deadline_filter.h"
64 #include "src/core/lib/channel/channel_args.h"
65 #include "src/core/lib/channel/channel_stack.h"
66 #include "src/core/lib/channel/channel_trace.h"
67 #include "src/core/lib/channel/status_util.h"
68 #include "src/core/lib/config/core_configuration.h"
69 #include "src/core/lib/debug/trace.h"
70 #include "src/core/lib/experiments/experiments.h"
71 #include "src/core/lib/gpr/useful.h"
72 #include "src/core/lib/gprpp/crash.h"
73 #include "src/core/lib/gprpp/debug_location.h"
74 #include "src/core/lib/gprpp/manual_constructor.h"
75 #include "src/core/lib/gprpp/status_helper.h"
76 #include "src/core/lib/gprpp/sync.h"
77 #include "src/core/lib/gprpp/unique_type_name.h"
78 #include "src/core/lib/gprpp/work_serializer.h"
79 #include "src/core/lib/handshaker/proxy_mapper_registry.h"
80 #include "src/core/lib/iomgr/exec_ctx.h"
81 #include "src/core/lib/iomgr/polling_entity.h"
82 #include "src/core/lib/iomgr/pollset_set.h"
83 #include "src/core/lib/iomgr/resolved_address.h"
84 #include "src/core/lib/json/json.h"
85 #include "src/core/lib/promise/cancel_callback.h"
86 #include "src/core/lib/promise/context.h"
87 #include "src/core/lib/promise/latch.h"
88 #include "src/core/lib/promise/map.h"
89 #include "src/core/lib/promise/pipe.h"
90 #include "src/core/lib/promise/poll.h"
91 #include "src/core/lib/promise/promise.h"
92 #include "src/core/lib/promise/try_seq.h"
93 #include "src/core/lib/security/credentials/credentials.h"
94 #include "src/core/lib/slice/slice.h"
95 #include "src/core/lib/slice/slice_internal.h"
96 #include "src/core/lib/surface/call.h"
97 #include "src/core/lib/transport/connectivity_state.h"
98 #include "src/core/lib/transport/error_utils.h"
99 #include "src/core/lib/transport/metadata_batch.h"
100 #include "src/core/load_balancing/backend_metric_parser.h"
101 #include "src/core/load_balancing/child_policy_handler.h"
102 #include "src/core/load_balancing/lb_policy_registry.h"
103 #include "src/core/load_balancing/subchannel_interface.h"
104 #include "src/core/resolver/endpoint_addresses.h"
105 #include "src/core/resolver/resolver_registry.h"
106 #include "src/core/service_config/service_config_call_data.h"
107 #include "src/core/service_config/service_config_impl.h"
108
//
// Client channel filter
//

namespace grpc_core {

using internal::ClientChannelMethodParsedConfig;

// Trace flags (all off by default) controlling debug logging for the
// channel itself, the per-call resolution path, and the LB call path.
TraceFlag grpc_client_channel_trace(false, "client_channel");
TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
TraceFlag grpc_client_channel_lb_call_trace(false, "client_channel_lb_call");
120
121 //
122 // ClientChannelFilter::CallData definition
123 //
124
// Abstract base for per-call state in the client channel.  The two
// subclasses below (FilterBasedCallData and PromiseBasedCallData) supply
// storage-specific accessors; this base implements the shared logic for
// interacting with the channel's name-resolution queue and for applying
// the service config to the call.
class ClientChannelFilter::CallData {
 public:
  // Removes the call from the channel's list of calls queued
  // for name resolution.
  void RemoveCallFromResolverQueuedCallsLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  // Called by the channel for each queued call when a new resolution
  // result becomes available.
  virtual void RetryCheckResolutionLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_) = 0;

  // The dynamic filter stack selected for this call (set when the
  // service config is applied), if any.
  RefCountedPtr<DynamicFilters> dynamic_filters() const {
    return dynamic_filters_;
  }

 protected:
  CallData() = default;
  virtual ~CallData() = default;

  // Checks whether a resolver result is available.  The following
  // outcomes are possible:
  // - No resolver result is available yet.  The call will be queued and
  //   absl::nullopt will be returned.  Later, when a resolver result
  //   becomes available, RetryCheckResolutionLocked() will be called.
  // - The resolver has returned a transient failure.  If the call is
  //   not wait_for_ready, a non-OK status will be returned.  (If the
  //   call *is* wait_for_ready, it will be queued instead.)
  // - There is a valid resolver result.  The service config will be
  //   stored in the call context and an OK status will be returned.
  absl::optional<absl::Status> CheckResolution(bool was_queued);

 private:
  // Accessors for data stored in the subclass.
  virtual ClientChannelFilter* chand() const = 0;
  virtual Arena* arena() const = 0;
  virtual grpc_polling_entity* pollent() = 0;
  virtual grpc_metadata_batch* send_initial_metadata() = 0;
  virtual grpc_call_context_element* call_context() const = 0;

  // Helper function for CheckResolution().  Returns true if the call
  // can continue (i.e., there is a valid resolution result, or there is
  // an invalid resolution result but the call is not wait_for_ready).
  bool CheckResolutionLocked(
      absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  // Adds the call to the channel's list of calls queued for name resolution.
  void AddCallToResolverQueuedCallsLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  // Hook invoked when adding the call to the resolver queue; subclasses
  // use it to arrange to be re-entered later (default is a no-op).
  virtual void OnAddToQueueLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_) {}

  // Applies service config to the call.  Must be invoked once we know
  // that the resolver has returned results to the channel.
  // If an error is returned, the error indicates the status with which
  // the call should be failed.
  grpc_error_handle ApplyServiceConfigToCallLocked(
      const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector);

  // Called to reset the deadline based on the service config obtained
  // from the resolver.
  virtual void ResetDeadline(Duration timeout) = 0;

  RefCountedPtr<DynamicFilters> dynamic_filters_;
};
193
// CallData implementation used by the legacy filter-stack (batch-based)
// call path.  Provides the filter vtable entry points and the
// pending-batch bookkeeping needed while the call waits for name
// resolution before the dynamic call can be created.
class ClientChannelFilter::FilterBasedCallData final
    : public ClientChannelFilter::CallData {
 public:
  static grpc_error_handle Init(grpc_call_element* elem,
                                const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* final_info,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

 private:
  class ResolverQueuedCallCanceller;

  FilterBasedCallData(grpc_call_element* elem,
                      const grpc_call_element_args& args);
  ~FilterBasedCallData() override;

  // Accessors that read shared call state out of deadline_state_.
  grpc_call_element* elem() const { return deadline_state_.elem; }
  grpc_call_stack* owning_call() const { return deadline_state_.call_stack; }
  CallCombiner* call_combiner() const { return deadline_state_.call_combiner; }

  ClientChannelFilter* chand() const override {
    return static_cast<ClientChannelFilter*>(elem()->channel_data);
  }
  Arena* arena() const override { return deadline_state_.arena; }
  grpc_polling_entity* pollent() override { return pollent_; }
  // The initial metadata always arrives in the batch stored at index 0
  // of pending_batches_.
  grpc_metadata_batch* send_initial_metadata() override {
    return pending_batches_[0]
        ->payload->send_initial_metadata.send_initial_metadata;
  }
  grpc_call_context_element* call_context() const override {
    return call_context_;
  }

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
  static void FailPendingBatchInCallCombiner(void* arg,
                                             grpc_error_handle error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility for
  // yielding the call combiner.
  void PendingBatchesFail(
      grpc_error_handle error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg,
                                               grpc_error_handle ignored);
  // Resumes all pending batches on dynamic_call_.
  void PendingBatchesResume();

  // Called to check for a resolution result, both when the call is
  // initially started and when it is queued and the channel gets a new
  // resolution result.
  void TryCheckResolution(bool was_queued);

  void OnAddToQueueLocked() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  void RetryCheckResolutionLocked() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  // Shrinks the call's deadline if the per-method timeout from the
  // service config expires before the currently set deadline; the
  // deadline is never extended.
  void ResetDeadline(Duration timeout) override {
    const Timestamp per_method_deadline =
        Timestamp::FromCycleCounterRoundUp(call_start_time_) + timeout;
    if (per_method_deadline < deadline_) {
      deadline_ = per_method_deadline;
      grpc_deadline_state_reset(&deadline_state_, deadline_);
    }
  }

  void CreateDynamicCall();

  static void RecvTrailingMetadataReadyForConfigSelectorCommitCallback(
      void* arg, grpc_error_handle error);

  grpc_slice path_;  // Request path.
  grpc_call_context_element* call_context_;
  gpr_cycle_counter call_start_time_;
  Timestamp deadline_;

  // State for handling deadlines.
  grpc_deadline_state deadline_state_;

  grpc_polling_entity* pollent_ = nullptr;

  // Accessed while holding ClientChannelFilter::resolution_mu_.
  ResolverQueuedCallCanceller* resolver_call_canceller_
      ABSL_GUARDED_BY(&ClientChannelFilter::resolution_mu_) = nullptr;

  grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
  grpc_closure recv_trailing_metadata_ready_;

  RefCountedPtr<DynamicFilters::Call> dynamic_call_;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the LB call and are not intercepting any of
  // its callbacks).
  grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};

  // Set when we get a cancel_stream op.
  grpc_error_handle cancel_error_;
};
313
// CallData implementation used by the promise-based call path.
class ClientChannelFilter::PromiseBasedCallData final
    : public ClientChannelFilter::CallData {
 public:
  explicit PromiseBasedCallData(ClientChannelFilter* chand) : chand_(chand) {}

  ~PromiseBasedCallData() override {
    // If the call was queued for resolution and never completed it,
    // unregister it from the channel before it goes away.
    if (was_queued_ && client_initial_metadata_ != nullptr) {
      MutexLock lock(&chand_->resolution_mu_);
      RemoveCallFromResolverQueuedCallsLocked();
      chand_->resolver_queued_calls_.erase(this);
    }
  }

  // Returns a promise that resolves to the (possibly updated) CallArgs
  // once a usable resolver result is available, or to an error status if
  // resolution fails and the call is not wait_for_ready.  Also kicks the
  // channel out of IDLE if necessary.
  ArenaPromise<absl::StatusOr<CallArgs>> MakeNameResolutionPromise(
      CallArgs call_args) {
    pollent_ = NowOrNever(call_args.polling_entity->WaitAndCopy()).value();
    client_initial_metadata_ = std::move(call_args.client_initial_metadata);
    // If we're still in IDLE, we need to start resolving.
    if (GPR_UNLIKELY(chand_->CheckConnectivityState(false) ==
                     GRPC_CHANNEL_IDLE)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: %striggering exit idle", chand_,
                this, GetContext<Activity>()->DebugTag().c_str());
      }
      // Bounce into the control plane work serializer to start resolving.
      GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExitIdle");
      chand_->work_serializer_->Run(
          [chand = chand_]()
              ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
                chand->CheckConnectivityState(/*try_to_connect=*/true);
                GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "ExitIdle");
              },
          DEBUG_LOCATION);
    }
    // The returned poll callback re-checks resolution on every wakeup;
    // it stays Pending while the call sits in the resolver queue.
    return [this, call_args = std::move(
                      call_args)]() mutable -> Poll<absl::StatusOr<CallArgs>> {
      auto result = CheckResolution(was_queued_);
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: %sCheckResolution returns %s",
                chand_, this, GetContext<Activity>()->DebugTag().c_str(),
                result.has_value() ? result->ToString().c_str() : "Pending");
      }
      if (!result.has_value()) return Pending{};
      if (!result->ok()) return *result;
      // Resolution succeeded: hand the initial metadata back to the caller.
      call_args.client_initial_metadata = std::move(client_initial_metadata_);
      return std::move(call_args);
    };
  }

 private:
  ClientChannelFilter* chand() const override { return chand_; }
  Arena* arena() const override { return GetContext<Arena>(); }
  grpc_polling_entity* pollent() override { return &pollent_; }
  grpc_metadata_batch* send_initial_metadata() override {
    return client_initial_metadata_.get();
  }
  grpc_call_context_element* call_context() const override {
    return GetContext<grpc_call_context_element>();
  }

  // Records the activity's waker so RetryCheckResolutionLocked() can
  // wake this call when a new resolution result arrives.
  void OnAddToQueueLocked() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_) {
    waker_ = GetContext<Activity>()->MakeNonOwningWaker();
    was_queued_ = true;
  }

  void RetryCheckResolutionLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
      &ClientChannelFilter::resolution_mu_) override {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: RetryCheckResolutionLocked(): %s",
              chand_, this, waker_.ActivityDebugTag().c_str());
    }
    waker_.WakeupAsync();
  }

  // Unconditionally applies the per-method timeout via the call context
  // (unlike the filter-based path, clamping is handled by UpdateDeadline).
  void ResetDeadline(Duration timeout) override {
    CallContext* call_context = GetContext<CallContext>();
    const Timestamp per_method_deadline =
        Timestamp::FromCycleCounterRoundUp(call_context->call_start_time()) +
        timeout;
    call_context->UpdateDeadline(per_method_deadline);
  }

  ClientChannelFilter* chand_;
  grpc_polling_entity pollent_;
  ClientMetadataHandle client_initial_metadata_;
  // True if this call has ever been queued waiting for resolution.
  bool was_queued_ = false;
  Waker waker_ ABSL_GUARDED_BY(&ClientChannelFilter::resolution_mu_);
};
403
404 //
405 // Filter vtable
406 //
407
// Filter vtable used when promise-based calls are enabled: supplies
// MakeCallPromise in addition to the batch-based entry points.
const grpc_channel_filter ClientChannelFilter::kFilterVtableWithPromises = {
    ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch,
    ClientChannelFilter::MakeCallPromise,
    /* init_call: */ nullptr,
    ClientChannelFilter::StartTransportOp,
    sizeof(ClientChannelFilter::FilterBasedCallData),
    ClientChannelFilter::FilterBasedCallData::Init,
    ClientChannelFilter::FilterBasedCallData::SetPollent,
    ClientChannelFilter::FilterBasedCallData::Destroy,
    sizeof(ClientChannelFilter),
    ClientChannelFilter::Init,
    grpc_channel_stack_no_post_init,
    ClientChannelFilter::Destroy,
    ClientChannelFilter::GetChannelInfo,
    "client-channel",
};
424
// Filter vtable used when promise-based calls are disabled: identical to
// the vtable above except that the call-promise entry is null, so only
// the batch-based path is used.
const grpc_channel_filter ClientChannelFilter::kFilterVtableWithoutPromises = {
    ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch,
    nullptr,
    /* init_call: */ nullptr,
    ClientChannelFilter::StartTransportOp,
    sizeof(ClientChannelFilter::FilterBasedCallData),
    ClientChannelFilter::FilterBasedCallData::Init,
    ClientChannelFilter::FilterBasedCallData::SetPollent,
    ClientChannelFilter::FilterBasedCallData::Destroy,
    sizeof(ClientChannelFilter),
    ClientChannelFilter::Init,
    grpc_channel_stack_no_post_init,
    ClientChannelFilter::Destroy,
    ClientChannelFilter::GetChannelInfo,
    "client-channel",
};
441
442 //
443 // dynamic termination filter
444 //
445
446 namespace {
447
GetServiceConfigCallData(grpc_call_context_element * context)448 ClientChannelServiceConfigCallData* GetServiceConfigCallData(
449 grpc_call_context_element* context) {
450 return static_cast<ClientChannelServiceConfigCallData*>(
451 context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
452 }
453
// Terminal filter installed at the end of the dynamic filter stack.
// Its job is to hand the call off to the client channel's
// load-balanced-call machinery.
class DynamicTerminationFilter final {
 public:
  class CallData;

  static const grpc_channel_filter kFilterVtable;

  static grpc_error_handle Init(grpc_channel_element* elem,
                                grpc_channel_element_args* args) {
    // This filter must always be the last one in the dynamic stack.
    GPR_ASSERT(args->is_last);
    GPR_ASSERT(elem->filter == &kFilterVtable);
    new (elem->channel_data) DynamicTerminationFilter(args->channel_args);
    return absl::OkStatus();
  }

  static void Destroy(grpc_channel_element* elem) {
    auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
    chand->~DynamicTerminationFilter();
  }

  // Will never be called.
  static void StartTransportOp(grpc_channel_element* /*elem*/,
                               grpc_transport_op* /*op*/) {}
  static void GetChannelInfo(grpc_channel_element* /*elem*/,
                             const grpc_channel_info* /*info*/) {}

  // Promise-based call path: delegates to the client channel's LB call
  // promise, committing the service config call data via the supplied
  // callback.
  static ArenaPromise<ServerMetadataHandle> MakeCallPromise(
      grpc_channel_element* elem, CallArgs call_args, NextPromiseFactory) {
    auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
    return chand->chand_->CreateLoadBalancedCallPromise(
        std::move(call_args),
        []() {
          auto* service_config_call_data =
              GetServiceConfigCallData(GetContext<grpc_call_context_element>());
          service_config_call_data->Commit();
        },
        /*is_transparent_retry=*/false);
  }

 private:
  explicit DynamicTerminationFilter(const ChannelArgs& args)
      : chand_(args.GetObject<ClientChannelFilter>()) {}

  // The client channel that owns the dynamic filter stack (not owned here).
  ClientChannelFilter* chand_;
};
498
499 class DynamicTerminationFilter::CallData final {
500 public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)501 static grpc_error_handle Init(grpc_call_element* elem,
502 const grpc_call_element_args* args) {
503 new (elem->call_data) CallData(*args);
504 return absl::OkStatus();
505 }
506
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure * then_schedule_closure)507 static void Destroy(grpc_call_element* elem,
508 const grpc_call_final_info* /*final_info*/,
509 grpc_closure* then_schedule_closure) {
510 auto* calld = static_cast<CallData*>(elem->call_data);
511 RefCountedPtr<SubchannelCall> subchannel_call;
512 if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
513 subchannel_call = calld->lb_call_->subchannel_call();
514 }
515 calld->~CallData();
516 if (GPR_LIKELY(subchannel_call != nullptr)) {
517 subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
518 } else {
519 // TODO(yashkt) : This can potentially be a Closure::Run
520 ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, absl::OkStatus());
521 }
522 }
523
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)524 static void StartTransportStreamOpBatch(
525 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
526 auto* calld = static_cast<CallData*>(elem->call_data);
527 calld->lb_call_->StartTransportStreamOpBatch(batch);
528 }
529
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)530 static void SetPollent(grpc_call_element* elem,
531 grpc_polling_entity* pollent) {
532 auto* calld = static_cast<CallData*>(elem->call_data);
533 auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
534 ClientChannelFilter* client_channel = chand->chand_;
535 grpc_call_element_args args = {calld->owning_call_, nullptr,
536 calld->call_context_, calld->path_,
537 /*start_time=*/0, calld->deadline_,
538 calld->arena_, calld->call_combiner_};
539 auto* service_config_call_data =
540 GetServiceConfigCallData(calld->call_context_);
541 calld->lb_call_ = client_channel->CreateLoadBalancedCall(
542 args, pollent, nullptr,
543 [service_config_call_data]() { service_config_call_data->Commit(); },
544 /*is_transparent_retry=*/false);
545 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
546 gpr_log(GPR_INFO,
547 "chand=%p dynamic_termination_calld=%p: create lb_call=%p", chand,
548 client_channel, calld->lb_call_.get());
549 }
550 }
551
552 private:
CallData(const grpc_call_element_args & args)553 explicit CallData(const grpc_call_element_args& args)
554 : path_(CSliceRef(args.path)),
555 deadline_(args.deadline),
556 arena_(args.arena),
557 owning_call_(args.call_stack),
558 call_combiner_(args.call_combiner),
559 call_context_(args.context) {}
560
~CallData()561 ~CallData() { CSliceUnref(path_); }
562
563 grpc_slice path_; // Request path.
564 Timestamp deadline_;
565 Arena* arena_;
566 grpc_call_stack* owning_call_;
567 CallCombiner* call_combiner_;
568 grpc_call_context_element* call_context_;
569
570 OrphanablePtr<ClientChannelFilter::FilterBasedLoadBalancedCall> lb_call_;
571 };
572
// Vtable for the dynamic termination filter.  StartTransportOp and
// GetChannelInfo are no-ops for this filter.
const grpc_channel_filter DynamicTerminationFilter::kFilterVtable = {
    DynamicTerminationFilter::CallData::StartTransportStreamOpBatch,
    DynamicTerminationFilter::MakeCallPromise,
    /* init_call: */ nullptr,
    DynamicTerminationFilter::StartTransportOp,
    sizeof(DynamicTerminationFilter::CallData),
    DynamicTerminationFilter::CallData::Init,
    DynamicTerminationFilter::CallData::SetPollent,
    DynamicTerminationFilter::CallData::Destroy,
    sizeof(DynamicTerminationFilter),
    DynamicTerminationFilter::Init,
    grpc_channel_stack_no_post_init,
    DynamicTerminationFilter::Destroy,
    DynamicTerminationFilter::GetChannelInfo,
    "dynamic_filter_termination",
};
589
590 } // namespace
591
592 //
593 // ClientChannelFilter::ResolverResultHandler
594 //
595
// Receives results from the resolver and forwards them to the channel.
// Holds a ref to the channel stack for as long as the resolver holds
// this handler.
class ClientChannelFilter::ResolverResultHandler final
    : public Resolver::ResultHandler {
 public:
  explicit ResolverResultHandler(ClientChannelFilter* chand) : chand_(chand) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ResolverResultHandler");
  }

  // Destruction of this handler signals that resolver shutdown is
  // complete, at which point the channel stack ref is released.
  ~ResolverResultHandler() override {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO, "chand=%p: resolver shutdown complete", chand_);
    }
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
  }

  void ReportResult(Resolver::Result result) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->OnResolverResultChangedLocked(std::move(result));
  }

 private:
  ClientChannelFilter* chand_;
};
618
//
// ClientChannelFilter::SubchannelWrapper
//

// This class is a wrapper for Subchannel that hides details of the
// channel's implementation (such as the connected subchannel) from the
// LB policy API.
//
// Note that no synchronization is needed here, because even if the
// underlying subchannel is shared between channels, this wrapper will only
// be used within one channel, so it will always be synchronized by the
// control plane work_serializer.
631 class ClientChannelFilter::SubchannelWrapper final
632 : public SubchannelInterface {
633 public:
  // Must be invoked from within the channel's work_serializer (asserted
  // below).  Registers the wrapper with the channel and, when channelz
  // is enabled, bumps a per-subchannel refcount so that each underlying
  // subchannel is added to the channelz node exactly once.
  SubchannelWrapper(ClientChannelFilter* chand,
                    RefCountedPtr<Subchannel> subchannel)
      : SubchannelInterface(GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)
                                ? "SubchannelWrapper"
                                : nullptr),
        chand_(chand),
        subchannel_(std::move(subchannel)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: creating subchannel wrapper %p for subchannel %p",
              chand, this, subchannel_.get());
    }
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
    GPR_DEBUG_ASSERT(chand_->work_serializer_->RunningInWorkSerializer());
    if (chand_->channelz_node_ != nullptr) {
      auto* subchannel_node = subchannel_->channelz_node();
      if (subchannel_node != nullptr) {
        auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
        if (it == chand_->subchannel_refcount_map_.end()) {
          // First wrapper for this subchannel: register it with channelz.
          chand_->channelz_node_->AddChildSubchannel(subchannel_node->uuid());
          it = chand_->subchannel_refcount_map_.emplace(subchannel_.get(), 0)
                   .first;
        }
        ++it->second;
      }
    }
    chand_->subchannel_wrappers_.insert(this);
  }
662
  ~SubchannelWrapper() override {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: destroying subchannel wrapper %p for subchannel %p",
              chand_, this, subchannel_.get());
    }
    // When work_serializer dispatch is enabled, this cleanup is instead
    // performed in Orphaned() inside the work_serializer.
    if (!IsWorkSerializerDispatchEnabled()) {
      chand_->subchannel_wrappers_.erase(this);
      if (chand_->channelz_node_ != nullptr) {
        auto* subchannel_node = subchannel_->channelz_node();
        if (subchannel_node != nullptr) {
          auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
          GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
          --it->second;
          // Last wrapper for this subchannel: remove it from channelz.
          if (it->second == 0) {
            chand_->channelz_node_->RemoveChildSubchannel(
                subchannel_node->uuid());
            chand_->subchannel_refcount_map_.erase(it);
          }
        }
      }
    }
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
  }
687
  // Called when the last strong ref is dropped.  When work_serializer
  // dispatch is enabled, cleanup of the channel's subchannel maps must
  // be rescheduled into the WorkSerializer (the destructor cannot do it
  // inline); a weak ref keeps this object alive until the callback runs.
  void Orphaned() override {
    if (!IsWorkSerializerDispatchEnabled()) return;
    // Make sure we clean up the channel's subchannel maps inside the
    // WorkSerializer.
    // Ref held by callback.
    WeakRef(DEBUG_LOCATION, "subchannel map cleanup").release();
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          chand_->subchannel_wrappers_.erase(this);
          if (chand_->channelz_node_ != nullptr) {
            auto* subchannel_node = subchannel_->channelz_node();
            if (subchannel_node != nullptr) {
              auto it =
                  chand_->subchannel_refcount_map_.find(subchannel_.get());
              GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
              --it->second;
              // Last wrapper for this subchannel: remove it from channelz.
              if (it->second == 0) {
                chand_->channelz_node_->RemoveChildSubchannel(
                    subchannel_node->uuid());
                chand_->subchannel_refcount_map_.erase(it);
              }
            }
          }
          WeakUnref(DEBUG_LOCATION, "subchannel map cleanup");
        },
        DEBUG_LOCATION);
  }
715
  // Registers an LB-policy connectivity watcher.  The watcher is wrapped
  // in a WatcherWrapper (which takes a ref to this SubchannelWrapper)
  // before being handed to the underlying subchannel; the raw pointer is
  // remembered in watcher_map_ so the watch can be cancelled later.
  void WatchConnectivityState(
      std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    auto& watcher_wrapper = watcher_map_[watcher.get()];
    GPR_ASSERT(watcher_wrapper == nullptr);
    watcher_wrapper = new WatcherWrapper(
        std::move(watcher),
        RefAsSubclass<SubchannelWrapper>(DEBUG_LOCATION, "WatcherWrapper"));
    subchannel_->WatchConnectivityState(
        RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
            watcher_wrapper));
  }
728
  // Cancels a watch previously registered via WatchConnectivityState().
  // The watcher must be present in watcher_map_ (asserted).
  void CancelConnectivityStateWatch(ConnectivityStateWatcherInterface* watcher)
      override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    auto it = watcher_map_.find(watcher);
    GPR_ASSERT(it != watcher_map_.end());
    subchannel_->CancelConnectivityStateWatch(it->second);
    watcher_map_.erase(it);
  }
736
  // Returns the underlying subchannel's connected subchannel, if any.
  RefCountedPtr<ConnectedSubchannel> connected_subchannel() const {
    return subchannel_->connected_subchannel();
  }

  void RequestConnection() override { subchannel_->RequestConnection(); }

  void ResetBackoff() override { subchannel_->ResetBackoff(); }

  // Attaches a data watcher to the underlying subchannel.  Insertion
  // into data_watchers_ must succeed (asserted).
  void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    static_cast<InternalSubchannelDataWatcherInterface*>(watcher.get())
        ->SetSubchannel(subchannel_.get());
    GPR_ASSERT(data_watchers_.insert(std::move(watcher)).second);
  }

  // Removes a previously added data watcher; a no-op if not found.
  void CancelDataWatcher(DataWatcherInterface* watcher) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    auto it = data_watchers_.find(watcher);
    if (it != data_watchers_.end()) data_watchers_.erase(it);
  }

  // Forwards a keepalive throttling request to the underlying subchannel.
  void ThrottleKeepaliveTime(int new_keepalive_time) {
    subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
  }
761
762 private:
763 // This wrapper provides a bridge between the internal Subchannel API
764 // and the SubchannelInterface API that we expose to LB policies.
765 // It implements Subchannel::ConnectivityStateWatcherInterface and wraps
766 // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
767 // that was passed in by the LB policy. We pass an instance of this
768 // class to the underlying Subchannel, and when we get updates from
769 // the subchannel, we pass those on to the wrapped watcher to return
770 // the update to the LB policy.
771 //
772 // This class handles things like hopping into the WorkSerializer
773 // before passing notifications to the LB policy and propagating
  // keepalive information between subchannels.
775 class WatcherWrapper final
776 : public Subchannel::ConnectivityStateWatcherInterface {
777 public:
WatcherWrapper(std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher,RefCountedPtr<SubchannelWrapper> parent)778 WatcherWrapper(
779 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
780 watcher,
781 RefCountedPtr<SubchannelWrapper> parent)
782 : watcher_(std::move(watcher)), parent_(std::move(parent)) {}
783
~WatcherWrapper()784 ~WatcherWrapper() override {
785 if (!IsWorkSerializerDispatchEnabled()) {
786 auto* parent = parent_.release(); // ref owned by lambda
787 parent->chand_->work_serializer_->Run(
788 [parent]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
789 *parent_->chand_->work_serializer_) {
790 parent->Unref(DEBUG_LOCATION, "WatcherWrapper");
791 },
792 DEBUG_LOCATION);
793 return;
794 }
795 parent_.reset(DEBUG_LOCATION, "WatcherWrapper");
796 }
797
OnConnectivityStateChange(RefCountedPtr<ConnectivityStateWatcherInterface> self,grpc_connectivity_state state,const absl::Status & status)798 void OnConnectivityStateChange(
799 RefCountedPtr<ConnectivityStateWatcherInterface> self,
800 grpc_connectivity_state state, const absl::Status& status) override {
801 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
802 gpr_log(GPR_INFO,
803 "chand=%p: connectivity change for subchannel wrapper %p "
804 "subchannel %p; hopping into work_serializer",
805 parent_->chand_, parent_.get(), parent_->subchannel_.get());
806 }
807 self.release(); // Held by callback.
808 parent_->chand_->work_serializer_->Run(
809 [this, state, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
810 *parent_->chand_->work_serializer_) {
811 ApplyUpdateInControlPlaneWorkSerializer(state, status);
812 Unref();
813 },
814 DEBUG_LOCATION);
815 }
816
interested_parties()817 grpc_pollset_set* interested_parties() override {
818 return watcher_->interested_parties();
819 }
820
821 private:
ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state,const absl::Status & status)822 void ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state,
823 const absl::Status& status)
824 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*parent_->chand_->work_serializer_) {
825 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
826 gpr_log(GPR_INFO,
827 "chand=%p: processing connectivity change in work serializer "
828 "for subchannel wrapper %p subchannel %p watcher=%p "
829 "state=%s status=%s",
830 parent_->chand_, parent_.get(), parent_->subchannel_.get(),
831 watcher_.get(), ConnectivityStateName(state),
832 status.ToString().c_str());
833 }
834 absl::optional<absl::Cord> keepalive_throttling =
835 status.GetPayload(kKeepaliveThrottlingKey);
836 if (keepalive_throttling.has_value()) {
837 int new_keepalive_time = -1;
838 if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
839 &new_keepalive_time)) {
840 if (new_keepalive_time > parent_->chand_->keepalive_time_) {
841 parent_->chand_->keepalive_time_ = new_keepalive_time;
842 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
843 gpr_log(GPR_INFO, "chand=%p: throttling keepalive time to %d",
844 parent_->chand_, parent_->chand_->keepalive_time_);
845 }
846 // Propagate the new keepalive time to all subchannels. This is so
847 // that new transports created by any subchannel (and not just the
848 // subchannel that received the GOAWAY), use the new keepalive time.
849 for (auto* subchannel_wrapper :
850 parent_->chand_->subchannel_wrappers_) {
851 subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
852 }
853 }
854 } else {
855 gpr_log(GPR_ERROR, "chand=%p: Illegal keepalive throttling value %s",
856 parent_->chand_,
857 std::string(keepalive_throttling.value()).c_str());
858 }
859 }
860 // Propagate status only in state TF.
861 // We specifically want to avoid propagating the status for
862 // state IDLE that the real subchannel gave us only for the
863 // purpose of keepalive propagation.
864 watcher_->OnConnectivityStateChange(
865 state,
866 state == GRPC_CHANNEL_TRANSIENT_FAILURE ? status : absl::OkStatus());
867 }
868
869 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
870 watcher_;
871 RefCountedPtr<SubchannelWrapper> parent_;
872 };
873
874 // A heterogenous lookup comparator for data watchers that allows
875 // unique_ptr keys to be looked up as raw pointers.
876 struct DataWatcherLessThan {
877 using is_transparent = void;
operator ()grpc_core::ClientChannelFilter::SubchannelWrapper::DataWatcherLessThan878 bool operator()(const std::unique_ptr<DataWatcherInterface>& p1,
879 const std::unique_ptr<DataWatcherInterface>& p2) const {
880 return p1 < p2;
881 }
operator ()grpc_core::ClientChannelFilter::SubchannelWrapper::DataWatcherLessThan882 bool operator()(const std::unique_ptr<DataWatcherInterface>& p1,
883 const DataWatcherInterface* p2) const {
884 return p1.get() < p2;
885 }
operator ()grpc_core::ClientChannelFilter::SubchannelWrapper::DataWatcherLessThan886 bool operator()(const DataWatcherInterface* p1,
887 const std::unique_ptr<DataWatcherInterface>& p2) const {
888 return p1 < p2.get();
889 }
890 };
891
892 ClientChannelFilter* chand_;
893 RefCountedPtr<Subchannel> subchannel_;
  // Maps from the address of the watcher passed to us by the LB policy
  // to the address of the WatcherWrapper that we passed to the underlying
  // subchannel. This is needed so that when the LB policy calls
  // CancelConnectivityStateWatch() with its watcher, we know the
  // corresponding WatcherWrapper to cancel on the underlying subchannel.
899 std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_
900 ABSL_GUARDED_BY(*chand_->work_serializer_);
901 std::set<std::unique_ptr<DataWatcherInterface>, DataWatcherLessThan>
902 data_watchers_ ABSL_GUARDED_BY(*chand_->work_serializer_);
903 };
904
905 //
906 // ClientChannelFilter::ExternalConnectivityWatcher
907 //
908
// Constructs an external connectivity watcher for the C-core
// grpc_channel_watch_connectivity_state() API.  `*state` holds the
// last-seen state on input and receives the new state when the watch
// fires; `on_complete` is invoked exactly once (Notify or Cancel).
ClientChannelFilter::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
    ClientChannelFilter* chand, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* on_complete,
    grpc_closure* watcher_timer_init)
    : chand_(chand),
      pollent_(pollent),
      initial_state_(*state),
      state_(state),
      on_complete_(on_complete),
      watcher_timer_init_(watcher_timer_init) {
  grpc_polling_entity_add_to_pollset_set(&pollent_,
                                         chand_->interested_parties_);
  // Keep the channel stack alive for the lifetime of this watcher;
  // released in the destructor.
  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
  {
    MutexLock lock(&chand_->external_watchers_mu_);
    // Will be deleted when the watch is complete.
    GPR_ASSERT(chand->external_watchers_[on_complete] == nullptr);
    // Store a ref to the watcher in the external_watchers_ map.
    chand->external_watchers_[on_complete] =
        RefAsSubclass<ExternalConnectivityWatcher>(
            DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked");
  }
  // Pass the ref from creating the object to Start().
  chand_->work_serializer_->Run(
      [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
        // The ref is passed to AddWatcherLocked().
        AddWatcherLocked();
      },
      DEBUG_LOCATION);
}
939
// Removes this watcher's pollent from the channel's pollset_set and
// releases the channel stack ref taken in the constructor.
ClientChannelFilter::ExternalConnectivityWatcher::
    ~ExternalConnectivityWatcher() {
  grpc_polling_entity_del_from_pollset_set(&pollent_,
                                           chand_->interested_parties_);
  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                           "ExternalConnectivityWatcher");
}
947
948 void ClientChannelFilter::ExternalConnectivityWatcher::
RemoveWatcherFromExternalWatchersMap(ClientChannelFilter * chand,grpc_closure * on_complete,bool cancel)949 RemoveWatcherFromExternalWatchersMap(ClientChannelFilter* chand,
950 grpc_closure* on_complete,
951 bool cancel) {
952 RefCountedPtr<ExternalConnectivityWatcher> watcher;
953 {
954 MutexLock lock(&chand->external_watchers_mu_);
955 auto it = chand->external_watchers_.find(on_complete);
956 if (it != chand->external_watchers_.end()) {
957 watcher = std::move(it->second);
958 chand->external_watchers_.erase(it);
959 }
960 }
961 // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock
962 // the mutex before calling it.
963 if (watcher != nullptr && cancel) watcher->Cancel();
964 }
965
// Called by the connectivity state tracker when the channel's state
// changes.  The done_ CAS ensures that only the first of
// Notify()/Cancel() takes effect, so on_complete_ fires exactly once.
void ClientChannelFilter::ExternalConnectivityWatcher::Notify(
    grpc_connectivity_state state, const absl::Status& /* status */) {
  bool done = false;
  if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed,
                                     std::memory_order_relaxed)) {
    return;  // Already done.
  }
  // Remove external watcher.
  ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
      chand_, on_complete_, /*cancel=*/false);
  // Report new state to the user.
  *state_ = state;
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, absl::OkStatus());
  // Hop back into the work_serializer to clean up.
  // Not needed in state SHUTDOWN, because the tracker will
  // automatically remove all watchers in that case.
  // Note: The callback takes a ref in case the ref inside the state tracker
  // gets removed before the callback runs via a SHUTDOWN notification.
  if (state != GRPC_CHANNEL_SHUTDOWN) {
    Ref(DEBUG_LOCATION, "RemoveWatcherLocked()").release();
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          RemoveWatcherLocked();
          Unref(DEBUG_LOCATION, "RemoveWatcherLocked()");
        },
        DEBUG_LOCATION);
  }
}
994
// Cancels the watch before it has fired, signaling the caller's
// closure with CANCELLED.  The done_ CAS ensures this is a no-op if
// Notify() already ran (the closure fires exactly once either way).
void ClientChannelFilter::ExternalConnectivityWatcher::Cancel() {
  bool done = false;
  if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed,
                                     std::memory_order_relaxed)) {
    return;  // Already done.
  }
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, absl::CancelledError());
  // Hop back into the work_serializer to clean up.
  // Note: The callback takes a ref in case the ref inside the state tracker
  // gets removed before the callback runs via a SHUTDOWN notification.
  Ref(DEBUG_LOCATION, "RemoveWatcherLocked()").release();
  chand_->work_serializer_->Run(
      [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
        RemoveWatcherLocked();
        Unref(DEBUG_LOCATION, "RemoveWatcherLocked()");
      },
      DEBUG_LOCATION);
}
1013
// Runs in the WorkSerializer.  Fires watcher_timer_init_ and then
// registers this watcher with the channel's state tracker, handing the
// construction-time ref to the tracker via OrphanablePtr.
void ClientChannelFilter::ExternalConnectivityWatcher::AddWatcherLocked() {
  Closure::Run(DEBUG_LOCATION, watcher_timer_init_, absl::OkStatus());
  // Add new watcher. Pass the ref of the object from creation to OrphanablePtr.
  chand_->state_tracker_.AddWatcher(
      initial_state_, OrphanablePtr<ConnectivityStateWatcherInterface>(this));
}
1020
// Runs in the WorkSerializer.  Unregisters this watcher from the
// channel's state tracker.
void ClientChannelFilter::ExternalConnectivityWatcher::RemoveWatcherLocked() {
  chand_->state_tracker_.RemoveWatcher(this);
}
1024
1025 //
1026 // ClientChannelFilter::ConnectivityWatcherAdder
1027 //
1028
// Fire-and-forget helper: hops into the WorkSerializer to add a
// connectivity watcher to the channel's state tracker.  The object
// owns itself and deletes itself after the handoff completes.
class ClientChannelFilter::ConnectivityWatcherAdder final {
 public:
  ConnectivityWatcherAdder(
      ClientChannelFilter* chand, grpc_connectivity_state initial_state,
      OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)
      : chand_(chand),
        initial_state_(initial_state),
        watcher_(std::move(watcher)) {
    // Keep the channel stack alive until AddWatcherLocked() runs.
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          AddWatcherLocked();
        },
        DEBUG_LOCATION);
  }

 private:
  // Runs in the WorkSerializer.  Hands the watcher to the tracker,
  // releases the stack ref, and self-destructs.
  void AddWatcherLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->state_tracker_.AddWatcher(initial_state_, std::move(watcher_));
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    delete this;
  }

  ClientChannelFilter* chand_;
  grpc_connectivity_state initial_state_;
  OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher_;
};
1057
1058 //
1059 // ClientChannelFilter::ConnectivityWatcherRemover
1060 //
1061
// Fire-and-forget helper: hops into the WorkSerializer to remove a
// connectivity watcher from the channel's state tracker.  The object
// owns itself and deletes itself after the removal completes.
class ClientChannelFilter::ConnectivityWatcherRemover final {
 public:
  ConnectivityWatcherRemover(ClientChannelFilter* chand,
                             AsyncConnectivityStateWatcherInterface* watcher)
      : chand_(chand), watcher_(watcher) {
    // Keep the channel stack alive until RemoveWatcherLocked() runs.
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          RemoveWatcherLocked();
        },
        DEBUG_LOCATION);
  }

 private:
  // Runs in the WorkSerializer.  Removes the watcher, releases the
  // stack ref, and self-destructs.
  void RemoveWatcherLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->state_tracker_.RemoveWatcher(watcher_);
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ConnectivityWatcherRemover");
    delete this;
  }

  ClientChannelFilter* chand_;
  AsyncConnectivityStateWatcherInterface* watcher_;
};
1087
1088 //
1089 // ClientChannelFilter::ClientChannelControlHelper
1090 //
1091
1092 class ClientChannelFilter::ClientChannelControlHelper final
1093 : public LoadBalancingPolicy::ChannelControlHelper {
1094 public:
ClientChannelControlHelper(ClientChannelFilter * chand)1095 explicit ClientChannelControlHelper(ClientChannelFilter* chand)
1096 : chand_(chand) {
1097 GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
1098 }
1099
~ClientChannelControlHelper()1100 ~ClientChannelControlHelper() override {
1101 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
1102 "ClientChannelControlHelper");
1103 }
1104
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)1105 RefCountedPtr<SubchannelInterface> CreateSubchannel(
1106 const grpc_resolved_address& address, const ChannelArgs& per_address_args,
1107 const ChannelArgs& args) override
1108 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
1109 if (chand_->resolver_ == nullptr) return nullptr; // Shutting down.
1110 ChannelArgs subchannel_args = ClientChannelFilter::MakeSubchannelArgs(
1111 args, per_address_args, chand_->subchannel_pool_,
1112 chand_->default_authority_);
1113 // Create subchannel.
1114 RefCountedPtr<Subchannel> subchannel =
1115 chand_->client_channel_factory_->CreateSubchannel(address,
1116 subchannel_args);
1117 if (subchannel == nullptr) return nullptr;
1118 // Make sure the subchannel has updated keepalive time.
1119 subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_);
1120 // Create and return wrapper for the subchannel.
1121 return MakeRefCounted<SubchannelWrapper>(chand_, std::move(subchannel));
1122 }
1123
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)1124 void UpdateState(grpc_connectivity_state state, const absl::Status& status,
1125 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)
1126 override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
1127 if (chand_->resolver_ == nullptr) return; // Shutting down.
1128 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1129 const char* extra = chand_->disconnect_error_.ok()
1130 ? ""
1131 : " (ignoring -- channel shutting down)";
1132 gpr_log(GPR_INFO, "chand=%p: update: state=%s status=(%s) picker=%p%s",
1133 chand_, ConnectivityStateName(state), status.ToString().c_str(),
1134 picker.get(), extra);
1135 }
1136 // Do update only if not shutting down.
1137 if (chand_->disconnect_error_.ok()) {
1138 chand_->UpdateStateAndPickerLocked(state, status, "helper",
1139 std::move(picker));
1140 }
1141 }
1142
RequestReresolution()1143 void RequestReresolution() override
1144 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
1145 if (chand_->resolver_ == nullptr) return; // Shutting down.
1146 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1147 gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand_);
1148 }
1149 chand_->resolver_->RequestReresolutionLocked();
1150 }
1151
GetTarget()1152 absl::string_view GetTarget() override { return chand_->target_uri_; }
1153
GetAuthority()1154 absl::string_view GetAuthority() override {
1155 return chand_->default_authority_;
1156 }
1157
GetChannelCredentials()1158 RefCountedPtr<grpc_channel_credentials> GetChannelCredentials() override {
1159 return chand_->channel_args_.GetObject<grpc_channel_credentials>()
1160 ->duplicate_without_call_credentials();
1161 }
1162
GetUnsafeChannelCredentials()1163 RefCountedPtr<grpc_channel_credentials> GetUnsafeChannelCredentials()
1164 override {
1165 return chand_->channel_args_.GetObject<grpc_channel_credentials>()->Ref();
1166 }
1167
GetEventEngine()1168 grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
1169 return chand_->owning_stack_->EventEngine();
1170 }
1171
GetStatsPluginGroup()1172 GlobalStatsPluginRegistry::StatsPluginGroup& GetStatsPluginGroup() override {
1173 return *chand_->owning_stack_->stats_plugin_group;
1174 }
1175
AddTraceEvent(TraceSeverity severity,absl::string_view message)1176 void AddTraceEvent(TraceSeverity severity, absl::string_view message) override
1177 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
1178 if (chand_->resolver_ == nullptr) return; // Shutting down.
1179 if (chand_->channelz_node_ != nullptr) {
1180 chand_->channelz_node_->AddTraceEvent(
1181 ConvertSeverityEnum(severity),
1182 grpc_slice_from_copied_buffer(message.data(), message.size()));
1183 }
1184 }
1185
1186 private:
ConvertSeverityEnum(TraceSeverity severity)1187 static channelz::ChannelTrace::Severity ConvertSeverityEnum(
1188 TraceSeverity severity) {
1189 if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
1190 if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
1191 return channelz::ChannelTrace::Error;
1192 }
1193
1194 ClientChannelFilter* chand_;
1195 };
1196
1197 //
1198 // ClientChannelFilter implementation
1199 //
1200
// Channel filter vtable hook: constructs a ClientChannelFilter in-place
// in the element's channel_data.  This filter must be the last one in
// the channel stack.
grpc_error_handle ClientChannelFilter::Init(grpc_channel_element* elem,
                                            grpc_channel_element_args* args) {
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &kFilterVtableWithPromises ||
             elem->filter == &kFilterVtableWithoutPromises);
  grpc_error_handle error;
  // Placement-new; construction failures are reported via `error`.
  new (elem->channel_data) ClientChannelFilter(args, &error);
  return error;
}
1210
// Channel filter vtable hook: destroys the ClientChannelFilter that
// Init() placement-constructed in the element's channel_data.
void ClientChannelFilter::Destroy(grpc_channel_element* elem) {
  auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
  chand->~ClientChannelFilter();
}
1215
1216 namespace {
1217
GetSubchannelPool(const ChannelArgs & args)1218 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
1219 const ChannelArgs& args) {
1220 if (args.GetBool(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL).value_or(false)) {
1221 return MakeRefCounted<LocalSubchannelPool>();
1222 }
1223 return GlobalSubchannelPool::instance();
1224 }
1225
1226 } // namespace
1227
// Constructs the client channel filter.  On any failure, sets *error
// and returns early; the caller (Init) checks *error afterwards.
ClientChannelFilter::ClientChannelFilter(grpc_channel_element_args* args,
                                         grpc_error_handle* error)
    : channel_args_(args->channel_args),
      deadline_checking_enabled_(
          channel_args_.GetBool(GRPC_ARG_ENABLE_DEADLINE_CHECKS)
              .value_or(!channel_args_.WantMinimalStack())),
      owning_stack_(args->channel_stack),
      client_channel_factory_(channel_args_.GetObject<ClientChannelFactory>()),
      channelz_node_(channel_args_.GetObject<channelz::ChannelNode>()),
      interested_parties_(grpc_pollset_set_create()),
      service_config_parser_index_(
          internal::ClientChannelServiceConfigParser::ParserIndex()),
      work_serializer_(
          std::make_shared<WorkSerializer>(*args->channel_stack->event_engine)),
      state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
      subchannel_pool_(GetSubchannelPool(channel_args_)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
            this, owning_stack_);
  }
  // Start backup polling.
  grpc_client_channel_start_backup_polling(interested_parties_);
  // Check client channel factory.
  if (client_channel_factory_ == nullptr) {
    *error = GRPC_ERROR_CREATE(
        "Missing client channel factory in args for client channel filter");
    return;
  }
  // Get default service config. If none is specified via the client API,
  // we use an empty config.
  absl::optional<absl::string_view> service_config_json =
      channel_args_.GetString(GRPC_ARG_SERVICE_CONFIG);
  if (!service_config_json.has_value()) service_config_json = "{}";
  // NOTE(review): *error is assigned again on every path below (including
  // the final "Success" assignment), so this early OkStatus() looks
  // redundant -- confirm before removing.
  *error = absl::OkStatus();
  auto service_config =
      ServiceConfigImpl::Create(channel_args_, *service_config_json);
  if (!service_config.ok()) {
    *error = absl_status_to_grpc_error(service_config.status());
    return;
  }
  default_service_config_ = std::move(*service_config);
  // Get URI to resolve, using proxy mapper if needed.
  absl::optional<std::string> target_uri =
      channel_args_.GetOwnedString(GRPC_ARG_SERVER_URI);
  if (!target_uri.has_value()) {
    *error = GRPC_ERROR_CREATE(
        "target URI channel arg missing or wrong type in client channel "
        "filter");
    return;
  }
  target_uri_ = std::move(*target_uri);
  uri_to_resolve_ = CoreConfiguration::Get()
                        .proxy_mapper_registry()
                        .MapName(target_uri_, &channel_args_)
                        .value_or(target_uri_);
  // Make sure the URI to resolve is valid, so that we know that
  // resolver creation will succeed later.
  if (!CoreConfiguration::Get().resolver_registry().IsValidTarget(
          uri_to_resolve_)) {
    *error = GRPC_ERROR_CREATE(
        absl::StrCat("the target uri is not valid: ", uri_to_resolve_));
    return;
  }
  // Strip out service config channel arg, so that it doesn't affect
  // subchannel uniqueness when the args flow down to that layer.
  channel_args_ = channel_args_.Remove(GRPC_ARG_SERVICE_CONFIG);
  // Set initial keepalive time.
  auto keepalive_arg = channel_args_.GetInt(GRPC_ARG_KEEPALIVE_TIME_MS);
  if (keepalive_arg.has_value()) {
    keepalive_time_ = Clamp(*keepalive_arg, 1, INT_MAX);
  } else {
    keepalive_time_ = -1;  // unset
  }
  // Set default authority.
  absl::optional<std::string> default_authority =
      channel_args_.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY);
  if (!default_authority.has_value()) {
    default_authority_ =
        CoreConfiguration::Get().resolver_registry().GetDefaultAuthority(
            target_uri_);
  } else {
    default_authority_ = std::move(*default_authority);
  }
  // Success.
  *error = absl::OkStatus();
}
1314
// Tears down the resolver/LB policy and releases polling resources
// acquired in the constructor.
ClientChannelFilter::~ClientChannelFilter() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
  }
  DestroyResolverAndLbPolicyLocked();
  // Stop backup polling.
  grpc_client_channel_stop_backup_polling(interested_parties_);
  grpc_pollset_set_destroy(interested_parties_);
}
1324
// Promise-based call path: first resolves the call's name/config, then
// delegates the call to the dynamic filter stack selected by that
// resolution.
ArenaPromise<ServerMetadataHandle> ClientChannelFilter::MakeCallPromise(
    grpc_channel_element* elem, CallArgs call_args, NextPromiseFactory) {
  auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
  // TODO(roth): Is this the right lifetime story for calld?
  auto* calld = GetContext<Arena>()->ManagedNew<PromiseBasedCallData>(chand);
  return TrySeq(
      // Name resolution.
      calld->MakeNameResolutionPromise(std::move(call_args)),
      // Dynamic filter stack.
      [calld](CallArgs call_args) mutable {
        return calld->dynamic_filters()->channel_stack()->MakeClientCallPromise(
            std::move(call_args));
      });
}
1339
// Creates a filter-based (non-promise) LB call on the call's arena.
// `on_commit` and `is_transparent_retry` are forwarded to the call
// object; `on_call_destruction_complete` is invoked when it is torn
// down.
OrphanablePtr<ClientChannelFilter::FilterBasedLoadBalancedCall>
ClientChannelFilter::CreateLoadBalancedCall(
    const grpc_call_element_args& args, grpc_polling_entity* pollent,
    grpc_closure* on_call_destruction_complete,
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry) {
  // Make the arena visible via promise context during construction.
  promise_detail::Context<Arena> arena_ctx(args.arena);
  return OrphanablePtr<FilterBasedLoadBalancedCall>(
      args.arena->New<FilterBasedLoadBalancedCall>(
          this, args, pollent, on_call_destruction_complete,
          std::move(on_commit), is_transparent_retry));
}
1351
// Promise-based counterpart of CreateLoadBalancedCall().  Ownership of
// the LB call object is transferred into the returned promise.
ArenaPromise<ServerMetadataHandle>
ClientChannelFilter::CreateLoadBalancedCallPromise(
    CallArgs call_args, absl::AnyInvocable<void()> on_commit,
    bool is_transparent_retry) {
  OrphanablePtr<PromiseBasedLoadBalancedCall> lb_call(
      GetContext<Arena>()->New<PromiseBasedLoadBalancedCall>(
          this, std::move(on_commit), is_transparent_retry));
  // Grab a raw pointer before moving ownership into the promise.
  auto* call_ptr = lb_call.get();
  return call_ptr->MakeCallPromise(std::move(call_args), std::move(lb_call));
}
1362
// Builds the channel args used to create (and deduplicate) a
// subchannel from the channel-level and per-address args.
ChannelArgs ClientChannelFilter::MakeSubchannelArgs(
    const ChannelArgs& channel_args, const ChannelArgs& address_args,
    const RefCountedPtr<SubchannelPoolInterface>& subchannel_pool,
    const std::string& channel_default_authority) {
  // Note that we start with the channel-level args and then apply the
  // per-address args, so that if a value is present in both, the one
  // in the channel-level args is used. This is particularly important
  // for the GRPC_ARG_DEFAULT_AUTHORITY arg, which we want to allow
  // resolvers to set on a per-address basis only if the application
  // did not explicitly set it at the channel level.
  ChannelArgs subchannel_args = channel_args.UnionWith(address_args);
  subchannel_args = subchannel_args.SetObject(subchannel_pool);
  // If we haven't already set the default authority arg (i.e., it
  // was not explicitly set by the application nor overridden by
  // the resolver), add it from the channel's default.
  subchannel_args = subchannel_args.SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY,
                                               channel_default_authority);
  // Remove channel args that should not affect subchannel uniqueness.
  subchannel_args = subchannel_args.Remove(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME)
                        .Remove(GRPC_ARG_INHIBIT_HEALTH_CHECKING)
                        .Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE);
  // Remove all keys with the no-subchannel prefix.
  return subchannel_args.RemoveAllKeysWithPrefix(GRPC_ARG_NO_SUBCHANNEL_PREFIX);
}
1387
ReprocessQueuedResolverCalls()1388 void ClientChannelFilter::ReprocessQueuedResolverCalls() {
1389 for (CallData* calld : resolver_queued_calls_) {
1390 calld->RemoveCallFromResolverQueuedCallsLocked();
1391 calld->RetryCheckResolutionLocked();
1392 }
1393 resolver_queued_calls_.clear();
1394 }
1395
1396 namespace {
1397
ChooseLbPolicy(const Resolver::Result & resolver_result,const internal::ClientChannelGlobalParsedConfig * parsed_service_config)1398 RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
1399 const Resolver::Result& resolver_result,
1400 const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {
1401 // Prefer the LB policy config found in the service config.
1402 if (parsed_service_config->parsed_lb_config() != nullptr) {
1403 return parsed_service_config->parsed_lb_config();
1404 }
1405 // Try the deprecated LB policy name from the service config.
1406 // If not, try the setting from channel args.
1407 absl::optional<absl::string_view> policy_name;
1408 if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
1409 policy_name = parsed_service_config->parsed_deprecated_lb_policy();
1410 } else {
1411 policy_name = resolver_result.args.GetString(GRPC_ARG_LB_POLICY_NAME);
1412 bool requires_config = false;
1413 if (policy_name.has_value() &&
1414 (!CoreConfiguration::Get()
1415 .lb_policy_registry()
1416 .LoadBalancingPolicyExists(*policy_name, &requires_config) ||
1417 requires_config)) {
1418 if (requires_config) {
1419 gpr_log(GPR_ERROR,
1420 "LB policy: %s passed through channel_args must not "
1421 "require a config. Using pick_first instead.",
1422 std::string(*policy_name).c_str());
1423 } else {
1424 gpr_log(GPR_ERROR,
1425 "LB policy: %s passed through channel_args does not exist. "
1426 "Using pick_first instead.",
1427 std::string(*policy_name).c_str());
1428 }
1429 policy_name = "pick_first";
1430 }
1431 }
1432 // Use pick_first if nothing was specified and we didn't select grpclb
1433 // above.
1434 if (!policy_name.has_value()) policy_name = "pick_first";
1435 // Now that we have the policy name, construct an empty config for it.
1436 Json config_json = Json::FromArray({Json::FromObject({
1437 {std::string(*policy_name), Json::FromObject({})},
1438 })});
1439 auto lb_policy_config =
1440 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
1441 config_json);
1442 // The policy name came from one of three places:
1443 // - The deprecated loadBalancingPolicy field in the service config,
1444 // in which case the code in ClientChannelServiceConfigParser
1445 // already verified that the policy does not require a config.
1446 // - One of the hard-coded values here, all of which are known to not
1447 // require a config.
1448 // - A channel arg, in which case we check that the specified policy exists
1449 // and accepts an empty config. If not, we revert to using pick_first
1450 // lb_policy
1451 GPR_ASSERT(lb_policy_config.ok());
1452 return std::move(*lb_policy_config);
1453 }
1454
1455 } // namespace
1456
OnResolverResultChangedLocked(Resolver::Result result)1457 void ClientChannelFilter::OnResolverResultChangedLocked(
1458 Resolver::Result result) {
1459 // Handle race conditions.
1460 if (resolver_ == nullptr) return;
1461 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1462 gpr_log(GPR_INFO, "chand=%p: got resolver result", this);
1463 }
1464 // Grab resolver result health callback.
1465 auto resolver_callback = std::move(result.result_health_callback);
1466 absl::Status resolver_result_status;
  // We only want to trace the address resolution in the following cases:
1468 // (a) Address resolution resulted in service config change.
1469 // (b) Address resolution that causes number of backends to go from
1470 // zero to non-zero.
1471 // (c) Address resolution that causes number of backends to go from
1472 // non-zero to zero.
1473 // (d) Address resolution that causes a new LB policy to be created.
1474 //
1475 // We track a list of strings to eventually be concatenated and traced.
1476 std::vector<const char*> trace_strings;
1477 const bool resolution_contains_addresses =
1478 result.addresses.ok() && !result.addresses->empty();
1479 if (!resolution_contains_addresses &&
1480 previous_resolution_contained_addresses_) {
1481 trace_strings.push_back("Address list became empty");
1482 } else if (resolution_contains_addresses &&
1483 !previous_resolution_contained_addresses_) {
1484 trace_strings.push_back("Address list became non-empty");
1485 }
1486 previous_resolution_contained_addresses_ = resolution_contains_addresses;
1487 std::string service_config_error_string_storage;
1488 if (!result.service_config.ok()) {
1489 service_config_error_string_storage =
1490 result.service_config.status().ToString();
1491 trace_strings.push_back(service_config_error_string_storage.c_str());
1492 }
1493 // Choose the service config.
1494 RefCountedPtr<ServiceConfig> service_config;
1495 RefCountedPtr<ConfigSelector> config_selector;
1496 if (!result.service_config.ok()) {
1497 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1498 gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
1499 this, result.service_config.status().ToString().c_str());
1500 }
1501 // If the service config was invalid, then fallback to the
1502 // previously returned service config.
1503 if (saved_service_config_ != nullptr) {
1504 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1505 gpr_log(GPR_INFO,
1506 "chand=%p: resolver returned invalid service config. "
1507 "Continuing to use previous service config.",
1508 this);
1509 }
1510 service_config = saved_service_config_;
1511 config_selector = saved_config_selector_;
1512 } else {
1513 // We received a service config error and we don't have a
1514 // previous service config to fall back to. Put the channel into
1515 // TRANSIENT_FAILURE.
1516 OnResolverErrorLocked(result.service_config.status());
1517 trace_strings.push_back("no valid service config");
1518 resolver_result_status =
1519 absl::UnavailableError("no valid service config");
1520 }
1521 } else if (*result.service_config == nullptr) {
1522 // Resolver did not return any service config.
1523 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1524 gpr_log(GPR_INFO,
1525 "chand=%p: resolver returned no service config. Using default "
1526 "service config for channel.",
1527 this);
1528 }
1529 service_config = default_service_config_;
1530 } else {
1531 // Use ServiceConfig and ConfigSelector returned by resolver.
1532 service_config = std::move(*result.service_config);
1533 config_selector = result.args.GetObjectRef<ConfigSelector>();
1534 }
1535 // Note: The only case in which service_config is null here is if the resolver
1536 // returned a service config error and we don't have a previous service
1537 // config to fall back to.
1538 if (service_config != nullptr) {
1539 // Extract global config for client channel.
1540 const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
1541 static_cast<const internal::ClientChannelGlobalParsedConfig*>(
1542 service_config->GetGlobalParsedConfig(
1543 service_config_parser_index_));
1544 // Choose LB policy config.
1545 RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config =
1546 ChooseLbPolicy(result, parsed_service_config);
1547 // Check if the ServiceConfig has changed.
1548 const bool service_config_changed =
1549 saved_service_config_ == nullptr ||
1550 service_config->json_string() != saved_service_config_->json_string();
1551 // Check if the ConfigSelector has changed.
1552 const bool config_selector_changed = !ConfigSelector::Equals(
1553 saved_config_selector_.get(), config_selector.get());
1554 // If either has changed, apply the global parameters now.
1555 if (service_config_changed || config_selector_changed) {
1556 // Update service config in control plane.
1557 UpdateServiceConfigInControlPlaneLocked(
1558 std::move(service_config), std::move(config_selector),
1559 std::string(lb_policy_config->name()));
1560 } else if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1561 gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
1562 }
1563 // Create or update LB policy, as needed.
1564 resolver_result_status = CreateOrUpdateLbPolicyLocked(
1565 std::move(lb_policy_config),
1566 parsed_service_config->health_check_service_name(), std::move(result));
1567 if (service_config_changed || config_selector_changed) {
1568 // Start using new service config for calls.
1569 // This needs to happen after the LB policy has been updated, since
1570 // the ConfigSelector may need the LB policy to know about new
1571 // destinations before it can send RPCs to those destinations.
1572 UpdateServiceConfigInDataPlaneLocked();
1573 // TODO(ncteisen): might be worth somehow including a snippet of the
1574 // config in the trace, at the risk of bloating the trace logs.
1575 trace_strings.push_back("Service config changed");
1576 }
1577 }
1578 // Invoke resolver callback if needed.
1579 if (resolver_callback != nullptr) {
1580 resolver_callback(std::move(resolver_result_status));
1581 }
1582 // Add channel trace event.
1583 if (!trace_strings.empty()) {
1584 std::string message =
1585 absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
1586 if (channelz_node_ != nullptr) {
1587 channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
1588 grpc_slice_from_cpp_string(message));
1589 }
1590 }
1591 }
1592
OnResolverErrorLocked(absl::Status status)1593 void ClientChannelFilter::OnResolverErrorLocked(absl::Status status) {
1594 if (resolver_ == nullptr) return;
1595 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1596 gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this,
1597 status.ToString().c_str());
1598 }
1599 // If we already have an LB policy from a previous resolution
1600 // result, then we continue to let it set the connectivity state.
1601 // Otherwise, we go into TRANSIENT_FAILURE.
1602 if (lb_policy_ == nullptr) {
1603 // Update connectivity state.
1604 UpdateStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1605 "resolver failure");
1606 {
1607 MutexLock lock(&resolution_mu_);
1608 // Update resolver transient failure.
1609 resolver_transient_failure_error_ =
1610 MaybeRewriteIllegalStatusCode(status, "resolver");
1611 ReprocessQueuedResolverCalls();
1612 }
1613 }
1614 }
1615
CreateOrUpdateLbPolicyLocked(RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,const absl::optional<std::string> & health_check_service_name,Resolver::Result result)1616 absl::Status ClientChannelFilter::CreateOrUpdateLbPolicyLocked(
1617 RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
1618 const absl::optional<std::string>& health_check_service_name,
1619 Resolver::Result result) {
1620 // Construct update.
1621 LoadBalancingPolicy::UpdateArgs update_args;
1622 if (!result.addresses.ok()) {
1623 update_args.addresses = result.addresses.status();
1624 } else {
1625 update_args.addresses = std::make_shared<EndpointAddressesListIterator>(
1626 std::move(*result.addresses));
1627 }
1628 update_args.config = std::move(lb_policy_config);
1629 update_args.resolution_note = std::move(result.resolution_note);
1630 // Remove the config selector from channel args so that we're not holding
1631 // unnecessary refs that cause it to be destroyed somewhere other than in the
1632 // WorkSerializer.
1633 update_args.args = result.args.Remove(GRPC_ARG_CONFIG_SELECTOR);
1634 // Add health check service name to channel args.
1635 if (health_check_service_name.has_value()) {
1636 update_args.args = update_args.args.Set(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME,
1637 *health_check_service_name);
1638 }
1639 // Create policy if needed.
1640 if (lb_policy_ == nullptr) {
1641 lb_policy_ = CreateLbPolicyLocked(update_args.args);
1642 }
1643 // Update the policy.
1644 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1645 gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this,
1646 lb_policy_.get());
1647 }
1648 return lb_policy_->UpdateLocked(std::move(update_args));
1649 }
1650
// Creates a new LB policy, wrapped in a ChildPolicyHandler so that
// policy changes across resolver updates are handled transparently.
// Also resets the channel to CONNECTING with a queueing picker and
// hooks the policy's interested_parties into the channel's pollset set.
OrphanablePtr<LoadBalancingPolicy> ClientChannelFilter::CreateLbPolicyLocked(
    const ChannelArgs& args) {
  // The LB policy will start in state CONNECTING but will not
  // necessarily send us an update synchronously, so set state to
  // CONNECTING (in case the resolver had previously failed and put the
  // channel into TRANSIENT_FAILURE) and make sure we have a queueing picker.
  UpdateStateAndPickerLocked(
      GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
      MakeRefCounted<LoadBalancingPolicy::QueuePicker>(nullptr));
  // Now create the LB policy.
  LoadBalancingPolicy::Args lb_policy_args;
  lb_policy_args.work_serializer = work_serializer_;
  lb_policy_args.channel_control_helper =
      std::make_unique<ClientChannelControlHelper>(this);
  lb_policy_args.args = args;
  OrphanablePtr<LoadBalancingPolicy> lb_policy =
      MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
                                         &grpc_client_channel_trace);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
    gpr_log(GPR_INFO, "chand=%p: created new LB policy %p", this,
            lb_policy.get());
  }
  // Let the LB policy do I/O on behalf of the channel's pollers.
  grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
                                   interested_parties_);
  return lb_policy;
}
1678
UpdateServiceConfigInControlPlaneLocked(RefCountedPtr<ServiceConfig> service_config,RefCountedPtr<ConfigSelector> config_selector,std::string lb_policy_name)1679 void ClientChannelFilter::UpdateServiceConfigInControlPlaneLocked(
1680 RefCountedPtr<ServiceConfig> service_config,
1681 RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) {
1682 std::string service_config_json(service_config->json_string());
1683 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1684 gpr_log(GPR_INFO, "chand=%p: using service config: \"%s\"", this,
1685 service_config_json.c_str());
1686 }
1687 // Save service config.
1688 saved_service_config_ = std::move(service_config);
1689 // Swap out the data used by GetChannelInfo().
1690 {
1691 MutexLock lock(&info_mu_);
1692 info_lb_policy_name_ = std::move(lb_policy_name);
1693 info_service_config_json_ = std::move(service_config_json);
1694 }
1695 // Save config selector.
1696 saved_config_selector_ = std::move(config_selector);
1697 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1698 gpr_log(GPR_INFO, "chand=%p: using ConfigSelector %p", this,
1699 saved_config_selector_.get());
1700 }
1701 }
1702
UpdateServiceConfigInDataPlaneLocked()1703 void ClientChannelFilter::UpdateServiceConfigInDataPlaneLocked() {
1704 // Grab ref to service config.
1705 RefCountedPtr<ServiceConfig> service_config = saved_service_config_;
1706 // Grab ref to config selector. Use default if resolver didn't supply one.
1707 RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
1708 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1709 gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
1710 saved_config_selector_.get());
1711 }
1712 if (config_selector == nullptr) {
1713 config_selector =
1714 MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
1715 }
1716 ChannelArgs new_args =
1717 channel_args_.SetObject(this).SetObject(service_config);
1718 bool enable_retries =
1719 !new_args.WantMinimalStack() &&
1720 new_args.GetBool(GRPC_ARG_ENABLE_RETRIES).value_or(true);
1721 // Construct dynamic filter stack.
1722 std::vector<const grpc_channel_filter*> filters =
1723 config_selector->GetFilters();
1724 if (enable_retries) {
1725 filters.push_back(&RetryFilter::kVtable);
1726 } else {
1727 filters.push_back(&DynamicTerminationFilter::kFilterVtable);
1728 }
1729 RefCountedPtr<DynamicFilters> dynamic_filters =
1730 DynamicFilters::Create(new_args, std::move(filters));
1731 GPR_ASSERT(dynamic_filters != nullptr);
1732 // Grab data plane lock to update service config.
1733 //
1734 // We defer unreffing the old values (and deallocating memory) until
1735 // after releasing the lock to keep the critical section small.
1736 {
1737 MutexLock lock(&resolution_mu_);
1738 resolver_transient_failure_error_ = absl::OkStatus();
1739 // Update service config.
1740 received_service_config_data_ = true;
1741 // Old values will be unreffed after lock is released.
1742 service_config_.swap(service_config);
1743 config_selector_.swap(config_selector);
1744 dynamic_filters_.swap(dynamic_filters);
1745 // Re-process queued calls asynchronously.
1746 ReprocessQueuedResolverCalls();
1747 }
1748 // Old values will be unreffed after lock is released when they go out
1749 // of scope.
1750 }
1751
CreateResolverLocked()1752 void ClientChannelFilter::CreateResolverLocked() {
1753 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1754 gpr_log(GPR_INFO, "chand=%p: starting name resolution for %s", this,
1755 uri_to_resolve_.c_str());
1756 }
1757 resolver_ = CoreConfiguration::Get().resolver_registry().CreateResolver(
1758 uri_to_resolve_, channel_args_, interested_parties_, work_serializer_,
1759 std::make_unique<ResolverResultHandler>(this));
1760 // Since the validity of the args was checked when the channel was created,
1761 // CreateResolver() must return a non-null result.
1762 GPR_ASSERT(resolver_ != nullptr);
1763 UpdateStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status(),
1764 "started resolving");
1765 resolver_->StartLocked();
1766 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1767 gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get());
1768 }
1769 }
1770
DestroyResolverAndLbPolicyLocked()1771 void ClientChannelFilter::DestroyResolverAndLbPolicyLocked() {
1772 if (resolver_ != nullptr) {
1773 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1774 gpr_log(GPR_INFO, "chand=%p: shutting down resolver=%p", this,
1775 resolver_.get());
1776 }
1777 resolver_.reset();
1778 // Clear resolution state.
1779 saved_service_config_.reset();
1780 saved_config_selector_.reset();
1781 // Acquire resolution lock to update config selector and associated state.
1782 // To minimize lock contention, we wait to unref these objects until
1783 // after we release the lock.
1784 RefCountedPtr<ServiceConfig> service_config_to_unref;
1785 RefCountedPtr<ConfigSelector> config_selector_to_unref;
1786 RefCountedPtr<DynamicFilters> dynamic_filters_to_unref;
1787 {
1788 MutexLock lock(&resolution_mu_);
1789 received_service_config_data_ = false;
1790 service_config_to_unref = std::move(service_config_);
1791 config_selector_to_unref = std::move(config_selector_);
1792 dynamic_filters_to_unref = std::move(dynamic_filters_);
1793 }
1794 // Clear LB policy if set.
1795 if (lb_policy_ != nullptr) {
1796 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1797 gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this,
1798 lb_policy_.get());
1799 }
1800 grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
1801 interested_parties_);
1802 lb_policy_.reset();
1803 }
1804 }
1805 }
1806
UpdateStateLocked(grpc_connectivity_state state,const absl::Status & status,const char * reason)1807 void ClientChannelFilter::UpdateStateLocked(grpc_connectivity_state state,
1808 const absl::Status& status,
1809 const char* reason) {
1810 if (state != GRPC_CHANNEL_SHUTDOWN &&
1811 state_tracker_.state() == GRPC_CHANNEL_SHUTDOWN) {
1812 Crash("Illegal transition SHUTDOWN -> anything");
1813 }
1814 state_tracker_.SetState(state, status, reason);
1815 if (channelz_node_ != nullptr) {
1816 channelz_node_->SetConnectivityState(state);
1817 channelz_node_->AddTraceEvent(
1818 channelz::ChannelTrace::Severity::Info,
1819 grpc_slice_from_static_string(
1820 channelz::ChannelNode::GetChannelConnectivityStateChangeString(
1821 state)));
1822 }
1823 }
1824
UpdateStateAndPickerLocked(grpc_connectivity_state state,const absl::Status & status,const char * reason,RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)1825 void ClientChannelFilter::UpdateStateAndPickerLocked(
1826 grpc_connectivity_state state, const absl::Status& status,
1827 const char* reason,
1828 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
1829 UpdateStateLocked(state, status, reason);
1830 // Grab the LB lock to update the picker and trigger reprocessing of the
1831 // queued picks.
1832 // Old picker will be unreffed after releasing the lock.
1833 MutexLock lock(&lb_mu_);
1834 picker_.swap(picker);
1835 // Reprocess queued picks.
1836 for (auto& call : lb_queued_calls_) {
1837 call->RemoveCallFromLbQueuedCallsLocked();
1838 call->RetryPickLocked();
1839 }
1840 lb_queued_calls_.clear();
1841 }
1842
1843 namespace {
1844
1845 // TODO(roth): Remove this in favor of the gprpp Match() function once
1846 // we can do that without breaking lock annotations.
1847 template <typename T>
HandlePickResult(LoadBalancingPolicy::PickResult * result,std::function<T (LoadBalancingPolicy::PickResult::Complete *)> complete_func,std::function<T (LoadBalancingPolicy::PickResult::Queue *)> queue_func,std::function<T (LoadBalancingPolicy::PickResult::Fail *)> fail_func,std::function<T (LoadBalancingPolicy::PickResult::Drop *)> drop_func)1848 T HandlePickResult(
1849 LoadBalancingPolicy::PickResult* result,
1850 std::function<T(LoadBalancingPolicy::PickResult::Complete*)> complete_func,
1851 std::function<T(LoadBalancingPolicy::PickResult::Queue*)> queue_func,
1852 std::function<T(LoadBalancingPolicy::PickResult::Fail*)> fail_func,
1853 std::function<T(LoadBalancingPolicy::PickResult::Drop*)> drop_func) {
1854 auto* complete_pick =
1855 absl::get_if<LoadBalancingPolicy::PickResult::Complete>(&result->result);
1856 if (complete_pick != nullptr) {
1857 return complete_func(complete_pick);
1858 }
1859 auto* queue_pick =
1860 absl::get_if<LoadBalancingPolicy::PickResult::Queue>(&result->result);
1861 if (queue_pick != nullptr) {
1862 return queue_func(queue_pick);
1863 }
1864 auto* fail_pick =
1865 absl::get_if<LoadBalancingPolicy::PickResult::Fail>(&result->result);
1866 if (fail_pick != nullptr) {
1867 return fail_func(fail_pick);
1868 }
1869 auto* drop_pick =
1870 absl::get_if<LoadBalancingPolicy::PickResult::Drop>(&result->result);
1871 GPR_ASSERT(drop_pick != nullptr);
1872 return drop_func(drop_pick);
1873 }
1874
1875 } // namespace
1876
DoPingLocked(grpc_transport_op * op)1877 grpc_error_handle ClientChannelFilter::DoPingLocked(grpc_transport_op* op) {
1878 if (state_tracker_.state() != GRPC_CHANNEL_READY) {
1879 return GRPC_ERROR_CREATE("channel not connected");
1880 }
1881 LoadBalancingPolicy::PickResult result;
1882 {
1883 MutexLock lock(&lb_mu_);
1884 result = picker_->Pick(LoadBalancingPolicy::PickArgs());
1885 }
1886 return HandlePickResult<grpc_error_handle>(
1887 &result,
1888 // Complete pick.
1889 [op](LoadBalancingPolicy::PickResult::Complete* complete_pick)
1890 ABSL_EXCLUSIVE_LOCKS_REQUIRED(
1891 *ClientChannelFilter::work_serializer_) {
1892 SubchannelWrapper* subchannel = static_cast<SubchannelWrapper*>(
1893 complete_pick->subchannel.get());
1894 RefCountedPtr<ConnectedSubchannel> connected_subchannel =
1895 subchannel->connected_subchannel();
1896 if (connected_subchannel == nullptr) {
1897 return GRPC_ERROR_CREATE("LB pick for ping not connected");
1898 }
1899 connected_subchannel->Ping(op->send_ping.on_initiate,
1900 op->send_ping.on_ack);
1901 return absl::OkStatus();
1902 },
1903 // Queue pick.
1904 [](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
1905 return GRPC_ERROR_CREATE("LB picker queued call");
1906 },
1907 // Fail pick.
1908 [](LoadBalancingPolicy::PickResult::Fail* fail_pick) {
1909 return absl_status_to_grpc_error(fail_pick->status);
1910 },
1911 // Drop pick.
1912 [](LoadBalancingPolicy::PickResult::Drop* drop_pick) {
1913 return absl_status_to_grpc_error(drop_pick->status);
1914 });
1915 }
1916
StartTransportOpLocked(grpc_transport_op * op)1917 void ClientChannelFilter::StartTransportOpLocked(grpc_transport_op* op) {
1918 // Connectivity watch.
1919 if (op->start_connectivity_watch != nullptr) {
1920 state_tracker_.AddWatcher(op->start_connectivity_watch_state,
1921 std::move(op->start_connectivity_watch));
1922 }
1923 if (op->stop_connectivity_watch != nullptr) {
1924 state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
1925 }
1926 // Ping.
1927 if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1928 grpc_error_handle error = DoPingLocked(op);
1929 if (!error.ok()) {
1930 ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate, error);
1931 ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, error);
1932 }
1933 op->bind_pollset = nullptr;
1934 op->send_ping.on_initiate = nullptr;
1935 op->send_ping.on_ack = nullptr;
1936 }
1937 // Reset backoff.
1938 if (op->reset_connect_backoff) {
1939 if (lb_policy_ != nullptr) {
1940 lb_policy_->ResetBackoffLocked();
1941 }
1942 }
1943 // Disconnect or enter IDLE.
1944 if (!op->disconnect_with_error.ok()) {
1945 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_trace)) {
1946 gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", this,
1947 StatusToString(op->disconnect_with_error).c_str());
1948 }
1949 DestroyResolverAndLbPolicyLocked();
1950 intptr_t value;
1951 if (grpc_error_get_int(op->disconnect_with_error,
1952 StatusIntProperty::ChannelConnectivityState,
1953 &value) &&
1954 static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
1955 if (disconnect_error_.ok()) { // Ignore if we're shutting down.
1956 // Enter IDLE state.
1957 UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE, absl::Status(),
1958 "channel entering IDLE", nullptr);
1959 // TODO(roth): Do we need to check for any queued picks here, in
1960 // case there's a race condition in the client_idle filter?
1961 // And maybe also check for calls in the resolver queue?
1962 }
1963 } else {
1964 // Disconnect.
1965 GPR_ASSERT(disconnect_error_.ok());
1966 disconnect_error_ = op->disconnect_with_error;
1967 UpdateStateAndPickerLocked(
1968 GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
1969 MakeRefCounted<LoadBalancingPolicy::TransientFailurePicker>(
1970 grpc_error_to_absl_status(op->disconnect_with_error)));
1971 // TODO(roth): If this happens when we're still waiting for a
1972 // resolver result, we need to trigger failures for all calls in
1973 // the resolver queue here.
1974 }
1975 }
1976 GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op");
1977 ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
1978 }
1979
StartTransportOp(grpc_channel_element * elem,grpc_transport_op * op)1980 void ClientChannelFilter::StartTransportOp(grpc_channel_element* elem,
1981 grpc_transport_op* op) {
1982 auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
1983 GPR_ASSERT(op->set_accept_stream == false);
1984 // Handle bind_pollset.
1985 if (op->bind_pollset != nullptr) {
1986 grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
1987 }
1988 // Pop into control plane work_serializer for remaining ops.
1989 GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
1990 chand->work_serializer_->Run(
1991 [chand, op]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
1992 chand->StartTransportOpLocked(op);
1993 },
1994 DEBUG_LOCATION);
1995 }
1996
GetChannelInfo(grpc_channel_element * elem,const grpc_channel_info * info)1997 void ClientChannelFilter::GetChannelInfo(grpc_channel_element* elem,
1998 const grpc_channel_info* info) {
1999 auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
2000 MutexLock lock(&chand->info_mu_);
2001 if (info->lb_policy_name != nullptr) {
2002 *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.c_str());
2003 }
2004 if (info->service_config_json != nullptr) {
2005 *info->service_config_json =
2006 gpr_strdup(chand->info_service_config_json_.c_str());
2007 }
2008 }
2009
TryToConnectLocked()2010 void ClientChannelFilter::TryToConnectLocked() {
2011 if (disconnect_error_.ok()) {
2012 if (lb_policy_ != nullptr) {
2013 lb_policy_->ExitIdleLocked();
2014 } else if (resolver_ == nullptr) {
2015 CreateResolverLocked();
2016 }
2017 }
2018 GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
2019 }
2020
CheckConnectivityState(bool try_to_connect)2021 grpc_connectivity_state ClientChannelFilter::CheckConnectivityState(
2022 bool try_to_connect) {
2023 // state_tracker_ is guarded by work_serializer_, which we're not
2024 // holding here. But the one method of state_tracker_ that *is*
2025 // thread-safe to call without external synchronization is the state()
2026 // method, so we can disable thread-safety analysis for this one read.
2027 grpc_connectivity_state out = ABSL_TS_UNCHECKED_READ(state_tracker_).state();
2028 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
2029 GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
2030 work_serializer_->Run([this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
2031 *work_serializer_) { TryToConnectLocked(); },
2032 DEBUG_LOCATION);
2033 }
2034 return out;
2035 }
2036
AddConnectivityWatcher(grpc_connectivity_state initial_state,OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)2037 void ClientChannelFilter::AddConnectivityWatcher(
2038 grpc_connectivity_state initial_state,
2039 OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
2040 new ConnectivityWatcherAdder(this, initial_state, std::move(watcher));
2041 }
2042
RemoveConnectivityWatcher(AsyncConnectivityStateWatcherInterface * watcher)2043 void ClientChannelFilter::RemoveConnectivityWatcher(
2044 AsyncConnectivityStateWatcherInterface* watcher) {
2045 new ConnectivityWatcherRemover(this, watcher);
2046 }
2047
2048 //
2049 // CallData implementation
2050 //
2051
RemoveCallFromResolverQueuedCallsLocked()2052 void ClientChannelFilter::CallData::RemoveCallFromResolverQueuedCallsLocked() {
2053 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2054 gpr_log(GPR_INFO,
2055 "chand=%p calld=%p: removing from resolver queued picks list",
2056 chand(), this);
2057 }
2058 // Remove call's pollent from channel's interested_parties.
2059 grpc_polling_entity_del_from_pollset_set(pollent(),
2060 chand()->interested_parties_);
2061 // Note: There's no need to actually remove the call from the queue
2062 // here, because that will be done in
2063 // ResolverQueuedCallCanceller::CancelLocked() or
2064 // ClientChannelFilter::ReprocessQueuedResolverCalls().
2065 }
2066
AddCallToResolverQueuedCallsLocked()2067 void ClientChannelFilter::CallData::AddCallToResolverQueuedCallsLocked() {
2068 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2069 gpr_log(
2070 GPR_INFO,
2071 "chand=%p calld=%p: adding to resolver queued picks list; pollent=%s",
2072 chand(), this, grpc_polling_entity_string(pollent()).c_str());
2073 }
2074 // Add call's pollent to channel's interested_parties, so that I/O
2075 // can be done under the call's CQ.
2076 grpc_polling_entity_add_to_pollset_set(pollent(),
2077 chand()->interested_parties_);
2078 // Add to queue.
2079 chand()->resolver_queued_calls_.insert(this);
2080 OnAddToQueueLocked();
2081 }
2082
ApplyServiceConfigToCallLocked(const absl::StatusOr<RefCountedPtr<ConfigSelector>> & config_selector)2083 grpc_error_handle ClientChannelFilter::CallData::ApplyServiceConfigToCallLocked(
2084 const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector) {
2085 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2086 gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
2087 chand(), this);
2088 }
2089 if (!config_selector.ok()) return config_selector.status();
2090 // Create a ClientChannelServiceConfigCallData for the call. This stores
2091 // a ref to the ServiceConfig and caches the right set of parsed configs
2092 // to use for the call. The ClientChannelServiceConfigCallData will store
2093 // itself in the call context, so that it can be accessed by filters
2094 // below us in the stack, and it will be cleaned up when the call ends.
2095 auto* service_config_call_data =
2096 arena()->New<ClientChannelServiceConfigCallData>(arena(), call_context());
2097 // Use the ConfigSelector to determine the config for the call.
2098 absl::Status call_config_status =
2099 (*config_selector)
2100 ->GetCallConfig(
2101 {send_initial_metadata(), arena(), service_config_call_data});
2102 if (!call_config_status.ok()) {
2103 return absl_status_to_grpc_error(
2104 MaybeRewriteIllegalStatusCode(call_config_status, "ConfigSelector"));
2105 }
2106 // Apply our own method params to the call.
2107 auto* method_params = static_cast<ClientChannelMethodParsedConfig*>(
2108 service_config_call_data->GetMethodParsedConfig(
2109 chand()->service_config_parser_index_));
2110 if (method_params != nullptr) {
2111 // If the deadline from the service config is shorter than the one
2112 // from the client API, reset the deadline timer.
2113 if (chand()->deadline_checking_enabled_ &&
2114 method_params->timeout() != Duration::Zero()) {
2115 ResetDeadline(method_params->timeout());
2116 }
2117 // If the service config set wait_for_ready and the application
2118 // did not explicitly set it, use the value from the service config.
2119 auto* wait_for_ready =
2120 send_initial_metadata()->GetOrCreatePointer(WaitForReady());
2121 if (method_params->wait_for_ready().has_value() &&
2122 !wait_for_ready->explicitly_set) {
2123 wait_for_ready->value = method_params->wait_for_ready().value();
2124 }
2125 }
2126 return absl::OkStatus();
2127 }
2128
CheckResolution(bool was_queued)2129 absl::optional<absl::Status> ClientChannelFilter::CallData::CheckResolution(
2130 bool was_queued) {
2131 // Check if we have a resolver result to use.
2132 absl::StatusOr<RefCountedPtr<ConfigSelector>> config_selector;
2133 {
2134 MutexLock lock(&chand()->resolution_mu_);
2135 bool result_ready = CheckResolutionLocked(&config_selector);
2136 // If no result is available, queue the call.
2137 if (!result_ready) {
2138 AddCallToResolverQueuedCallsLocked();
2139 return absl::nullopt;
2140 }
2141 }
2142 // We have a result. Apply service config to call.
2143 grpc_error_handle error = ApplyServiceConfigToCallLocked(config_selector);
2144 // ConfigSelector must be unreffed inside the WorkSerializer.
2145 if (!IsWorkSerializerDispatchEnabled() && config_selector.ok()) {
2146 chand()->work_serializer_->Run(
2147 [config_selector = std::move(*config_selector)]() mutable {
2148 config_selector.reset();
2149 },
2150 DEBUG_LOCATION);
2151 }
2152 // Handle errors.
2153 if (!error.ok()) {
2154 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2155 gpr_log(GPR_INFO,
2156 "chand=%p calld=%p: error applying config to call: error=%s",
2157 chand(), this, StatusToString(error).c_str());
2158 }
2159 return error;
2160 }
2161 // If the call was queued, add trace annotation.
2162 if (was_queued) {
2163 auto* call_tracer = static_cast<CallTracerAnnotationInterface*>(
2164 call_context()[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
2165 if (call_tracer != nullptr) {
2166 call_tracer->RecordAnnotation("Delayed name resolution complete.");
2167 }
2168 }
2169 return absl::OkStatus();
2170 }
2171
CheckResolutionLocked(absl::StatusOr<RefCountedPtr<ConfigSelector>> * config_selector)2172 bool ClientChannelFilter::CallData::CheckResolutionLocked(
2173 absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector) {
2174 // If we don't yet have a resolver result, we need to queue the call
2175 // until we get one.
2176 if (GPR_UNLIKELY(!chand()->received_service_config_data_)) {
2177 // If the resolver returned transient failure before returning the
2178 // first service config, fail any non-wait_for_ready calls.
2179 absl::Status resolver_error = chand()->resolver_transient_failure_error_;
2180 if (!resolver_error.ok() &&
2181 !send_initial_metadata()->GetOrCreatePointer(WaitForReady())->value) {
2182 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2183 gpr_log(GPR_INFO, "chand=%p calld=%p: resolution failed, failing call",
2184 chand(), this);
2185 }
2186 *config_selector = absl_status_to_grpc_error(resolver_error);
2187 return true;
2188 }
2189 // Either the resolver has not yet returned a result, or it has
2190 // returned transient failure but the call is wait_for_ready. In
2191 // either case, queue the call.
2192 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2193 gpr_log(GPR_INFO, "chand=%p calld=%p: no resolver result yet", chand(),
2194 this);
2195 }
2196 return false;
2197 }
2198 // Result found.
2199 *config_selector = chand()->config_selector_;
2200 dynamic_filters_ = chand()->dynamic_filters_;
2201 return true;
2202 }
2203
2204 //
2205 // FilterBasedCallData implementation
2206 //
2207
// Constructs the call data for a single call on this filter.  Takes a
// ref to the call's path slice.  If deadline checking is enabled on the
// channel, arms the deadline state with the call's deadline; otherwise
// passes an infinite deadline, which disables deadline enforcement here.
ClientChannelFilter::FilterBasedCallData::FilterBasedCallData(
    grpc_call_element* elem, const grpc_call_element_args& args)
    : path_(CSliceRef(args.path)),
      call_context_(args.context),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      deadline_state_(
          elem, args,
          GPR_LIKELY(static_cast<ClientChannelFilter*>(elem->channel_data)
                         ->deadline_checking_enabled_)
              ? args.deadline
              : Timestamp::InfFuture()) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: created call", chand(), this);
  }
}
2224
~FilterBasedCallData()2225 ClientChannelFilter::FilterBasedCallData::~FilterBasedCallData() {
2226 CSliceUnref(path_);
2227 // Make sure there are no remaining pending batches.
2228 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2229 GPR_ASSERT(pending_batches_[i] == nullptr);
2230 }
2231 }
2232
// Filter vtable init hook: constructs the call data in the storage the
// channel stack pre-allocated at elem->call_data.  Never fails.
grpc_error_handle ClientChannelFilter::FilterBasedCallData::Init(
    grpc_call_element* elem, const grpc_call_element_args* args) {
  new (elem->call_data) FilterBasedCallData(elem, *args);
  return absl::OkStatus();
}
2238
// Filter vtable destroy hook: destroys the call data in place.  If a
// dynamic call was created, the then_schedule_closure is deferred until
// that call's stack is destroyed; otherwise it is scheduled now.
void ClientChannelFilter::FilterBasedCallData::Destroy(
    grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
    grpc_closure* then_schedule_closure) {
  auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
  // Move the dynamic call out *before* running the dtor so it outlives
  // the call data object being destroyed.
  RefCountedPtr<DynamicFilters::Call> dynamic_call =
      std::move(calld->dynamic_call_);
  calld->~FilterBasedCallData();
  if (GPR_LIKELY(dynamic_call != nullptr)) {
    dynamic_call->SetAfterCallStackDestroy(then_schedule_closure);
  } else {
    // TODO(yashkt) : This can potentially be a Closure::Run
    ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, absl::OkStatus());
  }
}
2253
// Filter vtable hook: entry point for all batches arriving from the
// layer above.  Runs under the call combiner, so access to calld is
// synchronized.  Routing logic:
//   1. If a dynamic call already exists, pass the batch straight down.
//   2. If the call was previously cancelled, fail the batch immediately.
//   3. On a cancel_stream batch, record the error and fail everything.
//   4. Otherwise queue the batch; a send_initial_metadata batch also
//      triggers name resolution / service config application.
void ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
  auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
  // Only log here when channel-level tracing isn't already logging the batch.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace) &&
      !GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: batch started from above: %s", chand,
            calld, grpc_transport_stream_op_batch_string(batch, false).c_str());
  }
  if (GPR_LIKELY(chand->deadline_checking_enabled_)) {
    grpc_deadline_state_client_start_transport_stream_op_batch(
        &calld->deadline_state_, batch);
  }
  // Intercept recv_trailing_metadata to commit the call, in case we wind up
  // failing the call before we get down to the retry or LB call layer.
  if (batch->recv_trailing_metadata) {
    calld->original_recv_trailing_metadata_ready_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_,
                      RecvTrailingMetadataReadyForConfigSelectorCommitCallback,
                      calld, nullptr);
    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &calld->recv_trailing_metadata_ready_;
  }
  // If we already have a dynamic call, pass the batch down to it.
  // Note that once we have done so, we do not need to acquire the channel's
  // resolution mutex, which is more efficient (especially for streaming calls).
  if (calld->dynamic_call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on dynamic_call=%p",
              chand, calld, calld->dynamic_call_.get());
    }
    calld->dynamic_call_->StartTransportStreamOpBatch(batch);
    return;
  }
  // We do not yet have a dynamic call.
  //
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(!calld->cancel_error_.ok())) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, StatusToString(calld->cancel_error_).c_str());
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, calld->cancel_error_, calld->call_combiner());
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    calld->cancel_error_ = batch->payload->cancel_stream.cancel_error;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, StatusToString(calld->cancel_error_).c_str());
    }
    // Fail all pending batches.
    calld->PendingBatchesFail(calld->cancel_error_, NoYieldCallCombiner);
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, calld->cancel_error_, calld->call_combiner());
    return;
  }
  // Add the batch to the pending list.
  calld->PendingBatchesAdd(batch);
  // For batches containing a send_initial_metadata op, acquire the
  // channel's resolution mutex to apply the service config to the call,
  // after which we will create a dynamic call.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: grabbing resolution mutex to apply service "
              "config",
              chand, calld);
    }
    // If we're still in IDLE, we need to start resolving.
    if (GPR_UNLIKELY(chand->CheckConnectivityState(false) ==
                     GRPC_CHANNEL_IDLE)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: triggering exit idle", chand,
                calld);
      }
      // Bounce into the control plane work serializer to start resolving.
      GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ExitIdle");
      chand->work_serializer_->Run(
          [chand]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
            chand->CheckConnectivityState(/*try_to_connect=*/true);
            GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "ExitIdle");
          },
          DEBUG_LOCATION);
    }
    calld->TryCheckResolution(/*was_queued=*/false);
  } else {
    // For all other batches, release the call combiner.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner(),
                            "batch does not include send_initial_metadata");
  }
}
2361
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)2362 void ClientChannelFilter::FilterBasedCallData::SetPollent(
2363 grpc_call_element* elem, grpc_polling_entity* pollent) {
2364 auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
2365 calld->pollent_ = pollent;
2366 }
2367
GetBatchIndex(grpc_transport_stream_op_batch * batch)2368 size_t ClientChannelFilter::FilterBasedCallData::GetBatchIndex(
2369 grpc_transport_stream_op_batch* batch) {
2370 // Note: It is important the send_initial_metadata be the first entry
2371 // here, since the code in CheckResolution() assumes it will be.
2372 if (batch->send_initial_metadata) return 0;
2373 if (batch->send_message) return 1;
2374 if (batch->send_trailing_metadata) return 2;
2375 if (batch->recv_initial_metadata) return 3;
2376 if (batch->recv_message) return 4;
2377 if (batch->recv_trailing_metadata) return 5;
2378 GPR_UNREACHABLE_CODE(return (size_t)-1);
2379 }
2380
2381 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_transport_stream_op_batch * batch)2382 void ClientChannelFilter::FilterBasedCallData::PendingBatchesAdd(
2383 grpc_transport_stream_op_batch* batch) {
2384 const size_t idx = GetBatchIndex(batch);
2385 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2386 gpr_log(GPR_INFO,
2387 "chand=%p calld=%p: adding pending batch at index %" PRIuPTR,
2388 chand(), this, idx);
2389 }
2390 grpc_transport_stream_op_batch*& pending = pending_batches_[idx];
2391 GPR_ASSERT(pending == nullptr);
2392 pending = batch;
2393 }
2394
2395 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error_handle error)2396 void ClientChannelFilter::FilterBasedCallData::FailPendingBatchInCallCombiner(
2397 void* arg, grpc_error_handle error) {
2398 grpc_transport_stream_op_batch* batch =
2399 static_cast<grpc_transport_stream_op_batch*>(arg);
2400 auto* calld =
2401 static_cast<FilterBasedCallData*>(batch->handler_private.extra_arg);
2402 // Note: This will release the call combiner.
2403 grpc_transport_stream_op_batch_finish_with_failure(batch, error,
2404 calld->call_combiner());
2405 }
2406
2407 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesFail(grpc_error_handle error,YieldCallCombinerPredicate yield_call_combiner_predicate)2408 void ClientChannelFilter::FilterBasedCallData::PendingBatchesFail(
2409 grpc_error_handle error,
2410 YieldCallCombinerPredicate yield_call_combiner_predicate) {
2411 GPR_ASSERT(!error.ok());
2412 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2413 size_t num_batches = 0;
2414 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2415 if (pending_batches_[i] != nullptr) ++num_batches;
2416 }
2417 gpr_log(GPR_INFO,
2418 "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
2419 chand(), this, num_batches, StatusToString(error).c_str());
2420 }
2421 CallCombinerClosureList closures;
2422 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2423 grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2424 if (batch != nullptr) {
2425 batch->handler_private.extra_arg = this;
2426 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2427 FailPendingBatchInCallCombiner, batch,
2428 grpc_schedule_on_exec_ctx);
2429 closures.Add(&batch->handler_private.closure, error,
2430 "PendingBatchesFail");
2431 batch = nullptr;
2432 }
2433 }
2434 if (yield_call_combiner_predicate(closures)) {
2435 closures.RunClosures(call_combiner());
2436 } else {
2437 closures.RunClosuresWithoutYielding(call_combiner());
2438 }
2439 }
2440
2441 // This is called via the call combiner, so access to calld is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error_handle)2442 void ClientChannelFilter::FilterBasedCallData::ResumePendingBatchInCallCombiner(
2443 void* arg, grpc_error_handle /*ignored*/) {
2444 grpc_transport_stream_op_batch* batch =
2445 static_cast<grpc_transport_stream_op_batch*>(arg);
2446 auto* calld =
2447 static_cast<FilterBasedCallData*>(batch->handler_private.extra_arg);
2448 // Note: This will release the call combiner.
2449 calld->dynamic_call_->StartTransportStreamOpBatch(batch);
2450 }
2451
// This is called via the call combiner, so access to calld is synchronized.
// Sends every pending batch down to the newly created dynamic call,
// clearing the pending list as it goes.  Releases the call combiner.
void ClientChannelFilter::FilterBasedCallData::PendingBatchesResume() {
  // Retries not enabled; send down batches as-is.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " pending batches on dynamic_call=%p",
            chand(), this, num_batches, dynamic_call_.get());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        ResumePendingBatchInCallCombiner, batch, nullptr);
      closures.Add(&batch->handler_private.closure, absl::OkStatus(),
                   "resuming pending batch from client channel call");
      batch = nullptr;
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner());
}
2480
2481 // A class to handle the call combiner cancellation callback for a
2482 // queued pick.
2483 class ClientChannelFilter::FilterBasedCallData::ResolverQueuedCallCanceller
2484 final {
2485 public:
ResolverQueuedCallCanceller(FilterBasedCallData * calld)2486 explicit ResolverQueuedCallCanceller(FilterBasedCallData* calld)
2487 : calld_(calld) {
2488 GRPC_CALL_STACK_REF(calld->owning_call(), "ResolverQueuedCallCanceller");
2489 GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
2490 grpc_schedule_on_exec_ctx);
2491 calld->call_combiner()->SetNotifyOnCancel(&closure_);
2492 }
2493
2494 private:
CancelLocked(void * arg,grpc_error_handle error)2495 static void CancelLocked(void* arg, grpc_error_handle error) {
2496 auto* self = static_cast<ResolverQueuedCallCanceller*>(arg);
2497 auto* calld = self->calld_;
2498 auto* chand = calld->chand();
2499 {
2500 MutexLock lock(&chand->resolution_mu_);
2501 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2502 gpr_log(GPR_INFO,
2503 "chand=%p calld=%p: cancelling resolver queued pick: "
2504 "error=%s self=%p calld->resolver_pick_canceller=%p",
2505 chand, calld, StatusToString(error).c_str(), self,
2506 calld->resolver_call_canceller_);
2507 }
2508 if (calld->resolver_call_canceller_ == self && !error.ok()) {
2509 // Remove pick from list of queued picks.
2510 calld->RemoveCallFromResolverQueuedCallsLocked();
2511 chand->resolver_queued_calls_.erase(calld);
2512 // Fail pending batches on the call.
2513 calld->PendingBatchesFail(error,
2514 YieldCallCombinerIfPendingBatchesFound);
2515 }
2516 }
2517 GRPC_CALL_STACK_UNREF(calld->owning_call(), "ResolvingQueuedCallCanceller");
2518 delete self;
2519 }
2520
2521 FilterBasedCallData* calld_;
2522 grpc_closure closure_;
2523 };
2524
TryCheckResolution(bool was_queued)2525 void ClientChannelFilter::FilterBasedCallData::TryCheckResolution(
2526 bool was_queued) {
2527 auto result = CheckResolution(was_queued);
2528 if (result.has_value()) {
2529 if (!result->ok()) {
2530 PendingBatchesFail(*result, YieldCallCombiner);
2531 return;
2532 }
2533 CreateDynamicCall();
2534 }
2535 }
2536
// Invoked (while holding the resolution mutex) when this call is added
// to the channel's resolver queue.
void ClientChannelFilter::FilterBasedCallData::OnAddToQueueLocked() {
  // Register call combiner cancellation callback.
  resolver_call_canceller_ = new ResolverQueuedCallCanceller(this);
}
2541
// Invoked (while holding the resolution mutex) when a queued call should
// re-check resolution because new resolver data is available.
void ClientChannelFilter::FilterBasedCallData::RetryCheckResolutionLocked() {
  // Lame the call combiner canceller.
  resolver_call_canceller_ = nullptr;
  // Do an async callback to resume call processing, so that we're not
  // doing it while holding the channel's resolution mutex.
  chand()->owning_stack_->EventEngine()->Run([this]() {
    // Set up exec contexts before re-entering core code from the
    // EventEngine thread.
    ApplicationCallbackExecCtx application_exec_ctx;
    ExecCtx exec_ctx;
    TryCheckResolution(/*was_queued=*/true);
  });
}
2553
// Creates the call's dynamic filter stack and resumes the pending
// batches on it.  On failure, fails all pending batches with the
// creation error instead.
void ClientChannelFilter::FilterBasedCallData::CreateDynamicCall() {
  DynamicFilters::Call::Args args = {dynamic_filters(), pollent_, path_,
                                     call_start_time_, deadline_, arena(),
                                     call_context_, call_combiner()};
  grpc_error_handle error;
  // Grab the raw pointer before args is moved into CreateCall() below.
  DynamicFilters* channel_stack = args.channel_stack.get();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(
        GPR_INFO,
        "chand=%p calld=%p: creating dynamic call stack on channel_stack=%p",
        chand(), this, channel_stack);
  }
  dynamic_call_ = channel_stack->CreateCall(std::move(args), &error);
  if (!error.ok()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: failed to create dynamic call: error=%s",
              chand(), this, StatusToString(error).c_str());
    }
    PendingBatchesFail(error, YieldCallCombiner);
    return;
  }
  PendingBatchesResume();
}
2578
2579 void ClientChannelFilter::FilterBasedCallData::
RecvTrailingMetadataReadyForConfigSelectorCommitCallback(void * arg,grpc_error_handle error)2580 RecvTrailingMetadataReadyForConfigSelectorCommitCallback(
2581 void* arg, grpc_error_handle error) {
2582 auto* calld = static_cast<FilterBasedCallData*>(arg);
2583 auto* chand = calld->chand();
2584 auto* service_config_call_data =
2585 GetServiceConfigCallData(calld->call_context());
2586 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2587 gpr_log(GPR_INFO,
2588 "chand=%p calld=%p: got recv_trailing_metadata_ready: error=%s "
2589 "service_config_call_data=%p",
2590 chand, calld, StatusToString(error).c_str(),
2591 service_config_call_data);
2592 }
2593 if (service_config_call_data != nullptr) {
2594 service_config_call_data->Commit();
2595 }
2596 // Chain to original callback.
2597 Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_,
2598 error);
2599 }
2600
//
// ClientChannelFilter::LoadBalancedCall::LbCallState
//

// Per-pick state handed to LB policies.  Wraps the LoadBalancedCall so
// that pick-time allocations come from the call's arena and so that
// ConfigSelector-set call attributes and the call attempt tracer are
// reachable from the pick.
class ClientChannelFilter::LoadBalancedCall::LbCallState final
    : public ClientChannelLbCallState {
 public:
  explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {}

  // Allocates pick-scoped memory from the call's arena.
  void* Alloc(size_t size) override { return lb_call_->arena()->Alloc(size); }

  // Internal API to allow first-party LB policies to access per-call
  // attributes set by the ConfigSelector.
  ServiceConfigCallData::CallAttributeInterface* GetCallAttribute(
      UniqueTypeName type) const override;

  ClientCallTracer::CallAttemptTracer* GetCallAttemptTracer() const override;

 private:
  LoadBalancedCall* lb_call_;  // Not owned.
};
2622
//
// ClientChannelFilter::LoadBalancedCall::Metadata
//

// Adapter exposing a grpc_metadata_batch to LB policies through the
// LoadBalancingPolicy::MetadataInterface.  The wrapped batch pointer
// may be null, in which case all operations are no-ops / return empty.
class ClientChannelFilter::LoadBalancedCall::Metadata final
    : public LoadBalancingPolicy::MetadataInterface {
 public:
  explicit Metadata(grpc_metadata_batch* batch) : batch_(batch) {}

  // Adds a key/value pair to the batch.  Append() errors are logged
  // rather than surfaced to the caller.
  void Add(absl::string_view key, absl::string_view value) override {
    if (batch_ == nullptr) return;
    // Gross, egregious hack to support legacy grpclb behavior.
    // TODO(ctiller): Use a promise context for this once that plumbing is done.
    if (key == GrpcLbClientStatsMetadata::key()) {
      batch_->Set(
          GrpcLbClientStatsMetadata(),
          const_cast<GrpcLbClientStats*>(
              reinterpret_cast<const GrpcLbClientStats*>(value.data())));
      return;
    }
    // NOTE(review): FromStaticString assumes the value bytes outlive the
    // slice -- presumably callers pass static or call-lifetime strings;
    // confirm before reusing this path with transient data.
    batch_->Append(key, Slice::FromStaticString(value),
                   [key](absl::string_view error, const Slice& value) {
                     gpr_log(GPR_ERROR, "%s",
                             absl::StrCat(error, " key:", key,
                                          " value:", value.as_string_view())
                                 .c_str());
                   });
  }

  // Copies the batch into a vector of key/value strings (test use only).
  std::vector<std::pair<std::string, std::string>> TestOnlyCopyToVector()
      override {
    if (batch_ == nullptr) return {};
    Encoder encoder;
    batch_->Encode(&encoder);
    return encoder.Take();
  }

  // Looks up a metadata value by key, using *buffer as backing storage
  // if the value needs to be materialized.
  absl::optional<absl::string_view> Lookup(absl::string_view key,
                                           std::string* buffer) const override {
    if (batch_ == nullptr) return absl::nullopt;
    return batch_->GetStringValue(key, buffer);
  }

 private:
  // Metadata-batch encoder that collects entries as string pairs.
  class Encoder final {
   public:
    void Encode(const Slice& key, const Slice& value) {
      out_.emplace_back(std::string(key.as_string_view()),
                        std::string(value.as_string_view()));
    }

    template <class Which>
    void Encode(Which, const typename Which::ValueType& value) {
      auto value_slice = Which::Encode(value);
      out_.emplace_back(std::string(Which::key()),
                        std::string(value_slice.as_string_view()));
    }

    // These metadata elements are deliberately excluded from the copy.
    void Encode(GrpcTimeoutMetadata,
                const typename GrpcTimeoutMetadata::ValueType&) {}
    void Encode(HttpPathMetadata, const Slice&) {}
    void Encode(HttpMethodMetadata,
                const typename HttpMethodMetadata::ValueType&) {}

    std::vector<std::pair<std::string, std::string>> Take() {
      return std::move(out_);
    }

   private:
    std::vector<std::pair<std::string, std::string>> out_;
  };

  grpc_metadata_batch* batch_;  // Not owned; may be null.
};
2697
2698 //
2699 // ClientChannelFilter::LoadBalancedCall::LbCallState
2700 //
2701
2702 ServiceConfigCallData::CallAttributeInterface*
GetCallAttribute(UniqueTypeName type) const2703 ClientChannelFilter::LoadBalancedCall::LbCallState::GetCallAttribute(
2704 UniqueTypeName type) const {
2705 auto* service_config_call_data =
2706 GetServiceConfigCallData(lb_call_->call_context_);
2707 return service_config_call_data->GetCallAttribute(type);
2708 }
2709
// Returns the call attempt tracer for the wrapped LB call (may be null
// when no ClientCallTracer is installed on the call).
ClientCallTracer::CallAttemptTracer*
ClientChannelFilter::LoadBalancedCall::LbCallState::GetCallAttemptTracer()
    const {
  return lb_call_->call_attempt_tracer();
}
2715
//
// ClientChannelFilter::LoadBalancedCall::BackendMetricAccessor
//

// Lazily parses backend metric data out of the call's trailing metadata
// for LB policies that ask for it, caching the parsed result on the
// LoadBalancedCall (storage comes from the call's arena).
class ClientChannelFilter::LoadBalancedCall::BackendMetricAccessor final
    : public LoadBalancingPolicy::BackendMetricAccessor {
 public:
  BackendMetricAccessor(LoadBalancedCall* lb_call,
                        grpc_metadata_batch* recv_trailing_metadata)
      : lb_call_(lb_call), recv_trailing_metadata_(recv_trailing_metadata) {}

  // Returns the parsed backend metric data, or null when the trailing
  // metadata is absent or carries no endpoint load metrics.
  const BackendMetricData* GetBackendMetricData() override {
    if (lb_call_->backend_metric_data_ == nullptr &&
        recv_trailing_metadata_ != nullptr) {
      if (const auto* md = recv_trailing_metadata_->get_pointer(
              EndpointLoadMetricsBinMetadata())) {
        BackendMetricAllocator allocator(lb_call_->arena());
        lb_call_->backend_metric_data_ =
            ParseBackendMetricData(md->as_string_view(), &allocator);
      }
    }
    return lb_call_->backend_metric_data_;
  }

 private:
  // Allocates parsed backend-metric storage from the call's arena, so
  // its lifetime matches the call's.
  class BackendMetricAllocator final : public BackendMetricAllocatorInterface {
   public:
    explicit BackendMetricAllocator(Arena* arena) : arena_(arena) {}

    BackendMetricData* AllocateBackendMetricData() override {
      return arena_->New<BackendMetricData>();
    }

    char* AllocateString(size_t size) override {
      return static_cast<char*>(arena_->Alloc(size));
    }

   private:
    Arena* arena_;  // Not owned.
  };

  LoadBalancedCall* lb_call_;                    // Not owned.
  grpc_metadata_batch* recv_trailing_metadata_;  // Not owned; may be null.
};
2760
2761 //
2762 // ClientChannelFilter::LoadBalancedCall
2763 //
2764
2765 namespace {
2766
CreateCallAttemptTracer(grpc_call_context_element * context,bool is_transparent_retry)2767 void CreateCallAttemptTracer(grpc_call_context_element* context,
2768 bool is_transparent_retry) {
2769 auto* call_tracer = static_cast<ClientCallTracer*>(
2770 context[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE].value);
2771 if (call_tracer == nullptr) return;
2772 auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry);
2773 context[GRPC_CONTEXT_CALL_TRACER].value = tracer;
2774 }
2775
2776 } // namespace
2777
// Constructs an LB call.  on_commit is stored for later invocation when
// the call commits its pick (presumably via Commit(); confirm against
// that implementation).  is_transparent_retry is forwarded to the call
// tracer when starting a new call attempt.
ClientChannelFilter::LoadBalancedCall::LoadBalancedCall(
    ClientChannelFilter* chand, grpc_call_context_element* call_context,
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry)
    : InternallyRefCounted(
          GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)
              ? "LoadBalancedCall"
              : nullptr),
      chand_(chand),
      on_commit_(std::move(on_commit)),
      call_context_(call_context) {
  CreateCallAttemptTracer(call_context, is_transparent_retry);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: created", chand_, this);
  }
}
2793
ClientChannelFilter::LoadBalancedCall::~LoadBalancedCall() {
  // backend_metric_data_ is allocated from the call's arena (see
  // BackendMetricAllocator), so we run its destructor explicitly rather
  // than deleting it; the arena owns the storage.
  if (backend_metric_data_ != nullptr) {
    backend_metric_data_->BackendMetricData::~BackendMetricData();
  }
}
2799
RecordCallCompletion(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,grpc_transport_stream_stats * transport_stream_stats,absl::string_view peer_address)2800 void ClientChannelFilter::LoadBalancedCall::RecordCallCompletion(
2801 absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
2802 grpc_transport_stream_stats* transport_stream_stats,
2803 absl::string_view peer_address) {
2804 // If we have a tracer, notify it.
2805 if (call_attempt_tracer() != nullptr) {
2806 call_attempt_tracer()->RecordReceivedTrailingMetadata(
2807 status, recv_trailing_metadata, transport_stream_stats);
2808 }
2809 // If the LB policy requested a callback for trailing metadata, invoke
2810 // the callback.
2811 if (lb_subchannel_call_tracker_ != nullptr) {
2812 Metadata trailing_metadata(recv_trailing_metadata);
2813 BackendMetricAccessor backend_metric_accessor(this, recv_trailing_metadata);
2814 LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
2815 peer_address, status, &trailing_metadata, &backend_metric_accessor};
2816 lb_subchannel_call_tracker_->Finish(args);
2817 lb_subchannel_call_tracker_.reset();
2818 }
2819 }
2820
RecordLatency()2821 void ClientChannelFilter::LoadBalancedCall::RecordLatency() {
2822 // Compute latency and report it to the tracer.
2823 if (call_attempt_tracer() != nullptr) {
2824 gpr_timespec latency =
2825 gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_);
2826 call_attempt_tracer()->RecordEnd(latency);
2827 }
2828 }
2829
// Called while holding the channel's LB mutex when the call leaves the
// LB pick queue.
void ClientChannelFilter::LoadBalancedCall::
    RemoveCallFromLbQueuedCallsLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list",
            chand_, this);
  }
  // Remove pollset_set linkage.
  grpc_polling_entity_del_from_pollset_set(pollent(),
                                           chand_->interested_parties_);
  // Note: There's no need to actually remove the call from the queue
  // here, because that will be done in either
  // LbQueuedCallCanceller::CancelLocked() or
  // in ClientChannelFilter::UpdateStateAndPickerLocked().
}
2844
// Called while holding the channel's LB mutex to queue the call until a
// usable picker becomes available.
void ClientChannelFilter::LoadBalancedCall::AddCallToLbQueuedCallsLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list",
            chand_, this);
  }
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent(),
                                         chand_->interested_parties_);
  // Add to queue.  The queue holds a ref to this call.
  chand_->lb_queued_calls_.insert(Ref());
  OnAddToQueueLocked();
}
2858
2859 absl::optional<absl::Status>
PickSubchannel(bool was_queued)2860 ClientChannelFilter::LoadBalancedCall::PickSubchannel(bool was_queued) {
2861 // We may accumulate multiple pickers here, because if a picker says
2862 // to queue the call, we check again to see if the picker has been
2863 // updated before we queue it.
2864 // We need to unref pickers in the WorkSerializer.
2865 std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers;
2866 auto cleanup = absl::MakeCleanup([&]() {
2867 if (IsWorkSerializerDispatchEnabled()) return;
2868 chand_->work_serializer_->Run(
2869 [pickers = std::move(pickers)]() mutable {
2870 for (auto& picker : pickers) {
2871 picker.reset(DEBUG_LOCATION, "PickSubchannel");
2872 }
2873 },
2874 DEBUG_LOCATION);
2875 });
2876 absl::AnyInvocable<void(RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>)>
2877 set_picker;
2878 if (!IsWorkSerializerDispatchEnabled()) {
2879 set_picker =
2880 [&](RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
2881 pickers.emplace_back(std::move(picker));
2882 };
2883 } else {
2884 pickers.emplace_back();
2885 set_picker =
2886 [&](RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
2887 pickers[0] = std::move(picker);
2888 };
2889 }
2890 // Grab mutex and take a ref to the picker.
2891 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2892 gpr_log(GPR_INFO, "chand=%p lb_call=%p: grabbing LB mutex to get picker",
2893 chand_, this);
2894 }
2895 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
2896 {
2897 MutexLock lock(&chand_->lb_mu_);
2898 set_picker(chand_->picker_);
2899 }
2900 while (true) {
2901 // TODO(roth): Fix race condition in channel_idle filter and any
2902 // other possible causes of this.
2903 if (pickers.back() == nullptr) {
2904 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2905 gpr_log(GPR_ERROR, "chand=%p lb_call=%p: picker is null, failing call",
2906 chand_, this);
2907 }
2908 return absl::InternalError("picker is null -- shouldn't happen");
2909 }
2910 // Do pick.
2911 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2912 gpr_log(GPR_INFO, "chand=%p lb_call=%p: performing pick with picker=%p",
2913 chand_, this, pickers.back().get());
2914 }
2915 grpc_error_handle error;
2916 bool pick_complete = PickSubchannelImpl(pickers.back().get(), &error);
2917 if (!pick_complete) {
2918 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> old_picker;
2919 MutexLock lock(&chand_->lb_mu_);
2920 // If picker has been swapped out since we grabbed it, try again.
2921 if (pickers.back() != chand_->picker_) {
2922 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2923 gpr_log(GPR_INFO,
2924 "chand=%p lb_call=%p: pick not complete, but picker changed",
2925 chand_, this);
2926 }
2927 if (IsWorkSerializerDispatchEnabled()) {
2928 // Don't unref until after we release the mutex.
2929 old_picker = std::move(pickers.back());
2930 }
2931 set_picker(chand_->picker_);
2932 continue;
2933 }
2934 // Otherwise queue the pick to try again later when we get a new picker.
2935 AddCallToLbQueuedCallsLocked();
2936 return absl::nullopt;
2937 }
2938 // Pick is complete.
2939 // If it was queued, add a trace annotation.
2940 if (was_queued && call_attempt_tracer() != nullptr) {
2941 call_attempt_tracer()->RecordAnnotation("Delayed LB pick complete.");
2942 }
2943 // If the pick failed, fail the call.
2944 if (!error.ok()) {
2945 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
2946 gpr_log(GPR_INFO,
2947 "chand=%p lb_call=%p: failed to pick subchannel: error=%s",
2948 chand_, this, StatusToString(error).c_str());
2949 }
2950 return error;
2951 }
2952 // Pick succeeded.
2953 Commit();
2954 return absl::OkStatus();
2955 }
2956 }
2957
// Performs one LB pick attempt with the given picker.
// Returns true if the pick is complete -- either successfully (with
// connected_subchannel_ set) or with a terminal error stored in *error.
// Returns false if the pick must be queued and retried with a new picker.
bool ClientChannelFilter::LoadBalancedCall::PickSubchannelImpl(
    LoadBalancingPolicy::SubchannelPicker* picker, grpc_error_handle* error) {
  GPR_ASSERT(connected_subchannel_ == nullptr);
  // Perform LB pick.
  LoadBalancingPolicy::PickArgs pick_args;
  Slice* path = send_initial_metadata()->get_pointer(HttpPathMetadata());
  GPR_ASSERT(path != nullptr);
  pick_args.path = path->as_string_view();
  LbCallState lb_call_state(this);
  pick_args.call_state = &lb_call_state;
  Metadata initial_metadata(send_initial_metadata());
  pick_args.initial_metadata = &initial_metadata;
  auto result = picker->Pick(pick_args);
  // Dispatch on the pick result type.  Each handler returns true iff
  // the pick is complete (see function comment above).
  return HandlePickResult<bool>(
      &result,
      // CompletePick
      [this](LoadBalancingPolicy::PickResult::Complete* complete_pick) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
          gpr_log(GPR_INFO,
                  "chand=%p lb_call=%p: LB pick succeeded: subchannel=%p",
                  chand_, this, complete_pick->subchannel.get());
        }
        GPR_ASSERT(complete_pick->subchannel != nullptr);
        // Grab a ref to the connected subchannel while we're still
        // holding the data plane mutex.
        SubchannelWrapper* subchannel =
            static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
        connected_subchannel_ = subchannel->connected_subchannel();
        // If the subchannel has no connected subchannel (e.g., if the
        // subchannel has moved out of state READY but the LB policy hasn't
        // yet seen that change and given us a new picker), then just
        // queue the pick.  We'll try again as soon as we get a new picker.
        if (connected_subchannel_ == nullptr) {
          if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
            gpr_log(GPR_INFO,
                    "chand=%p lb_call=%p: subchannel returned by LB picker "
                    "has no connected subchannel; queueing pick",
                    chand_, this);
          }
          return false;
        }
        // Start the per-call tracker supplied by the LB policy, if any.
        lb_subchannel_call_tracker_ =
            std::move(complete_pick->subchannel_call_tracker);
        if (lb_subchannel_call_tracker_ != nullptr) {
          lb_subchannel_call_tracker_->Start();
        }
        return true;
      },
      // QueuePick
      [this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
          gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick queued", chand_,
                  this);
        }
        return false;
      },
      // FailPick
      [this, &error](LoadBalancingPolicy::PickResult::Fail* fail_pick) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
          gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick failed: %s", chand_,
                  this, fail_pick->status.ToString().c_str());
        }
        // If wait_for_ready is false, then the error indicates the RPC
        // attempt's final status.
        if (!send_initial_metadata()
                 ->GetOrCreatePointer(WaitForReady())
                 ->value) {
          *error = absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
              std::move(fail_pick->status), "LB pick"));
          return true;
        }
        // If wait_for_ready is true, then queue to retry when we get a new
        // picker.
        return false;
      },
      // DropPick
      [this, &error](LoadBalancingPolicy::PickResult::Drop* drop_pick) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
          gpr_log(GPR_INFO, "chand=%p lb_call=%p: LB pick dropped: %s", chand_,
                  this, drop_pick->status.ToString().c_str());
        }
        // Mark the error as an LB drop so that it is not counted as a
        // failure by the retry code.
        *error = grpc_error_set_int(
            absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
                std::move(drop_pick->status), "LB drop")),
            StatusIntProperty::kLbPolicyDrop, 1);
        return true;
      });
}
3046
3047 //
3048 // ClientChannelFilter::FilterBasedLoadBalancedCall
3049 //
3050
// Constructs a filter-based LB call, capturing the pieces of the legacy
// call stack (deadline, arena, call stack, call combiner, polling entity)
// from the call element args.
ClientChannelFilter::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall(
    ClientChannelFilter* chand, const grpc_call_element_args& args,
    grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry)
    : LoadBalancedCall(chand, args.context, std::move(on_commit),
                       is_transparent_retry),
      deadline_(args.deadline),
      arena_(args.arena),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      pollent_(pollent),
      on_call_destruction_complete_(on_call_destruction_complete) {}
3063
ClientChannelFilter::FilterBasedLoadBalancedCall::
    ~FilterBasedLoadBalancedCall() {
  // Make sure there are no remaining pending batches.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    GPR_ASSERT(pending_batches_[i] == nullptr);
  }
  // If the destruction-complete closure was never handed off to a
  // subchannel call (see CreateSubchannelCall()), run it here so the
  // caller is still notified.
  if (on_call_destruction_complete_ != nullptr) {
    ExecCtx::Run(DEBUG_LOCATION, on_call_destruction_complete_,
                 absl::OkStatus());
  }
}
3075
// Called when the call is abandoned.  Records a best-effort completion
// (if one was never recorded) and latency before delegating to the base
// class.
void ClientChannelFilter::FilterBasedLoadBalancedCall::Orphan() {
  // If the recv_trailing_metadata op was never started, then notify
  // about call completion here, as best we can.  We assume status
  // CANCELLED in this case.
  if (recv_trailing_metadata_ == nullptr) {
    RecordCallCompletion(absl::CancelledError("call cancelled"), nullptr,
                         nullptr, "");
  }
  RecordLatency();
  // Delegate to parent.
  LoadBalancedCall::Orphan();
}
3088
GetBatchIndex(grpc_transport_stream_op_batch * batch)3089 size_t ClientChannelFilter::FilterBasedLoadBalancedCall::GetBatchIndex(
3090 grpc_transport_stream_op_batch* batch) {
3091 // Note: It is important the send_initial_metadata be the first entry
3092 // here, since the code in PickSubchannelImpl() assumes it will be.
3093 if (batch->send_initial_metadata) return 0;
3094 if (batch->send_message) return 1;
3095 if (batch->send_trailing_metadata) return 2;
3096 if (batch->recv_initial_metadata) return 3;
3097 if (batch->recv_message) return 4;
3098 if (batch->recv_trailing_metadata) return 5;
3099 GPR_UNREACHABLE_CODE(return (size_t)-1);
3100 }
3101
3102 // This is called via the call combiner, so access to calld is synchronized.
// Stashes a batch in pending_batches_ until an LB pick completes.
// The slot is determined by GetBatchIndex() and must be empty -- only
// one batch per op type may be pending at a time.
// This is called via the call combiner, so access to calld is synchronized.
void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesAdd(
    grpc_transport_stream_op_batch* batch) {
  const size_t idx = GetBatchIndex(batch);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: adding pending batch at index %" PRIuPTR,
            chand(), this, idx);
  }
  GPR_ASSERT(pending_batches_[idx] == nullptr);
  pending_batches_[idx] = batch;
}
3114
3115 // This is called via the call combiner, so access to calld is synchronized.
3116 void ClientChannelFilter::FilterBasedLoadBalancedCall::
FailPendingBatchInCallCombiner(void * arg,grpc_error_handle error)3117 FailPendingBatchInCallCombiner(void* arg, grpc_error_handle error) {
3118 grpc_transport_stream_op_batch* batch =
3119 static_cast<grpc_transport_stream_op_batch*>(arg);
3120 auto* self = static_cast<FilterBasedLoadBalancedCall*>(
3121 batch->handler_private.extra_arg);
3122 // Note: This will release the call combiner.
3123 grpc_transport_stream_op_batch_finish_with_failure(batch, error,
3124 self->call_combiner_);
3125 }
3126
3127 // This is called via the call combiner, so access to calld is synchronized.
// Fails all pending batches with the given error.  The error is also
// stashed in failure_error_ so that RecvTrailingMetadataReady() can
// propagate it in place of the transport's error.
// yield_call_combiner_predicate decides, based on the closure list,
// whether running the closures should yield the call combiner.
// This is called via the call combiner, so access to calld is synchronized.
void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesFail(
    grpc_error_handle error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {
  GPR_ASSERT(!error.ok());
  failure_error_ = error;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: failing %" PRIuPTR " pending batches: %s",
            chand(), this, num_batches, StatusToString(error).c_str());
  }
  // Schedule one closure per pending batch; each closure fails its batch
  // in the call combiner via FailPendingBatchInCallCombiner().
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, error,
                   "PendingBatchesFail");
      batch = nullptr;
    }
  }
  if (yield_call_combiner_predicate(closures)) {
    closures.RunClosures(call_combiner_);
  } else {
    closures.RunClosuresWithoutYielding(call_combiner_);
  }
}
3161
3162 // This is called via the call combiner, so access to calld is synchronized.
3163 void ClientChannelFilter::FilterBasedLoadBalancedCall::
ResumePendingBatchInCallCombiner(void * arg,grpc_error_handle)3164 ResumePendingBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
3165 grpc_transport_stream_op_batch* batch =
3166 static_cast<grpc_transport_stream_op_batch*>(arg);
3167 SubchannelCall* subchannel_call =
3168 static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
3169 // Note: This will release the call combiner.
3170 subchannel_call->StartTransportStreamOpBatch(batch);
3171 }
3172
3173 // This is called via the call combiner, so access to calld is synchronized.
// Sends all pending batches down to subchannel_call_.  Called once the
// LB pick has completed and the subchannel call has been created.
// This is called via the call combiner, so access to calld is synchronized.
void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesResume() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: starting %" PRIuPTR
            " pending batches on subchannel_call=%p",
            chand(), this, num_batches, subchannel_call_.get());
  }
  // Schedule one closure per pending batch; each closure resumes its
  // batch in the call combiner via ResumePendingBatchInCallCombiner().
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = subchannel_call_.get();
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        ResumePendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, absl::OkStatus(),
                   "resuming pending batch from LB call");
      batch = nullptr;
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner_);
}
3201
// Entry point for batches arriving from the surface.  Records send ops in
// the call tracer, intercepts recv ops, and then either passes the batch
// down to an existing subchannel call, fails it (if the call was
// previously cancelled), or queues it pending an LB pick.
void ClientChannelFilter::FilterBasedLoadBalancedCall::
    StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace) ||
      GRPC_TRACE_FLAG_ENABLED(grpc_trace_channel)) {
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: batch started from above: %s, "
            "call_attempt_tracer()=%p",
            chand(), this,
            grpc_transport_stream_op_batch_string(batch, false).c_str(),
            call_attempt_tracer());
  }
  // Handle call tracing.
  if (call_attempt_tracer() != nullptr) {
    // Record send ops in tracer.
    if (batch->cancel_stream) {
      call_attempt_tracer()->RecordCancel(
          batch->payload->cancel_stream.cancel_error);
    }
    if (batch->send_initial_metadata) {
      call_attempt_tracer()->RecordSendInitialMetadata(
          batch->payload->send_initial_metadata.send_initial_metadata);
    }
    if (batch->send_trailing_metadata) {
      call_attempt_tracer()->RecordSendTrailingMetadata(
          batch->payload->send_trailing_metadata.send_trailing_metadata);
    }
    // Intercept recv ops.
    // Note: recv_initial_metadata is intercepted only when a tracer is
    // present, so RecvInitialMetadataReady() may use the tracer unguarded.
    if (batch->recv_initial_metadata) {
      recv_initial_metadata_ =
          batch->payload->recv_initial_metadata.recv_initial_metadata;
      original_recv_initial_metadata_ready_ =
          batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
      GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
                        this, nullptr);
      batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
          &recv_initial_metadata_ready_;
    }
  }
  // Intercept recv_trailing_metadata even if there is no call tracer,
  // since we may need to notify the LB policy about trailing metadata.
  if (batch->recv_trailing_metadata) {
    recv_trailing_metadata_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata;
    transport_stream_stats_ =
        batch->payload->recv_trailing_metadata.collect_stats;
    original_recv_trailing_metadata_ready_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
                      this, nullptr);
    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &recv_trailing_metadata_ready_;
  }
  // If we've already gotten a subchannel call, pass the batch down to it.
  // Note that once we have picked a subchannel, we do not need to acquire
  // the channel's data plane mutex, which is more efficient (especially for
  // streaming calls).
  if (subchannel_call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p lb_call=%p: starting batch on subchannel_call=%p",
              chand(), this, subchannel_call_.get());
    }
    subchannel_call_->StartTransportStreamOpBatch(batch);
    return;
  }
  // We do not yet have a subchannel call.
  //
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(!cancel_error_.ok())) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p lb_call=%p: failing batch with error: %s",
              chand(), this, StatusToString(cancel_error_).c_str());
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(batch, cancel_error_,
                                                       call_combiner_);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    cancel_error_ = batch->payload->cancel_stream.cancel_error;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p lb_call=%p: recording cancel_error=%s",
              chand(), this, StatusToString(cancel_error_).c_str());
    }
    // Fail all pending batches.
    PendingBatchesFail(cancel_error_, NoYieldCallCombiner);
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(batch, cancel_error_,
                                                       call_combiner_);
    return;
  }
  // Add the batch to the pending list.
  PendingBatchesAdd(batch);
  // For batches containing a send_initial_metadata op, acquire the
  // channel's LB mutex to pick a subchannel.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    TryPick(/*was_queued=*/false);
  } else {
    // For all other batches, release the call combiner.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p lb_call=%p: saved batch, yielding call combiner",
              chand(), this);
    }
    GRPC_CALL_COMBINER_STOP(call_combiner_,
                            "batch does not include send_initial_metadata");
  }
}
3316
// Interception callback for recv_initial_metadata.  Records the received
// metadata in the call tracer and captures the peer string, then chains
// to the original callback.
// Note: this closure is only installed when call_attempt_tracer() is
// non-null (see StartTransportStreamOpBatch()), so the tracer may be
// dereferenced here without a null check.
void ClientChannelFilter::FilterBasedLoadBalancedCall::RecvInitialMetadataReady(
    void* arg, grpc_error_handle error) {
  auto* self = static_cast<FilterBasedLoadBalancedCall*>(arg);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: got recv_initial_metadata_ready: error=%s",
            self->chand(), self, StatusToString(error).c_str());
  }
  if (error.ok()) {
    // recv_initial_metadata_flags is not populated for clients
    self->call_attempt_tracer()->RecordReceivedInitialMetadata(
        self->recv_initial_metadata_);
    auto* peer_string = self->recv_initial_metadata_->get_pointer(PeerString());
    if (peer_string != nullptr) self->peer_string_ = peer_string->Ref();
  }
  Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
               error);
}
3335
// Interception callback for recv_trailing_metadata.  Derives the call's
// final status (from the error or from the trailing metadata), records
// call completion for the tracer and/or LB subchannel call tracker, and
// then chains to the original callback -- substituting failure_error_
// (set by PendingBatchesFail()) for the transport error if present.
void ClientChannelFilter::FilterBasedLoadBalancedCall::
    RecvTrailingMetadataReady(void* arg, grpc_error_handle error) {
  auto* self = static_cast<FilterBasedLoadBalancedCall*>(arg);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: got recv_trailing_metadata_ready: error=%s "
            "call_attempt_tracer()=%p lb_subchannel_call_tracker_=%p "
            "failure_error_=%s",
            self->chand(), self, StatusToString(error).c_str(),
            self->call_attempt_tracer(), self->lb_subchannel_call_tracker(),
            StatusToString(self->failure_error_).c_str());
  }
  // Check if we have a tracer or an LB callback to invoke.
  if (self->call_attempt_tracer() != nullptr ||
      self->lb_subchannel_call_tracker() != nullptr) {
    // Get the call's status.
    absl::Status status;
    if (!error.ok()) {
      // Get status from error.
      grpc_status_code code;
      std::string message;
      grpc_error_get_status(error, self->deadline_, &code, &message,
                            /*http_error=*/nullptr, /*error_string=*/nullptr);
      status = absl::Status(static_cast<absl::StatusCode>(code), message);
    } else {
      // Get status from headers.
      const auto& md = *self->recv_trailing_metadata_;
      grpc_status_code code =
          md.get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
      if (code != GRPC_STATUS_OK) {
        absl::string_view message;
        if (const auto* grpc_message = md.get_pointer(GrpcMessageMetadata())) {
          message = grpc_message->as_string_view();
        }
        status = absl::Status(static_cast<absl::StatusCode>(code), message);
      }
    }
    // Peer string is captured by RecvInitialMetadataReady(), if it ran.
    absl::string_view peer_string;
    if (self->peer_string_.has_value()) {
      peer_string = self->peer_string_->as_string_view();
    }
    self->RecordCallCompletion(status, self->recv_trailing_metadata_,
                               self->transport_stream_stats_, peer_string);
  }
  // Chain to original callback.
  if (!self->failure_error_.ok()) {
    error = self->failure_error_;
    self->failure_error_ = absl::OkStatus();
  }
  Closure::Run(DEBUG_LOCATION, self->original_recv_trailing_metadata_ready_,
               error);
}
3388
3389 // A class to handle the call combiner cancellation callback for a
3390 // queued pick.
3391 // TODO(roth): When we implement hedging support, we won't be able to
3392 // register a call combiner cancellation closure for each LB pick,
3393 // because there may be multiple LB picks happening in parallel.
3394 // Instead, we will probably need to maintain a list in the CallData
3395 // object of pending LB picks to be cancelled when the closure runs.
// Cancels a queued LB pick if the call combiner reports cancellation.
// The canceller holds a ref to the LB call and a ref to the owning call
// stack; both are released when CancelLocked() runs (in all cases --
// cancellation or lameness), after which the object deletes itself.
class ClientChannelFilter::FilterBasedLoadBalancedCall::LbQueuedCallCanceller
    final {
 public:
  explicit LbQueuedCallCanceller(
      RefCountedPtr<FilterBasedLoadBalancedCall> lb_call)
      : lb_call_(std::move(lb_call)) {
    GRPC_CALL_STACK_REF(lb_call_->owning_call_, "LbQueuedCallCanceller");
    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, nullptr);
    // The call combiner invokes closure_ when the call is cancelled (or
    // with an OK status when the canceller is replaced or laned).
    lb_call_->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  static void CancelLocked(void* arg, grpc_error_handle error) {
    auto* self = static_cast<LbQueuedCallCanceller*>(arg);
    auto* lb_call = self->lb_call_.get();
    auto* chand = lb_call->chand();
    {
      MutexLock lock(&chand->lb_mu_);
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p lb_call=%p: cancelling queued pick: "
                "error=%s self=%p calld->pick_canceller=%p",
                chand, lb_call, StatusToString(error).c_str(), self,
                lb_call->lb_call_canceller_);
      }
      // Only act if this canceller is still the active one for the call
      // (RetryPickLocked() lames superseded cancellers by resetting
      // lb_call_canceller_) and the call was actually cancelled.
      if (lb_call->lb_call_canceller_ == self && !error.ok()) {
        lb_call->Commit();
        // Remove pick from list of queued picks.
        lb_call->RemoveCallFromLbQueuedCallsLocked();
        // Remove from queued picks list.
        chand->lb_queued_calls_.erase(self->lb_call_);
        // Fail pending batches on the call.
        lb_call->PendingBatchesFail(error,
                                    YieldCallCombinerIfPendingBatchesFound);
      }
    }
    // Unref lb_call before unreffing the call stack, since unreffing
    // the call stack may destroy the arena in which lb_call is allocated.
    auto* owning_call = lb_call->owning_call_;
    self->lb_call_.reset();
    GRPC_CALL_STACK_UNREF(owning_call, "LbQueuedCallCanceller");
    delete self;
  }

  RefCountedPtr<FilterBasedLoadBalancedCall> lb_call_;
  grpc_closure closure_;
};
3443
TryPick(bool was_queued)3444 void ClientChannelFilter::FilterBasedLoadBalancedCall::TryPick(
3445 bool was_queued) {
3446 auto result = PickSubchannel(was_queued);
3447 if (result.has_value()) {
3448 if (!result->ok()) {
3449 PendingBatchesFail(*result, YieldCallCombiner);
3450 return;
3451 }
3452 CreateSubchannelCall();
3453 }
3454 }
3455
// Hook invoked when this call is added to the channel's LB queue.
void ClientChannelFilter::FilterBasedLoadBalancedCall::OnAddToQueueLocked() {
  // Register call combiner cancellation callback.
  lb_call_canceller_ =
      new LbQueuedCallCanceller(RefAsSubclass<FilterBasedLoadBalancedCall>());
}
3461
// Hook invoked (under the LB mutex) when a new picker is available and a
// queued pick should be retried.
void ClientChannelFilter::FilterBasedLoadBalancedCall::RetryPickLocked() {
  // Lame the call combiner canceller.
  lb_call_canceller_ = nullptr;
  // Do an async callback to resume call processing, so that we're not
  // doing it while holding the channel's LB mutex.
  // TODO(roth): We should really be using EventEngine::Run() here
  // instead of ExecCtx::Run().  Unfortunately, doing that seems to cause
  // a flaky TSAN failure for reasons that I do not fully understand.
  // However, given that we are working toward eliminating this code as
  // part of the promise conversion, it doesn't seem worth further
  // investigation right now.
  ExecCtx::Run(DEBUG_LOCATION, NewClosure([this](grpc_error_handle) {
                 // If there are a lot of queued calls here, resuming them
                 // all may cause us to stay inside C-core for a long period
                 // of time.  All of that work would be done using the same
                 // ExecCtx instance and therefore the same cached value of
                 // "now".  The longer it takes to finish all of this work
                 // and exit from C-core, the more stale the cached value of
                 // "now" may become.  This can cause problems whereby (e.g.)
                 // we calculate a timer deadline based on the stale value,
                 // which results in the timer firing too early.  To avoid
                 // this, we invalidate the cached value for each call we
                 // process.
                 ExecCtx::Get()->InvalidateNow();
                 TryPick(/*was_queued=*/true);
               }),
               absl::OkStatus());
}
3490
// Creates the subchannel call once the LB pick has succeeded, hands off
// the destruction-complete closure to it, and then either resumes the
// pending batches (on success) or fails them (on creation error).
void ClientChannelFilter::FilterBasedLoadBalancedCall::CreateSubchannelCall() {
  Slice* path = send_initial_metadata()->get_pointer(HttpPathMetadata());
  GPR_ASSERT(path != nullptr);
  SubchannelCall::Args call_args = {
      connected_subchannel()->Ref(), pollent_, path->Ref(), /*start_time=*/0,
      deadline_, arena_,
      // TODO(roth): When we implement hedging support, we will probably
      // need to use a separate call context for each subchannel call.
      call_context(), call_combiner_};
  grpc_error_handle error;
  subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand(),
            this, subchannel_call_.get(), StatusToString(error).c_str());
  }
  // Transfer ownership of the destruction-complete closure to the
  // subchannel call; clear our copy so the destructor does not run it
  // again (see ~FilterBasedLoadBalancedCall()).
  if (on_call_destruction_complete_ != nullptr) {
    subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_);
    on_call_destruction_complete_ = nullptr;
  }
  if (GPR_UNLIKELY(!error.ok())) {
    PendingBatchesFail(error, YieldCallCombiner);
  } else {
    PendingBatchesResume();
  }
}
3517
3518 //
3519 // ClientChannelFilter::PromiseBasedLoadBalancedCall
3520 //
3521
// Constructs a promise-based LB call; the call context is obtained from
// the promise activity context rather than from call element args.
ClientChannelFilter::PromiseBasedLoadBalancedCall::PromiseBasedLoadBalancedCall(
    ClientChannelFilter* chand, absl::AnyInvocable<void()> on_commit,
    bool is_transparent_retry)
    : LoadBalancedCall(chand, GetContext<grpc_call_context_element>(),
                       std::move(on_commit), is_transparent_retry) {}
3527
// Builds the promise for a promise-based LB call: performs the LB pick
// (possibly suspending until a new picker arrives), then delegates to the
// connected subchannel's call promise, recording tracing data along the
// way.  On cancellation, removes the call from the LB queue and records a
// best-effort CANCELLED completion.
ArenaPromise<ServerMetadataHandle>
ClientChannelFilter::PromiseBasedLoadBalancedCall::MakeCallPromise(
    CallArgs call_args, OrphanablePtr<PromiseBasedLoadBalancedCall> lb_call) {
  pollent_ = NowOrNever(call_args.polling_entity->WaitAndCopy()).value();
  // Record ops in tracer.
  if (call_attempt_tracer() != nullptr) {
    call_attempt_tracer()->RecordSendInitialMetadata(
        call_args.client_initial_metadata.get());
    // TODO(ctiller): Find a way to do this without registering a no-op mapper.
    call_args.client_to_server_messages->InterceptAndMapWithHalfClose(
        [](MessageHandle message) { return message; },  // No-op.
        [this]() {
          // TODO(roth): Change CallTracer API to not pass metadata
          // batch to this method, since the batch is always empty.
          grpc_metadata_batch metadata;
          call_attempt_tracer()->RecordSendTrailingMetadata(&metadata);
        });
  }
  // Extract peer name from server initial metadata.
  call_args.server_initial_metadata->InterceptAndMap(
      [self = lb_call->RefAsSubclass<PromiseBasedLoadBalancedCall>()](
          ServerMetadataHandle metadata) {
        if (self->call_attempt_tracer() != nullptr) {
          self->call_attempt_tracer()->RecordReceivedInitialMetadata(
              metadata.get());
        }
        Slice* peer_string = metadata->get_pointer(PeerString());
        if (peer_string != nullptr) self->peer_string_ = peer_string->Ref();
        return metadata;
      });
  // Stash the client initial metadata until the pick completes, since the
  // pick reads it via send_initial_metadata().
  client_initial_metadata_ = std::move(call_args.client_initial_metadata);
  return OnCancel(
      Map(TrySeq(
              // LB pick.
              [this]() -> Poll<absl::Status> {
                auto result = PickSubchannel(was_queued_);
                if (GRPC_TRACE_FLAG_ENABLED(
                        grpc_client_channel_lb_call_trace)) {
                  gpr_log(GPR_INFO,
                          "chand=%p lb_call=%p: %sPickSubchannel() returns %s",
                          chand(), this,
                          GetContext<Activity>()->DebugTag().c_str(),
                          result.has_value() ? result->ToString().c_str()
                                             : "Pending");
                }
                if (result == absl::nullopt) return Pending{};
                return std::move(*result);
              },
              [this, call_args = std::move(call_args)]() mutable
              -> ArenaPromise<ServerMetadataHandle> {
                // Pick complete: restore the initial metadata and hand the
                // call off to the connected subchannel.
                call_args.client_initial_metadata =
                    std::move(client_initial_metadata_);
                return connected_subchannel()->MakeCallPromise(
                    std::move(call_args));
              }),
          // Record call completion.
          [this](ServerMetadataHandle metadata) {
            if (call_attempt_tracer() != nullptr ||
                lb_subchannel_call_tracker() != nullptr) {
              absl::Status status;
              grpc_status_code code = metadata->get(GrpcStatusMetadata())
                                          .value_or(GRPC_STATUS_UNKNOWN);
              if (code != GRPC_STATUS_OK) {
                absl::string_view message;
                if (const auto* grpc_message =
                        metadata->get_pointer(GrpcMessageMetadata())) {
                  message = grpc_message->as_string_view();
                }
                status =
                    absl::Status(static_cast<absl::StatusCode>(code), message);
              }
              RecordCallCompletion(status, metadata.get(),
                                   &GetContext<CallContext>()
                                        ->call_stats()
                                        ->transport_stream_stats,
                                   peer_string_.as_string_view());
            }
            RecordLatency();
            return metadata;
          }),
      [lb_call = std::move(lb_call)]() {
        // If the waker is pending, then we need to remove ourself from
        // the list of queued LB calls.
        if (!lb_call->waker_.is_unwakeable()) {
          MutexLock lock(&lb_call->chand()->lb_mu_);
          lb_call->Commit();
          // Remove pick from list of queued picks.
          lb_call->RemoveCallFromLbQueuedCallsLocked();
          // Remove from queued picks list.
          lb_call->chand()->lb_queued_calls_.erase(lb_call.get());
        }
        // TODO(ctiller): We don't have access to the call's actual status
        // here, so we just assume CANCELLED.  We could change this to use
        // CallFinalization instead of OnCancel() so that we can get the
        // actual status.  But we should also have access to the trailing
        // metadata, which we don't have in either case.  Ultimately, we
        // need a better story for code that needs to run at the end of a
        // call in both cancellation and non-cancellation cases that needs
        // access to server trailing metadata and the call's real status.
        if (lb_call->call_attempt_tracer() != nullptr) {
          lb_call->call_attempt_tracer()->RecordCancel(
              absl::CancelledError("call cancelled"));
        }
        if (lb_call->call_attempt_tracer() != nullptr ||
            lb_call->lb_subchannel_call_tracker() != nullptr) {
          // If we were cancelled without recording call completion, then
          // record call completion here, as best we can.  We assume status
          // CANCELLED in this case.
          lb_call->RecordCallCompletion(absl::CancelledError("call cancelled"),
                                        nullptr, nullptr, "");
        }
      });
}
3641
// Returns the arena from the promise activity context (the promise-based
// call does not store one itself, unlike the filter-based variant).
Arena* ClientChannelFilter::PromiseBasedLoadBalancedCall::arena() const {
  return GetContext<Arena>();
}
3645
// Returns the client initial metadata stashed by MakeCallPromise() so
// that the LB pick can read it.
grpc_metadata_batch*
ClientChannelFilter::PromiseBasedLoadBalancedCall::send_initial_metadata()
    const {
  return client_initial_metadata_.get();
}
3651
// Hook invoked when this call is added to the channel's LB queue.
// Captures a waker so RetryPickLocked() can re-poll the activity, and
// records that the pick was queued (used for trace annotations).
void ClientChannelFilter::PromiseBasedLoadBalancedCall::OnAddToQueueLocked() {
  waker_ = GetContext<Activity>()->MakeNonOwningWaker();
  was_queued_ = true;
}
3656
// Hook invoked (under the LB mutex) when a new picker is available:
// wakes the suspended promise activity so it re-attempts the pick.
void ClientChannelFilter::PromiseBasedLoadBalancedCall::RetryPickLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: RetryPickLocked()", chand(), this);
  }
  waker_.WakeupAsync();
}
3663
3664 } // namespace grpc_core
3665