xref: /aosp_15_r20/external/grpc-grpc/src/core/lib/event_engine/ares_resolver.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2023 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15 
16 #include "src/core/lib/event_engine/ares_resolver.h"
17 
18 #include <string>
19 #include <vector>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 // IWYU pragma: no_include <ares_version.h>
24 // IWYU pragma: no_include <arpa/inet.h>
25 // IWYU pragma: no_include <arpa/nameser.h>
26 // IWYU pragma: no_include <inttypes.h>
27 // IWYU pragma: no_include <netdb.h>
28 // IWYU pragma: no_include <netinet/in.h>
29 // IWYU pragma: no_include <stdlib.h>
30 // IWYU pragma: no_include <sys/socket.h>
31 // IWYU pragma: no_include <ratio>
32 
33 #if GRPC_ARES == 1
34 
35 #include <address_sorting/address_sorting.h>
36 #include <ares.h>
37 
38 #if ARES_VERSION >= 0x011200
39 // c-ares 1.18.0 or later starts to provide ares_nameser.h as a public header.
40 #include <ares_nameser.h>
41 #else
42 #include "src/core/lib/event_engine/nameser.h"  // IWYU pragma: keep
43 #endif
44 
45 #include <string.h>
46 
47 #include <algorithm>
48 #include <chrono>
49 #include <memory>
50 #include <type_traits>
51 #include <utility>
52 
53 #include "absl/functional/any_invocable.h"
54 #include "absl/hash/hash.h"
55 #include "absl/strings/match.h"
56 #include "absl/strings/numbers.h"
57 #include "absl/strings/str_cat.h"
58 #include "absl/strings/str_format.h"
59 #include "absl/types/optional.h"
60 
61 #include <grpc/event_engine/event_engine.h>
62 #include <grpc/support/log.h>
63 
64 #include "src/core/lib/address_utils/parse_address.h"
65 #include "src/core/lib/address_utils/sockaddr_utils.h"
66 #include "src/core/lib/debug/trace.h"
67 #include "src/core/lib/event_engine/grpc_polled_fd.h"
68 #include "src/core/lib/event_engine/time_util.h"
69 #include "src/core/lib/gprpp/debug_location.h"
70 #include "src/core/lib/gprpp/host_port.h"
71 #include "src/core/lib/gprpp/orphanable.h"
72 #include "src/core/lib/gprpp/ref_counted_ptr.h"
73 #include "src/core/lib/iomgr/resolved_address.h"
74 #include "src/core/lib/iomgr/sockaddr.h"
75 #ifdef GRPC_POSIX_SOCKET_ARES_EV_DRIVER
76 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
77 #endif
78 
79 namespace grpc_event_engine {
80 namespace experimental {
81 
82 grpc_core::TraceFlag grpc_trace_ares_resolver(false, "cares_resolver");
83 
84 namespace {
85 
AresStatusToAbslStatus(int status,absl::string_view error_msg)86 absl::Status AresStatusToAbslStatus(int status, absl::string_view error_msg) {
87   switch (status) {
88     case ARES_ECANCELLED:
89       return absl::CancelledError(error_msg);
90     case ARES_ENOTIMP:
91       return absl::UnimplementedError(error_msg);
92     case ARES_ENOTFOUND:
93       return absl::NotFoundError(error_msg);
94     default:
95       return absl::UnknownError(error_msg);
96   }
97 }
98 
99 // An alternative here could be to use ares_timeout to try to be more
100 // accurate, but that would require using "struct timeval"'s, which just
101 // makes things a bit more complicated. So just poll every second, as
102 // suggested by the c-ares code comments.
103 constexpr EventEngine::Duration kAresBackupPollAlarmDuration =
104     std::chrono::seconds(1);
105 
IsIpv6LoopbackAvailable()106 bool IsIpv6LoopbackAvailable() {
107 #ifdef GRPC_POSIX_SOCKET_ARES_EV_DRIVER
108   return PosixSocketWrapper::IsIpv6LoopbackAvailable();
109 #elif defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
110   // TODO(yijiem): implement this for Windows
111   return true;
112 #else
113 #error "Unsupported platform"
114 #endif
115 }
116 
SetRequestDNSServer(absl::string_view dns_server,ares_channel * channel)117 absl::Status SetRequestDNSServer(absl::string_view dns_server,
118                                  ares_channel* channel) {
119   GRPC_ARES_RESOLVER_TRACE_LOG("Using DNS server %s", dns_server.data());
120   grpc_resolved_address addr;
121   struct ares_addr_port_node dns_server_addr = {};
122   if (grpc_parse_ipv4_hostport(dns_server, &addr, /*log_errors=*/false)) {
123     dns_server_addr.family = AF_INET;
124     struct sockaddr_in* in = reinterpret_cast<struct sockaddr_in*>(addr.addr);
125     memcpy(&dns_server_addr.addr.addr4, &in->sin_addr, sizeof(struct in_addr));
126     dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr);
127     dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr);
128   } else if (grpc_parse_ipv6_hostport(dns_server, &addr,
129                                       /*log_errors=*/false)) {
130     dns_server_addr.family = AF_INET6;
131     struct sockaddr_in6* in6 =
132         reinterpret_cast<struct sockaddr_in6*>(addr.addr);
133     memcpy(&dns_server_addr.addr.addr6, &in6->sin6_addr,
134            sizeof(struct in6_addr));
135     dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr);
136     dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr);
137   } else {
138     return absl::InvalidArgumentError(
139         absl::StrCat("Cannot parse authority: ", dns_server));
140   }
141   int status = ares_set_servers_ports(*channel, &dns_server_addr);
142   if (status != ARES_SUCCESS) {
143     return AresStatusToAbslStatus(status, ares_strerror(status));
144   }
145   return absl::OkStatus();
146 }
147 
SortAddresses(const std::vector<EventEngine::ResolvedAddress> & addresses)148 std::vector<EventEngine::ResolvedAddress> SortAddresses(
149     const std::vector<EventEngine::ResolvedAddress>& addresses) {
150   address_sorting_sortable* sortables = static_cast<address_sorting_sortable*>(
151       gpr_zalloc(sizeof(address_sorting_sortable) * addresses.size()));
152   for (size_t i = 0; i < addresses.size(); i++) {
153     sortables[i].user_data =
154         const_cast<EventEngine::ResolvedAddress*>(&addresses[i]);
155     memcpy(&sortables[i].dest_addr.addr, addresses[i].address(),
156            addresses[i].size());
157     sortables[i].dest_addr.len = addresses[i].size();
158   }
159   address_sorting_rfc_6724_sort(sortables, addresses.size());
160   std::vector<EventEngine::ResolvedAddress> sorted_addresses;
161   sorted_addresses.reserve(addresses.size());
162   for (size_t i = 0; i < addresses.size(); ++i) {
163     sorted_addresses.emplace_back(
164         *static_cast<EventEngine::ResolvedAddress*>(sortables[i].user_data));
165   }
166   gpr_free(sortables);
167   return sorted_addresses;
168 }
169 
170 struct QueryArg {
QueryArggrpc_event_engine::experimental::__anon6450d0860111::QueryArg171   QueryArg(AresResolver* ar, int id, absl::string_view name)
172       : ares_resolver(ar), callback_map_id(id), query_name(name) {}
173   AresResolver* ares_resolver;
174   int callback_map_id;
175   std::string query_name;
176 };
177 
178 struct HostnameQueryArg : public QueryArg {
HostnameQueryArggrpc_event_engine::experimental::__anon6450d0860111::HostnameQueryArg179   HostnameQueryArg(AresResolver* ar, int id, absl::string_view name, int p)
180       : QueryArg(ar, id, name), port(p) {}
181   int port;
182   int pending_requests;
183   absl::Status error_status;
184   std::vector<EventEngine::ResolvedAddress> result;
185 };
186 
187 }  // namespace
188 
189 absl::StatusOr<grpc_core::OrphanablePtr<AresResolver>>
CreateAresResolver(absl::string_view dns_server,std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,std::shared_ptr<EventEngine> event_engine)190 AresResolver::CreateAresResolver(
191     absl::string_view dns_server,
192     std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,
193     std::shared_ptr<EventEngine> event_engine) {
194   ares_options opts = {};
195   opts.flags |= ARES_FLAG_STAYOPEN;
196   if (g_event_engine_grpc_ares_test_only_force_tcp) {
197     opts.flags |= ARES_FLAG_USEVC;
198   }
199   ares_channel channel;
200   int status = ares_init_options(&channel, &opts, ARES_OPT_FLAGS);
201   if (status != ARES_SUCCESS) {
202     gpr_log(GPR_ERROR, "ares_init_options failed, status: %d", status);
203     return AresStatusToAbslStatus(
204         status,
205         absl::StrCat("Failed to init c-ares channel: ", ares_strerror(status)));
206   }
207   event_engine_grpc_ares_test_only_inject_config(&channel);
208   polled_fd_factory->ConfigureAresChannelLocked(channel);
209   if (!dns_server.empty()) {
210     absl::Status status = SetRequestDNSServer(dns_server, &channel);
211     if (!status.ok()) {
212       return status;
213     }
214   }
215   return grpc_core::MakeOrphanable<AresResolver>(
216       std::move(polled_fd_factory), std::move(event_engine), channel);
217 }
218 
AresResolver(std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,std::shared_ptr<EventEngine> event_engine,ares_channel channel)219 AresResolver::AresResolver(
220     std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,
221     std::shared_ptr<EventEngine> event_engine, ares_channel channel)
222     : RefCountedDNSResolverInterface(
223           GRPC_TRACE_FLAG_ENABLED(grpc_trace_ares_resolver) ? "AresResolver"
224                                                             : nullptr),
225       channel_(channel),
226       polled_fd_factory_(std::move(polled_fd_factory)),
227       event_engine_(std::move(event_engine)) {
228   polled_fd_factory_->Initialize(&mutex_, event_engine_.get());
229 }
230 
~AresResolver()231 AresResolver::~AresResolver() {
232   GPR_ASSERT(fd_node_list_.empty());
233   GPR_ASSERT(callback_map_.empty());
234   ares_destroy(channel_);
235 }
236 
Orphan()237 void AresResolver::Orphan() {
238   {
239     grpc_core::MutexLock lock(&mutex_);
240     shutting_down_ = true;
241     if (ares_backup_poll_alarm_handle_.has_value()) {
242       event_engine_->Cancel(*ares_backup_poll_alarm_handle_);
243       ares_backup_poll_alarm_handle_.reset();
244     }
245     for (const auto& fd_node : fd_node_list_) {
246       if (!fd_node->already_shutdown) {
247         GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p shutdown fd: %s", this,
248                                      fd_node->polled_fd->GetName());
249         GPR_ASSERT(fd_node->polled_fd->ShutdownLocked(
250             absl::CancelledError("AresResolver::Orphan")));
251         fd_node->already_shutdown = true;
252       }
253     }
254   }
255   Unref(DEBUG_LOCATION, "Orphan");
256 }
257 
LookupHostname(EventEngine::DNSResolver::LookupHostnameCallback callback,absl::string_view name,absl::string_view default_port)258 void AresResolver::LookupHostname(
259     EventEngine::DNSResolver::LookupHostnameCallback callback,
260     absl::string_view name, absl::string_view default_port) {
261   absl::string_view host;
262   absl::string_view port_string;
263   if (!grpc_core::SplitHostPort(name, &host, &port_string)) {
264     event_engine_->Run(
265         [callback = std::move(callback),
266          status = absl::InvalidArgumentError(absl::StrCat(
267              "Unparseable name: ", name))]() mutable { callback(status); });
268     return;
269   }
270   if (host.empty()) {
271     event_engine_->Run([callback = std::move(callback),
272                         status = absl::InvalidArgumentError(absl::StrCat(
273                             "host must not be empty in name: ",
274                             name))]() mutable { callback(status); });
275     return;
276   }
277   if (port_string.empty()) {
278     if (default_port.empty()) {
279       event_engine_->Run([callback = std::move(callback),
280                           status = absl::InvalidArgumentError(absl::StrFormat(
281                               "No port in name %s or default_port argument",
282                               name))]() mutable { callback(status); });
283       return;
284     }
285     port_string = default_port;
286   }
287   int port = 0;
288   if (port_string == "http") {
289     port = 80;
290   } else if (port_string == "https") {
291     port = 443;
292   } else if (!absl::SimpleAtoi(port_string, &port)) {
293     event_engine_->Run([callback = std::move(callback),
294                         status = absl::InvalidArgumentError(absl::StrCat(
295                             "Failed to parse port in name: ",
296                             name))]() mutable { callback(status); });
297     return;
298   }
299   // TODO(yijiem): Change this when refactoring code in
300   // src/core/lib/address_utils to use EventEngine::ResolvedAddress.
301   grpc_resolved_address addr;
302   const std::string hostport = grpc_core::JoinHostPort(host, port);
303   if (grpc_parse_ipv4_hostport(hostport.c_str(), &addr,
304                                false /* log errors */) ||
305       grpc_parse_ipv6_hostport(hostport.c_str(), &addr,
306                                false /* log errors */)) {
307     // Early out if the target is an ipv4 or ipv6 literal.
308     std::vector<EventEngine::ResolvedAddress> result;
309     result.emplace_back(reinterpret_cast<sockaddr*>(addr.addr), addr.len);
310     event_engine_->Run(
311         [callback = std::move(callback), result = std::move(result)]() mutable {
312           callback(std::move(result));
313         });
314     return;
315   }
316   grpc_core::MutexLock lock(&mutex_);
317   callback_map_.emplace(++id_, std::move(callback));
318   auto* resolver_arg = new HostnameQueryArg(this, id_, name, port);
319   if (IsIpv6LoopbackAvailable()) {
320     // Note that using AF_UNSPEC for both IPv6 and IPv4 queries does not work in
321     // all cases, e.g. for localhost:<> it only gets back the IPv6 result (i.e.
322     // ::1).
323     resolver_arg->pending_requests = 2;
324     ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET,
325                        &AresResolver::OnHostbynameDoneLocked, resolver_arg);
326     ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET6,
327                        &AresResolver::OnHostbynameDoneLocked, resolver_arg);
328   } else {
329     resolver_arg->pending_requests = 1;
330     ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET,
331                        &AresResolver::OnHostbynameDoneLocked, resolver_arg);
332   }
333   CheckSocketsLocked();
334   MaybeStartTimerLocked();
335 }
336 
LookupSRV(EventEngine::DNSResolver::LookupSRVCallback callback,absl::string_view name)337 void AresResolver::LookupSRV(
338     EventEngine::DNSResolver::LookupSRVCallback callback,
339     absl::string_view name) {
340   absl::string_view host;
341   absl::string_view port;
342   if (!grpc_core::SplitHostPort(name, &host, &port)) {
343     event_engine_->Run(
344         [callback = std::move(callback),
345          status = absl::InvalidArgumentError(absl::StrCat(
346              "Unparseable name: ", name))]() mutable { callback(status); });
347     return;
348   }
349   if (host.empty()) {
350     event_engine_->Run([callback = std::move(callback),
351                         status = absl::InvalidArgumentError(absl::StrCat(
352                             "host must not be empty in name: ",
353                             name))]() mutable { callback(status); });
354     return;
355   }
356   // Don't query for SRV records if the target is "localhost"
357   if (absl::EqualsIgnoreCase(host, "localhost")) {
358     event_engine_->Run([callback = std::move(callback)]() mutable {
359       callback(std::vector<EventEngine::DNSResolver::SRVRecord>());
360     });
361     return;
362   }
363   grpc_core::MutexLock lock(&mutex_);
364   callback_map_.emplace(++id_, std::move(callback));
365   auto* resolver_arg = new QueryArg(this, id_, host);
366   ares_query(channel_, std::string(host).c_str(), ns_c_in, ns_t_srv,
367              &AresResolver::OnSRVQueryDoneLocked, resolver_arg);
368   CheckSocketsLocked();
369   MaybeStartTimerLocked();
370 }
371 
LookupTXT(EventEngine::DNSResolver::LookupTXTCallback callback,absl::string_view name)372 void AresResolver::LookupTXT(
373     EventEngine::DNSResolver::LookupTXTCallback callback,
374     absl::string_view name) {
375   absl::string_view host;
376   absl::string_view port;
377   if (!grpc_core::SplitHostPort(name, &host, &port)) {
378     event_engine_->Run(
379         [callback = std::move(callback),
380          status = absl::InvalidArgumentError(absl::StrCat(
381              "Unparseable name: ", name))]() mutable { callback(status); });
382     return;
383   }
384   if (host.empty()) {
385     event_engine_->Run([callback = std::move(callback),
386                         status = absl::InvalidArgumentError(absl::StrCat(
387                             "host must not be empty in name: ",
388                             name))]() mutable { callback(status); });
389     return;
390   }
391   // Don't query for TXT records if the target is "localhost"
392   if (absl::EqualsIgnoreCase(host, "localhost")) {
393     event_engine_->Run([callback = std::move(callback)]() mutable {
394       callback(std::vector<std::string>());
395     });
396     return;
397   }
398   grpc_core::MutexLock lock(&mutex_);
399   callback_map_.emplace(++id_, std::move(callback));
400   auto* resolver_arg = new QueryArg(this, id_, host);
401   ares_search(channel_, std::string(host).c_str(), ns_c_in, ns_t_txt,
402               &AresResolver::OnTXTDoneLocked, resolver_arg);
403   CheckSocketsLocked();
404   MaybeStartTimerLocked();
405 }
406 
CheckSocketsLocked()407 void AresResolver::CheckSocketsLocked() {
408   FdNodeList new_list;
409   if (!shutting_down_) {
410     ares_socket_t socks[ARES_GETSOCK_MAXNUM] = {};
411     int socks_bitmask = ares_getsock(channel_, socks, ARES_GETSOCK_MAXNUM);
412     for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
413       if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
414           ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
415         auto iter = std::find_if(
416             fd_node_list_.begin(), fd_node_list_.end(),
417             [sock = socks[i]](const auto& node) { return node->as == sock; });
418         if (iter == fd_node_list_.end()) {
419           GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p new fd: %d", this,
420                                        socks[i]);
421           new_list.push_back(std::make_unique<FdNode>(
422               socks[i], polled_fd_factory_->NewGrpcPolledFdLocked(socks[i])));
423         } else {
424           new_list.splice(new_list.end(), fd_node_list_, iter);
425         }
426         FdNode* fd_node = new_list.back().get();
427         if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
428             !fd_node->readable_registered) {
429           fd_node->readable_registered = true;
430           if (fd_node->polled_fd->IsFdStillReadableLocked()) {
431             // If c-ares is interested to read and the socket already has data
432             // available for read, schedules OnReadable directly here. This is
433             // to cope with the edge-triggered poller not getting an event if no
434             // new data arrives and c-ares hasn't read all the data in the
435             // previous ares_process_fd.
436             GRPC_ARES_RESOLVER_TRACE_LOG(
437                 "resolver:%p schedule read directly on: %d", this, fd_node->as);
438             event_engine_->Run(
439                 [self = Ref(DEBUG_LOCATION, "CheckSocketsLocked"),
440                  fd_node]() mutable {
441                   static_cast<AresResolver*>(self.get())
442                       ->OnReadable(fd_node, absl::OkStatus());
443                 });
444           } else {
445             // Otherwise register with the poller for readable event.
446             GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p notify read on: %d", this,
447                                          fd_node->as);
448             fd_node->polled_fd->RegisterForOnReadableLocked(
449                 [self = Ref(DEBUG_LOCATION, "CheckSocketsLocked"),
450                  fd_node](absl::Status status) mutable {
451                   static_cast<AresResolver*>(self.get())
452                       ->OnReadable(fd_node, status);
453                 });
454           }
455         }
456         // Register write_closure if the socket is writable and write_closure
457         // has not been registered with this socket.
458         if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
459             !fd_node->writable_registered) {
460           GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p notify write on: %d", this,
461                                        fd_node->as);
462           fd_node->writable_registered = true;
463           fd_node->polled_fd->RegisterForOnWriteableLocked(
464               [self = Ref(DEBUG_LOCATION, "CheckSocketsLocked"),
465                fd_node](absl::Status status) mutable {
466                 static_cast<AresResolver*>(self.get())
467                     ->OnWritable(fd_node, status);
468               });
469         }
470       }
471     }
472   }
473   // Any remaining fds in fd_node_list_ were not returned by ares_getsock()
474   // and are therefore no longer in use, so they can be shut down and removed
475   // from the list.
476   // TODO(yijiem): Since we are keeping the underlying socket opened for both
477   // Posix and Windows, it might be reasonable to also keep the FdNodes alive
478   // till the end. But we need to change the state management of FdNodes in this
479   // file. This may simplify the code a bit.
480   while (!fd_node_list_.empty()) {
481     FdNode* fd_node = fd_node_list_.front().get();
482     if (!fd_node->already_shutdown) {
483       GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p shutdown fd: %s", this,
484                                    fd_node->polled_fd->GetName());
485       fd_node->already_shutdown =
486           fd_node->polled_fd->ShutdownLocked(absl::OkStatus());
487     }
488     if (!fd_node->readable_registered && !fd_node->writable_registered) {
489       GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p delete fd: %s", this,
490                                    fd_node->polled_fd->GetName());
491       fd_node_list_.pop_front();
492     } else {
493       new_list.splice(new_list.end(), fd_node_list_, fd_node_list_.begin());
494     }
495   }
496   fd_node_list_ = std::move(new_list);
497 }
498 
MaybeStartTimerLocked()499 void AresResolver::MaybeStartTimerLocked() {
500   if (ares_backup_poll_alarm_handle_.has_value()) {
501     return;
502   }
503   // Initialize the backup poll alarm
504   GRPC_ARES_RESOLVER_TRACE_LOG(
505       "request:%p MaybeStartTimerLocked next ares process poll time in %zu ms",
506       this, Milliseconds(kAresBackupPollAlarmDuration));
507   ares_backup_poll_alarm_handle_ = event_engine_->RunAfter(
508       kAresBackupPollAlarmDuration,
509       [self = Ref(DEBUG_LOCATION, "MaybeStartTimerLocked")]() {
510         static_cast<AresResolver*>(self.get())->OnAresBackupPollAlarm();
511       });
512 }
513 
OnReadable(FdNode * fd_node,absl::Status status)514 void AresResolver::OnReadable(FdNode* fd_node, absl::Status status) {
515   grpc_core::MutexLock lock(&mutex_);
516   GPR_ASSERT(fd_node->readable_registered);
517   fd_node->readable_registered = false;
518   GRPC_ARES_RESOLVER_TRACE_LOG("OnReadable: fd: %d; request: %p; status: %s",
519                                fd_node->as, this, status.ToString().c_str());
520   if (status.ok() && !shutting_down_) {
521     ares_process_fd(channel_, fd_node->as, ARES_SOCKET_BAD);
522   } else {
523     // If error is not absl::OkStatus() or the resolution was cancelled, it
524     // means the fd has been shutdown or timed out. The pending lookups made
525     // on this request will be cancelled by the following ares_cancel(). The
526     // remaining file descriptors in this request will be cleaned up in the
527     // following Work() method.
528     ares_cancel(channel_);
529   }
530   CheckSocketsLocked();
531 }
532 
OnWritable(FdNode * fd_node,absl::Status status)533 void AresResolver::OnWritable(FdNode* fd_node, absl::Status status) {
534   grpc_core::MutexLock lock(&mutex_);
535   GPR_ASSERT(fd_node->writable_registered);
536   fd_node->writable_registered = false;
537   GRPC_ARES_RESOLVER_TRACE_LOG("OnWritable: fd: %d; request:%p; status: %s",
538                                fd_node->as, this, status.ToString().c_str());
539   if (status.ok() && !shutting_down_) {
540     ares_process_fd(channel_, ARES_SOCKET_BAD, fd_node->as);
541   } else {
542     // If error is not absl::OkStatus() or the resolution was cancelled, it
543     // means the fd has been shutdown or timed out. The pending lookups made
544     // on this request will be cancelled by the following ares_cancel(). The
545     // remaining file descriptors in this request will be cleaned up in the
546     // following Work() method.
547     ares_cancel(channel_);
548   }
549   CheckSocketsLocked();
550 }
551 
552 // In case of non-responsive DNS servers, dropped packets, etc., c-ares has
553 // intelligent timeout and retry logic, which we can take advantage of by
554 // polling ares_process_fd on time intervals. Overall, the c-ares library is
555 // meant to be called into and given a chance to proceed name resolution:
556 //   a) when fd events happen
557 //   b) when some time has passed without fd events having happened
558 // For the latter, we use this backup poller. Also see
559 // https://github.com/grpc/grpc/pull/17688 description for more details.
OnAresBackupPollAlarm()560 void AresResolver::OnAresBackupPollAlarm() {
561   grpc_core::MutexLock lock(&mutex_);
562   ares_backup_poll_alarm_handle_.reset();
563   GRPC_ARES_RESOLVER_TRACE_LOG(
564       "request:%p OnAresBackupPollAlarm shutting_down=%d.", this,
565       shutting_down_);
566   if (!shutting_down_) {
567     for (const auto& fd_node : fd_node_list_) {
568       if (!fd_node->already_shutdown) {
569         GRPC_ARES_RESOLVER_TRACE_LOG(
570             "request:%p OnAresBackupPollAlarm; ares_process_fd. fd=%s", this,
571             fd_node->polled_fd->GetName());
572         ares_socket_t as = fd_node->polled_fd->GetWrappedAresSocketLocked();
573         ares_process_fd(channel_, as, as);
574       }
575     }
576     MaybeStartTimerLocked();
577     CheckSocketsLocked();
578   }
579 }
580 
OnHostbynameDoneLocked(void * arg,int status,int,struct hostent * hostent)581 void AresResolver::OnHostbynameDoneLocked(void* arg, int status,
582                                           int /*timeouts*/,
583                                           struct hostent* hostent) {
584   auto* hostname_qa = static_cast<HostnameQueryArg*>(arg);
585   GPR_ASSERT(hostname_qa->pending_requests-- > 0);
586   auto* ares_resolver = hostname_qa->ares_resolver;
587   if (status != ARES_SUCCESS) {
588     std::string error_msg =
589         absl::StrFormat("address lookup failed for %s: %s",
590                         hostname_qa->query_name, ares_strerror(status));
591     GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p OnHostbynameDoneLocked: %s",
592                                  ares_resolver, error_msg.c_str());
593     hostname_qa->error_status = AresStatusToAbslStatus(status, error_msg);
594   } else {
595     GRPC_ARES_RESOLVER_TRACE_LOG(
596         "resolver:%p OnHostbynameDoneLocked name=%s ARES_SUCCESS",
597         ares_resolver, hostname_qa->query_name.c_str());
598     for (size_t i = 0; hostent->h_addr_list[i] != nullptr; i++) {
599       switch (hostent->h_addrtype) {
600         case AF_INET6: {
601           size_t addr_len = sizeof(struct sockaddr_in6);
602           struct sockaddr_in6 addr;
603           memset(&addr, 0, addr_len);
604           memcpy(&addr.sin6_addr, hostent->h_addr_list[i],
605                  sizeof(struct in6_addr));
606           addr.sin6_family = static_cast<unsigned char>(hostent->h_addrtype);
607           addr.sin6_port = htons(hostname_qa->port);
608           hostname_qa->result.emplace_back(
609               reinterpret_cast<const sockaddr*>(&addr), addr_len);
610           char output[INET6_ADDRSTRLEN];
611           ares_inet_ntop(AF_INET6, &addr.sin6_addr, output, INET6_ADDRSTRLEN);
612           GRPC_ARES_RESOLVER_TRACE_LOG(
613               "resolver:%p c-ares resolver gets a AF_INET6 result: \n"
614               "  addr: %s\n  port: %d\n  sin6_scope_id: %d\n",
615               ares_resolver, output, hostname_qa->port, addr.sin6_scope_id);
616           break;
617         }
618         case AF_INET: {
619           size_t addr_len = sizeof(struct sockaddr_in);
620           struct sockaddr_in addr;
621           memset(&addr, 0, addr_len);
622           memcpy(&addr.sin_addr, hostent->h_addr_list[i],
623                  sizeof(struct in_addr));
624           addr.sin_family = static_cast<unsigned char>(hostent->h_addrtype);
625           addr.sin_port = htons(hostname_qa->port);
626           hostname_qa->result.emplace_back(
627               reinterpret_cast<const sockaddr*>(&addr), addr_len);
628           char output[INET_ADDRSTRLEN];
629           ares_inet_ntop(AF_INET, &addr.sin_addr, output, INET_ADDRSTRLEN);
630           GRPC_ARES_RESOLVER_TRACE_LOG(
631               "resolver:%p c-ares resolver gets a AF_INET result: \n"
632               "  addr: %s\n  port: %d\n",
633               ares_resolver, output, hostname_qa->port);
634           break;
635         }
636         default:
637           grpc_core::Crash(
638               absl::StrFormat("resolver:%p Received invalid type of address %d",
639                               ares_resolver, hostent->h_addrtype));
640       }
641     }
642   }
643   if (hostname_qa->pending_requests == 0) {
644     auto nh =
645         ares_resolver->callback_map_.extract(hostname_qa->callback_map_id);
646     GPR_ASSERT(!nh.empty());
647     GPR_ASSERT(absl::holds_alternative<
648                EventEngine::DNSResolver::LookupHostnameCallback>(nh.mapped()));
649     auto callback = absl::get<EventEngine::DNSResolver::LookupHostnameCallback>(
650         std::move(nh.mapped()));
651     if (!hostname_qa->result.empty() || hostname_qa->error_status.ok()) {
652       ares_resolver->event_engine_->Run(
653           [callback = std::move(callback),
654            result = SortAddresses(hostname_qa->result)]() mutable {
655             callback(std::move(result));
656           });
657     } else {
658       ares_resolver->event_engine_->Run(
659           [callback = std::move(callback),
660            result = std::move(hostname_qa->error_status)]() mutable {
661             callback(std::move(result));
662           });
663     }
664     delete hostname_qa;
665   }
666 }
667 
OnSRVQueryDoneLocked(void * arg,int status,int,unsigned char * abuf,int alen)668 void AresResolver::OnSRVQueryDoneLocked(void* arg, int status, int /*timeouts*/,
669                                         unsigned char* abuf, int alen) {
670   std::unique_ptr<QueryArg> qa(static_cast<QueryArg*>(arg));
671   auto* ares_resolver = qa->ares_resolver;
672   auto nh = ares_resolver->callback_map_.extract(qa->callback_map_id);
673   GPR_ASSERT(!nh.empty());
674   GPR_ASSERT(
675       absl::holds_alternative<EventEngine::DNSResolver::LookupSRVCallback>(
676           nh.mapped()));
677   auto callback = absl::get<EventEngine::DNSResolver::LookupSRVCallback>(
678       std::move(nh.mapped()));
679   auto fail = [&](absl::string_view prefix) {
680     std::string error_message = absl::StrFormat(
681         "%s for %s: %s", prefix, qa->query_name, ares_strerror(status));
682     GRPC_ARES_RESOLVER_TRACE_LOG("OnSRVQueryDoneLocked: %s",
683                                  error_message.c_str());
684     ares_resolver->event_engine_->Run(
685         [callback = std::move(callback),
686          status = AresStatusToAbslStatus(status, error_message)]() mutable {
687           callback(status);
688         });
689   };
690   if (status != ARES_SUCCESS) {
691     fail("SRV lookup failed");
692     return;
693   }
694   GRPC_ARES_RESOLVER_TRACE_LOG(
695       "resolver:%p OnSRVQueryDoneLocked name=%s ARES_SUCCESS", ares_resolver,
696       qa->query_name.c_str());
697   struct ares_srv_reply* reply = nullptr;
698   status = ares_parse_srv_reply(abuf, alen, &reply);
699   GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p ares_parse_srv_reply: %d",
700                                ares_resolver, status);
701   if (status != ARES_SUCCESS) {
702     fail("Failed to parse SRV reply");
703     return;
704   }
705   std::vector<EventEngine::DNSResolver::SRVRecord> result;
706   for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr;
707        srv_it = srv_it->next) {
708     EventEngine::DNSResolver::SRVRecord record;
709     record.host = srv_it->host;
710     record.port = srv_it->port;
711     record.priority = srv_it->priority;
712     record.weight = srv_it->weight;
713     result.push_back(std::move(record));
714   }
715   if (reply != nullptr) {
716     ares_free_data(reply);
717   }
718   ares_resolver->event_engine_->Run(
719       [callback = std::move(callback), result = std::move(result)]() mutable {
720         callback(std::move(result));
721       });
722 }
723 
OnTXTDoneLocked(void * arg,int status,int,unsigned char * buf,int len)724 void AresResolver::OnTXTDoneLocked(void* arg, int status, int /*timeouts*/,
725                                    unsigned char* buf, int len) {
726   std::unique_ptr<QueryArg> qa(static_cast<QueryArg*>(arg));
727   auto* ares_resolver = qa->ares_resolver;
728   auto nh = ares_resolver->callback_map_.extract(qa->callback_map_id);
729   GPR_ASSERT(!nh.empty());
730   GPR_ASSERT(
731       absl::holds_alternative<EventEngine::DNSResolver::LookupTXTCallback>(
732           nh.mapped()));
733   auto callback = absl::get<EventEngine::DNSResolver::LookupTXTCallback>(
734       std::move(nh.mapped()));
735   auto fail = [&](absl::string_view prefix) {
736     std::string error_message = absl::StrFormat(
737         "%s for %s: %s", prefix, qa->query_name, ares_strerror(status));
738     GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p OnTXTDoneLocked: %s",
739                                  ares_resolver, error_message.c_str());
740     ares_resolver->event_engine_->Run(
741         [callback = std::move(callback),
742          status = AresStatusToAbslStatus(status, error_message)]() mutable {
743           callback(status);
744         });
745   };
746   if (status != ARES_SUCCESS) {
747     fail("TXT lookup failed");
748     return;
749   }
750   GRPC_ARES_RESOLVER_TRACE_LOG(
751       "resolver:%p OnTXTDoneLocked name=%s ARES_SUCCESS", ares_resolver,
752       qa->query_name.c_str());
753   struct ares_txt_ext* reply = nullptr;
754   status = ares_parse_txt_reply_ext(buf, len, &reply);
755   if (status != ARES_SUCCESS) {
756     fail("Failed to parse TXT result");
757     return;
758   }
759   std::vector<std::string> result;
760   for (struct ares_txt_ext* part = reply; part != nullptr; part = part->next) {
761     if (part->record_start) {
762       result.emplace_back(reinterpret_cast<char*>(part->txt), part->length);
763     } else {
764       absl::StrAppend(
765           &result.back(),
766           std::string(reinterpret_cast<char*>(part->txt), part->length));
767     }
768   }
769   GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p Got %zu TXT records", ares_resolver,
770                                result.size());
771   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_ares_resolver)) {
772     for (const auto& record : result) {
773       gpr_log(GPR_INFO, "%s", record.c_str());
774     }
775   }
776   // Clean up.
777   ares_free_data(reply);
778   ares_resolver->event_engine_->Run(
779       [callback = std::move(callback), result = std::move(result)]() mutable {
780         callback(std::move(result));
781       });
782 }
783 
784 }  // namespace experimental
785 }  // namespace grpc_event_engine
786 
noop_inject_channel_config(ares_channel *)787 void noop_inject_channel_config(ares_channel* /*channel*/) {}
788 
789 void (*event_engine_grpc_ares_test_only_inject_config)(ares_channel* channel) =
790     noop_inject_channel_config;
791 
792 bool g_event_engine_grpc_ares_test_only_force_tcp = false;
793 
794 #endif  // GRPC_ARES == 1
795