xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/surface/channel_init.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/surface/channel_init.h"
22 
23 #include <string.h>
24 
25 #include <algorithm>
26 #include <map>
27 #include <set>
28 #include <string>
29 #include <type_traits>
30 
31 #include "absl/strings/str_cat.h"
32 #include "absl/strings/str_join.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/types/optional.h"
35 
36 #include <grpc/support/log.h>
37 
38 #include "src/core/lib/channel/channel_stack_trace.h"
39 #include "src/core/lib/debug/trace.h"
40 #include "src/core/lib/gprpp/crash.h"
41 #include "src/core/lib/gprpp/sync.h"
42 #include "src/core/lib/surface/channel_stack_type.h"
43 
44 namespace grpc_core {
45 
46 const char* (*NameFromChannelFilter)(const grpc_channel_filter*);
47 
48 namespace {
49 struct CompareChannelFiltersByName {
operator ()grpc_core::__anona21a48c80111::CompareChannelFiltersByName50   bool operator()(const grpc_channel_filter* a,
51                   const grpc_channel_filter* b) const {
52     return strcmp(NameFromChannelFilter(a), NameFromChannelFilter(b)) < 0;
53   }
54 };
55 }  // namespace
56 
After(std::initializer_list<const grpc_channel_filter * > filters)57 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::After(
58     std::initializer_list<const grpc_channel_filter*> filters) {
59   for (auto filter : filters) {
60     after_.push_back(filter);
61   }
62   return *this;
63 }
64 
Before(std::initializer_list<const grpc_channel_filter * > filters)65 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::Before(
66     std::initializer_list<const grpc_channel_filter*> filters) {
67   for (auto filter : filters) {
68     before_.push_back(filter);
69   }
70   return *this;
71 }
72 
If(InclusionPredicate predicate)73 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::If(
74     InclusionPredicate predicate) {
75   predicates_.emplace_back(std::move(predicate));
76   return *this;
77 }
78 
IfNot(InclusionPredicate predicate)79 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::IfNot(
80     InclusionPredicate predicate) {
81   predicates_.emplace_back(
82       [predicate = std::move(predicate)](const ChannelArgs& args) {
83         return !predicate(args);
84       });
85   return *this;
86 }
87 
88 ChannelInit::FilterRegistration&
IfHasChannelArg(const char * arg)89 ChannelInit::FilterRegistration::IfHasChannelArg(const char* arg) {
90   return If([arg](const ChannelArgs& args) { return args.Contains(arg); });
91 }
92 
IfChannelArg(const char * arg,bool default_value)93 ChannelInit::FilterRegistration& ChannelInit::FilterRegistration::IfChannelArg(
94     const char* arg, bool default_value) {
95   return If([arg, default_value](const ChannelArgs& args) {
96     return args.GetBool(arg).value_or(default_value);
97   });
98 }
99 
100 ChannelInit::FilterRegistration&
ExcludeFromMinimalStack()101 ChannelInit::FilterRegistration::ExcludeFromMinimalStack() {
102   return If([](const ChannelArgs& args) { return !args.WantMinimalStack(); });
103 }
104 
RegisterFilter(grpc_channel_stack_type type,const grpc_channel_filter * filter,const ChannelFilterVtable * vtable,SourceLocation registration_source)105 ChannelInit::FilterRegistration& ChannelInit::Builder::RegisterFilter(
106     grpc_channel_stack_type type, const grpc_channel_filter* filter,
107     const ChannelFilterVtable* vtable, SourceLocation registration_source) {
108   filters_[type].emplace_back(std::make_unique<FilterRegistration>(
109       filter, vtable, registration_source));
110   return *filters_[type].back();
111 }
112 
BuildStackConfig(const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>> & registrations,PostProcessor * post_processors,grpc_channel_stack_type type)113 ChannelInit::StackConfig ChannelInit::BuildStackConfig(
114     const std::vector<std::unique_ptr<ChannelInit::FilterRegistration>>&
115         registrations,
116     PostProcessor* post_processors, grpc_channel_stack_type type) {
117   // Phase 1: Build a map from filter to the set of filters that must be
118   // initialized before it.
119   // We order this map (and the set of dependent filters) by filter name to
120   // ensure algorithm ordering stability is deterministic for a given build.
121   // We should not require this, but at the time of writing it's expected that
122   // this will help overall stability.
123   using F = const grpc_channel_filter*;
124   std::map<F, FilterRegistration*> filter_to_registration;
125   using DependencyMap = std::map<F, std::set<F, CompareChannelFiltersByName>,
126                                  CompareChannelFiltersByName>;
127   DependencyMap dependencies;
128   std::vector<Filter> terminal_filters;
129   for (const auto& registration : registrations) {
130     if (filter_to_registration.count(registration->filter_) > 0) {
131       const auto first =
132           filter_to_registration[registration->filter_]->registration_source_;
133       const auto second = registration->registration_source_;
134       Crash(absl::StrCat("Duplicate registration of channel filter ",
135                          NameFromChannelFilter(registration->filter_),
136                          "\nfirst: ", first.file(), ":", first.line(),
137                          "\nsecond: ", second.file(), ":", second.line()));
138     }
139     filter_to_registration[registration->filter_] = registration.get();
140     if (registration->terminal_) {
141       GPR_ASSERT(registration->after_.empty());
142       GPR_ASSERT(registration->before_.empty());
143       GPR_ASSERT(!registration->before_all_);
144       terminal_filters.emplace_back(
145           registration->filter_, nullptr, std::move(registration->predicates_),
146           registration->skip_v3_, registration->registration_source_);
147     } else {
148       dependencies[registration->filter_];  // Ensure it's in the map.
149     }
150   }
151   for (const auto& registration : registrations) {
152     if (registration->terminal_) continue;
153     GPR_ASSERT(filter_to_registration.count(registration->filter_) > 0);
154     for (F after : registration->after_) {
155       if (filter_to_registration.count(after) == 0) {
156         gpr_log(
157             GPR_DEBUG, "%s",
158             absl::StrCat(
159                 "Filter ", NameFromChannelFilter(after),
160                 " not registered, but is referenced in the after clause of ",
161                 NameFromChannelFilter(registration->filter_),
162                 " when building channel stack ",
163                 grpc_channel_stack_type_string(type))
164                 .c_str());
165         continue;
166       }
167       dependencies[registration->filter_].insert(after);
168     }
169     for (F before : registration->before_) {
170       if (filter_to_registration.count(before) == 0) {
171         gpr_log(
172             GPR_DEBUG, "%s",
173             absl::StrCat(
174                 "Filter ", NameFromChannelFilter(before),
175                 " not registered, but is referenced in the before clause of ",
176                 NameFromChannelFilter(registration->filter_),
177                 " when building channel stack ",
178                 grpc_channel_stack_type_string(type))
179                 .c_str());
180         continue;
181       }
182       dependencies[before].insert(registration->filter_);
183     }
184     if (registration->before_all_) {
185       for (const auto& other : registrations) {
186         if (other.get() == registration.get()) continue;
187         if (other->terminal_) continue;
188         dependencies[other->filter_].insert(registration->filter_);
189       }
190     }
191   }
192   // Phase 2: Build a list of filters in dependency order.
193   // We can simply iterate through and add anything with no dependency.
194   // We then remove that filter from the dependency list of all other filters.
195   // We repeat until we have no more filters to add.
196   auto build_remaining_dependency_graph =
197       [](const DependencyMap& dependencies) {
198         std::string result;
199         for (const auto& p : dependencies) {
200           absl::StrAppend(&result, NameFromChannelFilter(p.first), " ->");
201           for (const auto& d : p.second) {
202             absl::StrAppend(&result, " ", NameFromChannelFilter(d));
203           }
204           absl::StrAppend(&result, "\n");
205         }
206         return result;
207       };
208   const DependencyMap original = dependencies;
209   auto take_ready_dependency = [&]() {
210     for (auto it = dependencies.begin(); it != dependencies.end(); ++it) {
211       if (it->second.empty()) {
212         auto r = it->first;
213         dependencies.erase(it);
214         return r;
215       }
216     }
217     Crash(absl::StrCat(
218         "Unresolvable graph of channel filters - remaining graph:\n",
219         build_remaining_dependency_graph(dependencies), "original:\n",
220         build_remaining_dependency_graph(original)));
221   };
222   std::vector<Filter> filters;
223   while (!dependencies.empty()) {
224     auto filter = take_ready_dependency();
225     auto* registration = filter_to_registration[filter];
226     filters.emplace_back(
227         filter, registration->vtable_, std::move(registration->predicates_),
228         registration->skip_v3_, registration->registration_source_);
229     for (auto& p : dependencies) {
230       p.second.erase(filter);
231     }
232   }
233   // Collect post processors that need to be applied.
234   // We've already ensured the one-per-slot constraint, so now we can just
235   // collect everything up into a vector and run it in order.
236   std::vector<PostProcessor> post_processor_functions;
237   for (int i = 0; i < static_cast<int>(PostProcessorSlot::kCount); i++) {
238     if (post_processors[i] == nullptr) continue;
239     post_processor_functions.emplace_back(std::move(post_processors[i]));
240   }
241   // Log out the graph we built if that's been requested.
242   if (grpc_trace_channel_stack.enabled()) {
243     // It can happen that multiple threads attempt to construct a core config at
244     // once.
245     // This is benign - the first one wins and others are discarded.
246     // However, it messes up our logging and makes it harder to reason about the
247     // graph, so we add some protection here.
248     static Mutex* const m = new Mutex();
249     MutexLock lock(m);
250     // List the channel stack type (since we'll be repeatedly printing graphs in
251     // this loop).
252     gpr_log(GPR_INFO,
253             "ORDERED CHANNEL STACK %s:", grpc_channel_stack_type_string(type));
254     // First build up a map of filter -> file:line: strings, because it helps
255     // the readability of this log to get later fields aligned vertically.
256     std::map<const grpc_channel_filter*, std::string> loc_strs;
257     size_t max_loc_str_len = 0;
258     size_t max_filter_name_len = 0;
259     auto add_loc_str = [&max_loc_str_len, &loc_strs, &filter_to_registration,
260                         &max_filter_name_len](
261                            const grpc_channel_filter* filter) {
262       max_filter_name_len =
263           std::max(strlen(NameFromChannelFilter(filter)), max_filter_name_len);
264       const auto registration =
265           filter_to_registration[filter]->registration_source_;
266       absl::string_view file = registration.file();
267       auto slash_pos = file.rfind('/');
268       if (slash_pos != file.npos) {
269         file = file.substr(slash_pos + 1);
270       }
271       auto loc_str = absl::StrCat(file, ":", registration.line(), ":");
272       max_loc_str_len = std::max(max_loc_str_len, loc_str.length());
273       loc_strs.emplace(filter, std::move(loc_str));
274     };
275     for (const auto& filter : filters) {
276       add_loc_str(filter.filter);
277     }
278     for (const auto& terminal : terminal_filters) {
279       add_loc_str(terminal.filter);
280     }
281     for (auto& loc_str : loc_strs) {
282       loc_str.second = absl::StrCat(
283           loc_str.second,
284           std::string(max_loc_str_len + 2 - loc_str.second.length(), ' '));
285     }
286     // For each regular filter, print the location registered, the name of the
287     // filter, and if it needed to occur after some other filters list those
288     // filters too.
289     // Note that we use the processed after list here - earlier we turned Before
290     // registrations into After registrations and we used those converted
291     // registrations to build the final ordering.
292     // If you're trying to track down why 'A' is listed as after 'B', look at
293     // the following:
294     //  - If A is registered with .After({B}), then A will be 'after' B here.
295     //  - If B is registered with .Before({A}), then A will be 'after' B here.
296     //  - If B is registered as BeforeAll, then A will be 'after' B here.
297     for (const auto& filter : filters) {
298       auto dep_it = original.find(filter.filter);
299       std::string after_str;
300       if (dep_it != original.end() && !dep_it->second.empty()) {
301         after_str = absl::StrCat(
302             std::string(max_filter_name_len + 1 -
303                             strlen(NameFromChannelFilter(filter.filter)),
304                         ' '),
305             "after ",
306             absl::StrJoin(
307                 dep_it->second, ", ",
308                 [](std::string* out, const grpc_channel_filter* filter) {
309                   out->append(NameFromChannelFilter(filter));
310                 }));
311       }
312       const auto filter_str =
313           absl::StrCat("  ", loc_strs[filter.filter],
314                        NameFromChannelFilter(filter.filter), after_str);
315       gpr_log(GPR_INFO, "%s", filter_str.c_str());
316     }
317     // Finally list out the terminal filters and where they were registered
318     // from.
319     for (const auto& terminal : terminal_filters) {
320       const auto filter_str = absl::StrCat(
321           "  ", loc_strs[terminal.filter],
322           NameFromChannelFilter(terminal.filter),
323           std::string(max_filter_name_len + 1 -
324                           strlen(NameFromChannelFilter(terminal.filter)),
325                       ' '),
326           "[terminal]");
327       gpr_log(GPR_INFO, "%s", filter_str.c_str());
328     }
329   }
330   // Check if there are no terminal filters: this would be an error.
331   // GRPC_CLIENT_DYNAMIC stacks don't use this mechanism, so we don't check that
332   // condition here.
333   // Right now we only log: many tests end up with a core configuration that
334   // is invalid.
335   // TODO(ctiller): evaluate if we can turn this into a crash one day.
336   // Right now it forces too many tests to know about channel initialization,
337   // either by supplying a valid configuration or by including an opt-out flag.
338   if (terminal_filters.empty() && type != GRPC_CLIENT_DYNAMIC) {
339     gpr_log(
340         GPR_ERROR,
341         "No terminal filters registered for channel stack type %s; this is "
342         "common for unit tests messing with CoreConfiguration, but will result "
343         "in a ChannelInit::CreateStack that never completes successfully.",
344         grpc_channel_stack_type_string(type));
345   }
346   return StackConfig{std::move(filters), std::move(terminal_filters),
347                      std::move(post_processor_functions)};
348 };
349 
Build()350 ChannelInit ChannelInit::Builder::Build() {
351   ChannelInit result;
352   for (int i = 0; i < GRPC_NUM_CHANNEL_STACK_TYPES; i++) {
353     result.stack_configs_[i] =
354         BuildStackConfig(filters_[i], post_processors_[i],
355                          static_cast<grpc_channel_stack_type>(i));
356   }
357   return result;
358 }
359 
CheckPredicates(const ChannelArgs & args) const360 bool ChannelInit::Filter::CheckPredicates(const ChannelArgs& args) const {
361   for (const auto& predicate : predicates) {
362     if (!predicate(args)) return false;
363   }
364   return true;
365 }
366 
CreateStack(ChannelStackBuilder * builder) const367 bool ChannelInit::CreateStack(ChannelStackBuilder* builder) const {
368   const auto& stack_config = stack_configs_[builder->channel_stack_type()];
369   for (const auto& filter : stack_config.filters) {
370     if (!filter.CheckPredicates(builder->channel_args())) continue;
371     builder->AppendFilter(filter.filter);
372   }
373   int found_terminators = 0;
374   for (const auto& terminator : stack_config.terminators) {
375     if (!terminator.CheckPredicates(builder->channel_args())) continue;
376     builder->AppendFilter(terminator.filter);
377     ++found_terminators;
378   }
379   if (found_terminators != 1) {
380     std::string error = absl::StrCat(
381         found_terminators,
382         " terminating filters found creating a channel of type ",
383         grpc_channel_stack_type_string(builder->channel_stack_type()),
384         " with arguments ", builder->channel_args().ToString(),
385         " (we insist upon one and only one terminating "
386         "filter)\n");
387     if (stack_config.terminators.empty()) {
388       absl::StrAppend(&error, "  No terminal filters were registered");
389     } else {
390       for (const auto& terminator : stack_config.terminators) {
391         absl::StrAppend(
392             &error, "  ", NameFromChannelFilter(terminator.filter),
393             " registered @ ", terminator.registration_source.file(), ":",
394             terminator.registration_source.line(), ": enabled = ",
395             terminator.CheckPredicates(builder->channel_args()) ? "true"
396                                                                 : "false",
397             "\n");
398       }
399     }
400     gpr_log(GPR_ERROR, "%s", error.c_str());
401     return false;
402   }
403   for (const auto& post_processor : stack_config.post_processors) {
404     post_processor(*builder);
405   }
406   return true;
407 }
408 
CreateStackSegment(grpc_channel_stack_type type,const ChannelArgs & args) const409 absl::StatusOr<ChannelInit::StackSegment> ChannelInit::CreateStackSegment(
410     grpc_channel_stack_type type, const ChannelArgs& args) const {
411   const auto& stack_config = stack_configs_[type];
412   std::vector<StackSegment::ChannelFilter> filters;
413   size_t channel_data_size = 0;
414   size_t channel_data_alignment = 0;
415   // Based on predicates build a list of filters to include in this segment.
416   for (const auto& filter : stack_config.filters) {
417     if (filter.skip_v3) continue;
418     if (!filter.CheckPredicates(args)) continue;
419     if (filter.vtable == nullptr) {
420       return absl::InvalidArgumentError(
421           absl::StrCat("Filter ", NameFromChannelFilter(filter.filter),
422                        " has no v3-callstack vtable"));
423     }
424     channel_data_alignment =
425         std::max(channel_data_alignment, filter.vtable->alignment);
426     if (channel_data_size % filter.vtable->alignment != 0) {
427       channel_data_size += filter.vtable->alignment -
428                            (channel_data_size % filter.vtable->alignment);
429     }
430     filters.push_back({channel_data_size, filter.vtable});
431     channel_data_size += filter.vtable->size;
432   }
433   // Shortcut for empty segments.
434   if (filters.empty()) return StackSegment();
435   // Allocate memory for the channel data, initialize channel filters into it.
436   uint8_t* p = static_cast<uint8_t*>(
437       gpr_malloc_aligned(channel_data_size, channel_data_alignment));
438   for (size_t i = 0; i < filters.size(); i++) {
439     auto r = filters[i].vtable->init(p + filters[i].offset, args);
440     if (!r.ok()) {
441       for (size_t j = 0; j < i; j++) {
442         filters[j].vtable->destroy(p + filters[j].offset);
443       }
444       gpr_free_aligned(p);
445       return r;
446     }
447   }
448   return StackSegment(std::move(filters), p);
449 }
450 
///////////////////////////////////////////////////////////////////////////////
// ChannelInit::StackSegment
453 
StackSegment(std::vector<ChannelFilter> filters,uint8_t * channel_data)454 ChannelInit::StackSegment::StackSegment(std::vector<ChannelFilter> filters,
455                                         uint8_t* channel_data)
456     : data_(MakeRefCounted<ChannelData>(std::move(filters), channel_data)) {}
457 
AddToCallFilterStack(CallFilters::StackBuilder & builder)458 void ChannelInit::StackSegment::AddToCallFilterStack(
459     CallFilters::StackBuilder& builder) {
460   if (data_ == nullptr) return;
461   data_->AddToCallFilterStack(builder);
462   builder.AddOwnedObject(data_);
463 };
464 
ChannelData(std::vector<ChannelFilter> filters,uint8_t * channel_data)465 ChannelInit::StackSegment::ChannelData::ChannelData(
466     std::vector<ChannelFilter> filters, uint8_t* channel_data)
467     : filters_(std::move(filters)), channel_data_(channel_data) {}
468 
AddToCallFilterStack(CallFilters::StackBuilder & builder)469 void ChannelInit::StackSegment::ChannelData::AddToCallFilterStack(
470     CallFilters::StackBuilder& builder) {
471   for (const auto& filter : filters_) {
472     filter.vtable->add_to_stack_builder(channel_data_ + filter.offset, builder);
473   }
474 }
475 
~ChannelData()476 ChannelInit::StackSegment::ChannelData::~ChannelData() {
477   for (const auto& filter : filters_) {
478     filter.vtable->destroy(channel_data_ + filter.offset);
479   }
480   gpr_free_aligned(channel_data_);
481 }
482 
483 }  // namespace grpc_core
484