1 // Copyright 2023 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include <grpc/support/port_platform.h>
15
16 #include "src/core/lib/event_engine/ares_resolver.h"
17
18 #include <string>
19 #include <vector>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 // IWYU pragma: no_include <ares_version.h>
24 // IWYU pragma: no_include <arpa/inet.h>
25 // IWYU pragma: no_include <arpa/nameser.h>
26 // IWYU pragma: no_include <inttypes.h>
27 // IWYU pragma: no_include <netdb.h>
28 // IWYU pragma: no_include <netinet/in.h>
29 // IWYU pragma: no_include <stdlib.h>
30 // IWYU pragma: no_include <sys/socket.h>
31 // IWYU pragma: no_include <ratio>
32
33 #if GRPC_ARES == 1
34
35 #include <address_sorting/address_sorting.h>
36 #include <ares.h>
37
38 #if ARES_VERSION >= 0x011200
39 // c-ares 1.18.0 or later starts to provide ares_nameser.h as a public header.
40 #include <ares_nameser.h>
41 #else
42 #include "src/core/lib/event_engine/nameser.h" // IWYU pragma: keep
43 #endif
44
45 #include <string.h>
46
47 #include <algorithm>
48 #include <chrono>
49 #include <memory>
50 #include <type_traits>
51 #include <utility>
52
53 #include "absl/functional/any_invocable.h"
54 #include "absl/hash/hash.h"
55 #include "absl/strings/match.h"
56 #include "absl/strings/numbers.h"
57 #include "absl/strings/str_cat.h"
58 #include "absl/strings/str_format.h"
59 #include "absl/types/optional.h"
60
61 #include <grpc/event_engine/event_engine.h>
62 #include <grpc/support/log.h>
63
64 #include "src/core/lib/address_utils/parse_address.h"
65 #include "src/core/lib/address_utils/sockaddr_utils.h"
66 #include "src/core/lib/debug/trace.h"
67 #include "src/core/lib/event_engine/grpc_polled_fd.h"
68 #include "src/core/lib/event_engine/time_util.h"
69 #include "src/core/lib/gprpp/debug_location.h"
70 #include "src/core/lib/gprpp/host_port.h"
71 #include "src/core/lib/gprpp/orphanable.h"
72 #include "src/core/lib/gprpp/ref_counted_ptr.h"
73 #include "src/core/lib/iomgr/resolved_address.h"
74 #include "src/core/lib/iomgr/sockaddr.h"
75 #ifdef GRPC_POSIX_SOCKET_ARES_EV_DRIVER
76 #include "src/core/lib/event_engine/posix_engine/tcp_socket_utils.h"
77 #endif
78
79 namespace grpc_event_engine {
80 namespace experimental {
81
82 grpc_core::TraceFlag grpc_trace_ares_resolver(false, "cares_resolver");
83
84 namespace {
85
AresStatusToAbslStatus(int status,absl::string_view error_msg)86 absl::Status AresStatusToAbslStatus(int status, absl::string_view error_msg) {
87 switch (status) {
88 case ARES_ECANCELLED:
89 return absl::CancelledError(error_msg);
90 case ARES_ENOTIMP:
91 return absl::UnimplementedError(error_msg);
92 case ARES_ENOTFOUND:
93 return absl::NotFoundError(error_msg);
94 default:
95 return absl::UnknownError(error_msg);
96 }
97 }
98
99 // An alternative here could be to use ares_timeout to try to be more
100 // accurate, but that would require using "struct timeval"'s, which just
101 // makes things a bit more complicated. So just poll every second, as
102 // suggested by the c-ares code comments.
103 constexpr EventEngine::Duration kAresBackupPollAlarmDuration =
104 std::chrono::seconds(1);
105
IsIpv6LoopbackAvailable()106 bool IsIpv6LoopbackAvailable() {
107 #ifdef GRPC_POSIX_SOCKET_ARES_EV_DRIVER
108 return PosixSocketWrapper::IsIpv6LoopbackAvailable();
109 #elif defined(GRPC_WINDOWS_SOCKET_ARES_EV_DRIVER)
110 // TODO(yijiem): implement this for Windows
111 return true;
112 #else
113 #error "Unsupported platform"
114 #endif
115 }
116
SetRequestDNSServer(absl::string_view dns_server,ares_channel * channel)117 absl::Status SetRequestDNSServer(absl::string_view dns_server,
118 ares_channel* channel) {
119 GRPC_ARES_RESOLVER_TRACE_LOG("Using DNS server %s", dns_server.data());
120 grpc_resolved_address addr;
121 struct ares_addr_port_node dns_server_addr = {};
122 if (grpc_parse_ipv4_hostport(dns_server, &addr, /*log_errors=*/false)) {
123 dns_server_addr.family = AF_INET;
124 struct sockaddr_in* in = reinterpret_cast<struct sockaddr_in*>(addr.addr);
125 memcpy(&dns_server_addr.addr.addr4, &in->sin_addr, sizeof(struct in_addr));
126 dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr);
127 dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr);
128 } else if (grpc_parse_ipv6_hostport(dns_server, &addr,
129 /*log_errors=*/false)) {
130 dns_server_addr.family = AF_INET6;
131 struct sockaddr_in6* in6 =
132 reinterpret_cast<struct sockaddr_in6*>(addr.addr);
133 memcpy(&dns_server_addr.addr.addr6, &in6->sin6_addr,
134 sizeof(struct in6_addr));
135 dns_server_addr.tcp_port = grpc_sockaddr_get_port(&addr);
136 dns_server_addr.udp_port = grpc_sockaddr_get_port(&addr);
137 } else {
138 return absl::InvalidArgumentError(
139 absl::StrCat("Cannot parse authority: ", dns_server));
140 }
141 int status = ares_set_servers_ports(*channel, &dns_server_addr);
142 if (status != ARES_SUCCESS) {
143 return AresStatusToAbslStatus(status, ares_strerror(status));
144 }
145 return absl::OkStatus();
146 }
147
SortAddresses(const std::vector<EventEngine::ResolvedAddress> & addresses)148 std::vector<EventEngine::ResolvedAddress> SortAddresses(
149 const std::vector<EventEngine::ResolvedAddress>& addresses) {
150 address_sorting_sortable* sortables = static_cast<address_sorting_sortable*>(
151 gpr_zalloc(sizeof(address_sorting_sortable) * addresses.size()));
152 for (size_t i = 0; i < addresses.size(); i++) {
153 sortables[i].user_data =
154 const_cast<EventEngine::ResolvedAddress*>(&addresses[i]);
155 memcpy(&sortables[i].dest_addr.addr, addresses[i].address(),
156 addresses[i].size());
157 sortables[i].dest_addr.len = addresses[i].size();
158 }
159 address_sorting_rfc_6724_sort(sortables, addresses.size());
160 std::vector<EventEngine::ResolvedAddress> sorted_addresses;
161 sorted_addresses.reserve(addresses.size());
162 for (size_t i = 0; i < addresses.size(); ++i) {
163 sorted_addresses.emplace_back(
164 *static_cast<EventEngine::ResolvedAddress*>(sortables[i].user_data));
165 }
166 gpr_free(sortables);
167 return sorted_addresses;
168 }
169
170 struct QueryArg {
QueryArggrpc_event_engine::experimental::__anon6450d0860111::QueryArg171 QueryArg(AresResolver* ar, int id, absl::string_view name)
172 : ares_resolver(ar), callback_map_id(id), query_name(name) {}
173 AresResolver* ares_resolver;
174 int callback_map_id;
175 std::string query_name;
176 };
177
178 struct HostnameQueryArg : public QueryArg {
HostnameQueryArggrpc_event_engine::experimental::__anon6450d0860111::HostnameQueryArg179 HostnameQueryArg(AresResolver* ar, int id, absl::string_view name, int p)
180 : QueryArg(ar, id, name), port(p) {}
181 int port;
182 int pending_requests;
183 absl::Status error_status;
184 std::vector<EventEngine::ResolvedAddress> result;
185 };
186
187 } // namespace
188
189 absl::StatusOr<grpc_core::OrphanablePtr<AresResolver>>
CreateAresResolver(absl::string_view dns_server,std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,std::shared_ptr<EventEngine> event_engine)190 AresResolver::CreateAresResolver(
191 absl::string_view dns_server,
192 std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,
193 std::shared_ptr<EventEngine> event_engine) {
194 ares_options opts = {};
195 opts.flags |= ARES_FLAG_STAYOPEN;
196 if (g_event_engine_grpc_ares_test_only_force_tcp) {
197 opts.flags |= ARES_FLAG_USEVC;
198 }
199 ares_channel channel;
200 int status = ares_init_options(&channel, &opts, ARES_OPT_FLAGS);
201 if (status != ARES_SUCCESS) {
202 gpr_log(GPR_ERROR, "ares_init_options failed, status: %d", status);
203 return AresStatusToAbslStatus(
204 status,
205 absl::StrCat("Failed to init c-ares channel: ", ares_strerror(status)));
206 }
207 event_engine_grpc_ares_test_only_inject_config(&channel);
208 polled_fd_factory->ConfigureAresChannelLocked(channel);
209 if (!dns_server.empty()) {
210 absl::Status status = SetRequestDNSServer(dns_server, &channel);
211 if (!status.ok()) {
212 return status;
213 }
214 }
215 return grpc_core::MakeOrphanable<AresResolver>(
216 std::move(polled_fd_factory), std::move(event_engine), channel);
217 }
218
AresResolver(std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,std::shared_ptr<EventEngine> event_engine,ares_channel channel)219 AresResolver::AresResolver(
220 std::unique_ptr<GrpcPolledFdFactory> polled_fd_factory,
221 std::shared_ptr<EventEngine> event_engine, ares_channel channel)
222 : RefCountedDNSResolverInterface(
223 GRPC_TRACE_FLAG_ENABLED(grpc_trace_ares_resolver) ? "AresResolver"
224 : nullptr),
225 channel_(channel),
226 polled_fd_factory_(std::move(polled_fd_factory)),
227 event_engine_(std::move(event_engine)) {
228 polled_fd_factory_->Initialize(&mutex_, event_engine_.get());
229 }
230
~AresResolver()231 AresResolver::~AresResolver() {
232 GPR_ASSERT(fd_node_list_.empty());
233 GPR_ASSERT(callback_map_.empty());
234 ares_destroy(channel_);
235 }
236
Orphan()237 void AresResolver::Orphan() {
238 {
239 grpc_core::MutexLock lock(&mutex_);
240 shutting_down_ = true;
241 if (ares_backup_poll_alarm_handle_.has_value()) {
242 event_engine_->Cancel(*ares_backup_poll_alarm_handle_);
243 ares_backup_poll_alarm_handle_.reset();
244 }
245 for (const auto& fd_node : fd_node_list_) {
246 if (!fd_node->already_shutdown) {
247 GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p shutdown fd: %s", this,
248 fd_node->polled_fd->GetName());
249 GPR_ASSERT(fd_node->polled_fd->ShutdownLocked(
250 absl::CancelledError("AresResolver::Orphan")));
251 fd_node->already_shutdown = true;
252 }
253 }
254 }
255 Unref(DEBUG_LOCATION, "Orphan");
256 }
257
LookupHostname(EventEngine::DNSResolver::LookupHostnameCallback callback,absl::string_view name,absl::string_view default_port)258 void AresResolver::LookupHostname(
259 EventEngine::DNSResolver::LookupHostnameCallback callback,
260 absl::string_view name, absl::string_view default_port) {
261 absl::string_view host;
262 absl::string_view port_string;
263 if (!grpc_core::SplitHostPort(name, &host, &port_string)) {
264 event_engine_->Run(
265 [callback = std::move(callback),
266 status = absl::InvalidArgumentError(absl::StrCat(
267 "Unparseable name: ", name))]() mutable { callback(status); });
268 return;
269 }
270 if (host.empty()) {
271 event_engine_->Run([callback = std::move(callback),
272 status = absl::InvalidArgumentError(absl::StrCat(
273 "host must not be empty in name: ",
274 name))]() mutable { callback(status); });
275 return;
276 }
277 if (port_string.empty()) {
278 if (default_port.empty()) {
279 event_engine_->Run([callback = std::move(callback),
280 status = absl::InvalidArgumentError(absl::StrFormat(
281 "No port in name %s or default_port argument",
282 name))]() mutable { callback(status); });
283 return;
284 }
285 port_string = default_port;
286 }
287 int port = 0;
288 if (port_string == "http") {
289 port = 80;
290 } else if (port_string == "https") {
291 port = 443;
292 } else if (!absl::SimpleAtoi(port_string, &port)) {
293 event_engine_->Run([callback = std::move(callback),
294 status = absl::InvalidArgumentError(absl::StrCat(
295 "Failed to parse port in name: ",
296 name))]() mutable { callback(status); });
297 return;
298 }
299 // TODO(yijiem): Change this when refactoring code in
300 // src/core/lib/address_utils to use EventEngine::ResolvedAddress.
301 grpc_resolved_address addr;
302 const std::string hostport = grpc_core::JoinHostPort(host, port);
303 if (grpc_parse_ipv4_hostport(hostport.c_str(), &addr,
304 false /* log errors */) ||
305 grpc_parse_ipv6_hostport(hostport.c_str(), &addr,
306 false /* log errors */)) {
307 // Early out if the target is an ipv4 or ipv6 literal.
308 std::vector<EventEngine::ResolvedAddress> result;
309 result.emplace_back(reinterpret_cast<sockaddr*>(addr.addr), addr.len);
310 event_engine_->Run(
311 [callback = std::move(callback), result = std::move(result)]() mutable {
312 callback(std::move(result));
313 });
314 return;
315 }
316 grpc_core::MutexLock lock(&mutex_);
317 callback_map_.emplace(++id_, std::move(callback));
318 auto* resolver_arg = new HostnameQueryArg(this, id_, name, port);
319 if (IsIpv6LoopbackAvailable()) {
320 // Note that using AF_UNSPEC for both IPv6 and IPv4 queries does not work in
321 // all cases, e.g. for localhost:<> it only gets back the IPv6 result (i.e.
322 // ::1).
323 resolver_arg->pending_requests = 2;
324 ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET,
325 &AresResolver::OnHostbynameDoneLocked, resolver_arg);
326 ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET6,
327 &AresResolver::OnHostbynameDoneLocked, resolver_arg);
328 } else {
329 resolver_arg->pending_requests = 1;
330 ares_gethostbyname(channel_, std::string(host).c_str(), AF_INET,
331 &AresResolver::OnHostbynameDoneLocked, resolver_arg);
332 }
333 CheckSocketsLocked();
334 MaybeStartTimerLocked();
335 }
336
LookupSRV(EventEngine::DNSResolver::LookupSRVCallback callback,absl::string_view name)337 void AresResolver::LookupSRV(
338 EventEngine::DNSResolver::LookupSRVCallback callback,
339 absl::string_view name) {
340 absl::string_view host;
341 absl::string_view port;
342 if (!grpc_core::SplitHostPort(name, &host, &port)) {
343 event_engine_->Run(
344 [callback = std::move(callback),
345 status = absl::InvalidArgumentError(absl::StrCat(
346 "Unparseable name: ", name))]() mutable { callback(status); });
347 return;
348 }
349 if (host.empty()) {
350 event_engine_->Run([callback = std::move(callback),
351 status = absl::InvalidArgumentError(absl::StrCat(
352 "host must not be empty in name: ",
353 name))]() mutable { callback(status); });
354 return;
355 }
356 // Don't query for SRV records if the target is "localhost"
357 if (absl::EqualsIgnoreCase(host, "localhost")) {
358 event_engine_->Run([callback = std::move(callback)]() mutable {
359 callback(std::vector<EventEngine::DNSResolver::SRVRecord>());
360 });
361 return;
362 }
363 grpc_core::MutexLock lock(&mutex_);
364 callback_map_.emplace(++id_, std::move(callback));
365 auto* resolver_arg = new QueryArg(this, id_, host);
366 ares_query(channel_, std::string(host).c_str(), ns_c_in, ns_t_srv,
367 &AresResolver::OnSRVQueryDoneLocked, resolver_arg);
368 CheckSocketsLocked();
369 MaybeStartTimerLocked();
370 }
371
LookupTXT(EventEngine::DNSResolver::LookupTXTCallback callback,absl::string_view name)372 void AresResolver::LookupTXT(
373 EventEngine::DNSResolver::LookupTXTCallback callback,
374 absl::string_view name) {
375 absl::string_view host;
376 absl::string_view port;
377 if (!grpc_core::SplitHostPort(name, &host, &port)) {
378 event_engine_->Run(
379 [callback = std::move(callback),
380 status = absl::InvalidArgumentError(absl::StrCat(
381 "Unparseable name: ", name))]() mutable { callback(status); });
382 return;
383 }
384 if (host.empty()) {
385 event_engine_->Run([callback = std::move(callback),
386 status = absl::InvalidArgumentError(absl::StrCat(
387 "host must not be empty in name: ",
388 name))]() mutable { callback(status); });
389 return;
390 }
391 // Don't query for TXT records if the target is "localhost"
392 if (absl::EqualsIgnoreCase(host, "localhost")) {
393 event_engine_->Run([callback = std::move(callback)]() mutable {
394 callback(std::vector<std::string>());
395 });
396 return;
397 }
398 grpc_core::MutexLock lock(&mutex_);
399 callback_map_.emplace(++id_, std::move(callback));
400 auto* resolver_arg = new QueryArg(this, id_, host);
401 ares_search(channel_, std::string(host).c_str(), ns_c_in, ns_t_txt,
402 &AresResolver::OnTXTDoneLocked, resolver_arg);
403 CheckSocketsLocked();
404 MaybeStartTimerLocked();
405 }
406
CheckSocketsLocked()407 void AresResolver::CheckSocketsLocked() {
408 FdNodeList new_list;
409 if (!shutting_down_) {
410 ares_socket_t socks[ARES_GETSOCK_MAXNUM] = {};
411 int socks_bitmask = ares_getsock(channel_, socks, ARES_GETSOCK_MAXNUM);
412 for (size_t i = 0; i < ARES_GETSOCK_MAXNUM; i++) {
413 if (ARES_GETSOCK_READABLE(socks_bitmask, i) ||
414 ARES_GETSOCK_WRITABLE(socks_bitmask, i)) {
415 auto iter = std::find_if(
416 fd_node_list_.begin(), fd_node_list_.end(),
417 [sock = socks[i]](const auto& node) { return node->as == sock; });
418 if (iter == fd_node_list_.end()) {
419 GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p new fd: %d", this,
420 socks[i]);
421 new_list.push_back(std::make_unique<FdNode>(
422 socks[i], polled_fd_factory_->NewGrpcPolledFdLocked(socks[i])));
423 } else {
424 new_list.splice(new_list.end(), fd_node_list_, iter);
425 }
426 FdNode* fd_node = new_list.back().get();
427 if (ARES_GETSOCK_READABLE(socks_bitmask, i) &&
428 !fd_node->readable_registered) {
429 fd_node->readable_registered = true;
430 if (fd_node->polled_fd->IsFdStillReadableLocked()) {
431 // If c-ares is interested to read and the socket already has data
432 // available for read, schedules OnReadable directly here. This is
433 // to cope with the edge-triggered poller not getting an event if no
434 // new data arrives and c-ares hasn't read all the data in the
435 // previous ares_process_fd.
436 GRPC_ARES_RESOLVER_TRACE_LOG(
437 "resolver:%p schedule read directly on: %d", this, fd_node->as);
438 event_engine_->Run(
439 [self = Ref(DEBUG_LOCATION, "CheckSocketsLocked"),
440 fd_node]() mutable {
441 static_cast<AresResolver*>(self.get())
442 ->OnReadable(fd_node, absl::OkStatus());
443 });
444 } else {
445 // Otherwise register with the poller for readable event.
446 GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p notify read on: %d", this,
447 fd_node->as);
448 fd_node->polled_fd->RegisterForOnReadableLocked(
449 [self = Ref(DEBUG_LOCATION, "CheckSocketsLocked"),
450 fd_node](absl::Status status) mutable {
451 static_cast<AresResolver*>(self.get())
452 ->OnReadable(fd_node, status);
453 });
454 }
455 }
456 // Register write_closure if the socket is writable and write_closure
457 // has not been registered with this socket.
458 if (ARES_GETSOCK_WRITABLE(socks_bitmask, i) &&
459 !fd_node->writable_registered) {
460 GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p notify write on: %d", this,
461 fd_node->as);
462 fd_node->writable_registered = true;
463 fd_node->polled_fd->RegisterForOnWriteableLocked(
464 [self = Ref(DEBUG_LOCATION, "CheckSocketsLocked"),
465 fd_node](absl::Status status) mutable {
466 static_cast<AresResolver*>(self.get())
467 ->OnWritable(fd_node, status);
468 });
469 }
470 }
471 }
472 }
473 // Any remaining fds in fd_node_list_ were not returned by ares_getsock()
474 // and are therefore no longer in use, so they can be shut down and removed
475 // from the list.
476 // TODO(yijiem): Since we are keeping the underlying socket opened for both
477 // Posix and Windows, it might be reasonable to also keep the FdNodes alive
478 // till the end. But we need to change the state management of FdNodes in this
479 // file. This may simplify the code a bit.
480 while (!fd_node_list_.empty()) {
481 FdNode* fd_node = fd_node_list_.front().get();
482 if (!fd_node->already_shutdown) {
483 GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p shutdown fd: %s", this,
484 fd_node->polled_fd->GetName());
485 fd_node->already_shutdown =
486 fd_node->polled_fd->ShutdownLocked(absl::OkStatus());
487 }
488 if (!fd_node->readable_registered && !fd_node->writable_registered) {
489 GRPC_ARES_RESOLVER_TRACE_LOG("resolver: %p delete fd: %s", this,
490 fd_node->polled_fd->GetName());
491 fd_node_list_.pop_front();
492 } else {
493 new_list.splice(new_list.end(), fd_node_list_, fd_node_list_.begin());
494 }
495 }
496 fd_node_list_ = std::move(new_list);
497 }
498
MaybeStartTimerLocked()499 void AresResolver::MaybeStartTimerLocked() {
500 if (ares_backup_poll_alarm_handle_.has_value()) {
501 return;
502 }
503 // Initialize the backup poll alarm
504 GRPC_ARES_RESOLVER_TRACE_LOG(
505 "request:%p MaybeStartTimerLocked next ares process poll time in %zu ms",
506 this, Milliseconds(kAresBackupPollAlarmDuration));
507 ares_backup_poll_alarm_handle_ = event_engine_->RunAfter(
508 kAresBackupPollAlarmDuration,
509 [self = Ref(DEBUG_LOCATION, "MaybeStartTimerLocked")]() {
510 static_cast<AresResolver*>(self.get())->OnAresBackupPollAlarm();
511 });
512 }
513
OnReadable(FdNode * fd_node,absl::Status status)514 void AresResolver::OnReadable(FdNode* fd_node, absl::Status status) {
515 grpc_core::MutexLock lock(&mutex_);
516 GPR_ASSERT(fd_node->readable_registered);
517 fd_node->readable_registered = false;
518 GRPC_ARES_RESOLVER_TRACE_LOG("OnReadable: fd: %d; request: %p; status: %s",
519 fd_node->as, this, status.ToString().c_str());
520 if (status.ok() && !shutting_down_) {
521 ares_process_fd(channel_, fd_node->as, ARES_SOCKET_BAD);
522 } else {
523 // If error is not absl::OkStatus() or the resolution was cancelled, it
524 // means the fd has been shutdown or timed out. The pending lookups made
525 // on this request will be cancelled by the following ares_cancel(). The
526 // remaining file descriptors in this request will be cleaned up in the
527 // following Work() method.
528 ares_cancel(channel_);
529 }
530 CheckSocketsLocked();
531 }
532
OnWritable(FdNode * fd_node,absl::Status status)533 void AresResolver::OnWritable(FdNode* fd_node, absl::Status status) {
534 grpc_core::MutexLock lock(&mutex_);
535 GPR_ASSERT(fd_node->writable_registered);
536 fd_node->writable_registered = false;
537 GRPC_ARES_RESOLVER_TRACE_LOG("OnWritable: fd: %d; request:%p; status: %s",
538 fd_node->as, this, status.ToString().c_str());
539 if (status.ok() && !shutting_down_) {
540 ares_process_fd(channel_, ARES_SOCKET_BAD, fd_node->as);
541 } else {
542 // If error is not absl::OkStatus() or the resolution was cancelled, it
543 // means the fd has been shutdown or timed out. The pending lookups made
544 // on this request will be cancelled by the following ares_cancel(). The
545 // remaining file descriptors in this request will be cleaned up in the
546 // following Work() method.
547 ares_cancel(channel_);
548 }
549 CheckSocketsLocked();
550 }
551
552 // In case of non-responsive DNS servers, dropped packets, etc., c-ares has
553 // intelligent timeout and retry logic, which we can take advantage of by
554 // polling ares_process_fd on time intervals. Overall, the c-ares library is
555 // meant to be called into and given a chance to proceed name resolution:
556 // a) when fd events happen
557 // b) when some time has passed without fd events having happened
558 // For the latter, we use this backup poller. Also see
559 // https://github.com/grpc/grpc/pull/17688 description for more details.
OnAresBackupPollAlarm()560 void AresResolver::OnAresBackupPollAlarm() {
561 grpc_core::MutexLock lock(&mutex_);
562 ares_backup_poll_alarm_handle_.reset();
563 GRPC_ARES_RESOLVER_TRACE_LOG(
564 "request:%p OnAresBackupPollAlarm shutting_down=%d.", this,
565 shutting_down_);
566 if (!shutting_down_) {
567 for (const auto& fd_node : fd_node_list_) {
568 if (!fd_node->already_shutdown) {
569 GRPC_ARES_RESOLVER_TRACE_LOG(
570 "request:%p OnAresBackupPollAlarm; ares_process_fd. fd=%s", this,
571 fd_node->polled_fd->GetName());
572 ares_socket_t as = fd_node->polled_fd->GetWrappedAresSocketLocked();
573 ares_process_fd(channel_, as, as);
574 }
575 }
576 MaybeStartTimerLocked();
577 CheckSocketsLocked();
578 }
579 }
580
OnHostbynameDoneLocked(void * arg,int status,int,struct hostent * hostent)581 void AresResolver::OnHostbynameDoneLocked(void* arg, int status,
582 int /*timeouts*/,
583 struct hostent* hostent) {
584 auto* hostname_qa = static_cast<HostnameQueryArg*>(arg);
585 GPR_ASSERT(hostname_qa->pending_requests-- > 0);
586 auto* ares_resolver = hostname_qa->ares_resolver;
587 if (status != ARES_SUCCESS) {
588 std::string error_msg =
589 absl::StrFormat("address lookup failed for %s: %s",
590 hostname_qa->query_name, ares_strerror(status));
591 GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p OnHostbynameDoneLocked: %s",
592 ares_resolver, error_msg.c_str());
593 hostname_qa->error_status = AresStatusToAbslStatus(status, error_msg);
594 } else {
595 GRPC_ARES_RESOLVER_TRACE_LOG(
596 "resolver:%p OnHostbynameDoneLocked name=%s ARES_SUCCESS",
597 ares_resolver, hostname_qa->query_name.c_str());
598 for (size_t i = 0; hostent->h_addr_list[i] != nullptr; i++) {
599 switch (hostent->h_addrtype) {
600 case AF_INET6: {
601 size_t addr_len = sizeof(struct sockaddr_in6);
602 struct sockaddr_in6 addr;
603 memset(&addr, 0, addr_len);
604 memcpy(&addr.sin6_addr, hostent->h_addr_list[i],
605 sizeof(struct in6_addr));
606 addr.sin6_family = static_cast<unsigned char>(hostent->h_addrtype);
607 addr.sin6_port = htons(hostname_qa->port);
608 hostname_qa->result.emplace_back(
609 reinterpret_cast<const sockaddr*>(&addr), addr_len);
610 char output[INET6_ADDRSTRLEN];
611 ares_inet_ntop(AF_INET6, &addr.sin6_addr, output, INET6_ADDRSTRLEN);
612 GRPC_ARES_RESOLVER_TRACE_LOG(
613 "resolver:%p c-ares resolver gets a AF_INET6 result: \n"
614 " addr: %s\n port: %d\n sin6_scope_id: %d\n",
615 ares_resolver, output, hostname_qa->port, addr.sin6_scope_id);
616 break;
617 }
618 case AF_INET: {
619 size_t addr_len = sizeof(struct sockaddr_in);
620 struct sockaddr_in addr;
621 memset(&addr, 0, addr_len);
622 memcpy(&addr.sin_addr, hostent->h_addr_list[i],
623 sizeof(struct in_addr));
624 addr.sin_family = static_cast<unsigned char>(hostent->h_addrtype);
625 addr.sin_port = htons(hostname_qa->port);
626 hostname_qa->result.emplace_back(
627 reinterpret_cast<const sockaddr*>(&addr), addr_len);
628 char output[INET_ADDRSTRLEN];
629 ares_inet_ntop(AF_INET, &addr.sin_addr, output, INET_ADDRSTRLEN);
630 GRPC_ARES_RESOLVER_TRACE_LOG(
631 "resolver:%p c-ares resolver gets a AF_INET result: \n"
632 " addr: %s\n port: %d\n",
633 ares_resolver, output, hostname_qa->port);
634 break;
635 }
636 default:
637 grpc_core::Crash(
638 absl::StrFormat("resolver:%p Received invalid type of address %d",
639 ares_resolver, hostent->h_addrtype));
640 }
641 }
642 }
643 if (hostname_qa->pending_requests == 0) {
644 auto nh =
645 ares_resolver->callback_map_.extract(hostname_qa->callback_map_id);
646 GPR_ASSERT(!nh.empty());
647 GPR_ASSERT(absl::holds_alternative<
648 EventEngine::DNSResolver::LookupHostnameCallback>(nh.mapped()));
649 auto callback = absl::get<EventEngine::DNSResolver::LookupHostnameCallback>(
650 std::move(nh.mapped()));
651 if (!hostname_qa->result.empty() || hostname_qa->error_status.ok()) {
652 ares_resolver->event_engine_->Run(
653 [callback = std::move(callback),
654 result = SortAddresses(hostname_qa->result)]() mutable {
655 callback(std::move(result));
656 });
657 } else {
658 ares_resolver->event_engine_->Run(
659 [callback = std::move(callback),
660 result = std::move(hostname_qa->error_status)]() mutable {
661 callback(std::move(result));
662 });
663 }
664 delete hostname_qa;
665 }
666 }
667
OnSRVQueryDoneLocked(void * arg,int status,int,unsigned char * abuf,int alen)668 void AresResolver::OnSRVQueryDoneLocked(void* arg, int status, int /*timeouts*/,
669 unsigned char* abuf, int alen) {
670 std::unique_ptr<QueryArg> qa(static_cast<QueryArg*>(arg));
671 auto* ares_resolver = qa->ares_resolver;
672 auto nh = ares_resolver->callback_map_.extract(qa->callback_map_id);
673 GPR_ASSERT(!nh.empty());
674 GPR_ASSERT(
675 absl::holds_alternative<EventEngine::DNSResolver::LookupSRVCallback>(
676 nh.mapped()));
677 auto callback = absl::get<EventEngine::DNSResolver::LookupSRVCallback>(
678 std::move(nh.mapped()));
679 auto fail = [&](absl::string_view prefix) {
680 std::string error_message = absl::StrFormat(
681 "%s for %s: %s", prefix, qa->query_name, ares_strerror(status));
682 GRPC_ARES_RESOLVER_TRACE_LOG("OnSRVQueryDoneLocked: %s",
683 error_message.c_str());
684 ares_resolver->event_engine_->Run(
685 [callback = std::move(callback),
686 status = AresStatusToAbslStatus(status, error_message)]() mutable {
687 callback(status);
688 });
689 };
690 if (status != ARES_SUCCESS) {
691 fail("SRV lookup failed");
692 return;
693 }
694 GRPC_ARES_RESOLVER_TRACE_LOG(
695 "resolver:%p OnSRVQueryDoneLocked name=%s ARES_SUCCESS", ares_resolver,
696 qa->query_name.c_str());
697 struct ares_srv_reply* reply = nullptr;
698 status = ares_parse_srv_reply(abuf, alen, &reply);
699 GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p ares_parse_srv_reply: %d",
700 ares_resolver, status);
701 if (status != ARES_SUCCESS) {
702 fail("Failed to parse SRV reply");
703 return;
704 }
705 std::vector<EventEngine::DNSResolver::SRVRecord> result;
706 for (struct ares_srv_reply* srv_it = reply; srv_it != nullptr;
707 srv_it = srv_it->next) {
708 EventEngine::DNSResolver::SRVRecord record;
709 record.host = srv_it->host;
710 record.port = srv_it->port;
711 record.priority = srv_it->priority;
712 record.weight = srv_it->weight;
713 result.push_back(std::move(record));
714 }
715 if (reply != nullptr) {
716 ares_free_data(reply);
717 }
718 ares_resolver->event_engine_->Run(
719 [callback = std::move(callback), result = std::move(result)]() mutable {
720 callback(std::move(result));
721 });
722 }
723
OnTXTDoneLocked(void * arg,int status,int,unsigned char * buf,int len)724 void AresResolver::OnTXTDoneLocked(void* arg, int status, int /*timeouts*/,
725 unsigned char* buf, int len) {
726 std::unique_ptr<QueryArg> qa(static_cast<QueryArg*>(arg));
727 auto* ares_resolver = qa->ares_resolver;
728 auto nh = ares_resolver->callback_map_.extract(qa->callback_map_id);
729 GPR_ASSERT(!nh.empty());
730 GPR_ASSERT(
731 absl::holds_alternative<EventEngine::DNSResolver::LookupTXTCallback>(
732 nh.mapped()));
733 auto callback = absl::get<EventEngine::DNSResolver::LookupTXTCallback>(
734 std::move(nh.mapped()));
735 auto fail = [&](absl::string_view prefix) {
736 std::string error_message = absl::StrFormat(
737 "%s for %s: %s", prefix, qa->query_name, ares_strerror(status));
738 GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p OnTXTDoneLocked: %s",
739 ares_resolver, error_message.c_str());
740 ares_resolver->event_engine_->Run(
741 [callback = std::move(callback),
742 status = AresStatusToAbslStatus(status, error_message)]() mutable {
743 callback(status);
744 });
745 };
746 if (status != ARES_SUCCESS) {
747 fail("TXT lookup failed");
748 return;
749 }
750 GRPC_ARES_RESOLVER_TRACE_LOG(
751 "resolver:%p OnTXTDoneLocked name=%s ARES_SUCCESS", ares_resolver,
752 qa->query_name.c_str());
753 struct ares_txt_ext* reply = nullptr;
754 status = ares_parse_txt_reply_ext(buf, len, &reply);
755 if (status != ARES_SUCCESS) {
756 fail("Failed to parse TXT result");
757 return;
758 }
759 std::vector<std::string> result;
760 for (struct ares_txt_ext* part = reply; part != nullptr; part = part->next) {
761 if (part->record_start) {
762 result.emplace_back(reinterpret_cast<char*>(part->txt), part->length);
763 } else {
764 absl::StrAppend(
765 &result.back(),
766 std::string(reinterpret_cast<char*>(part->txt), part->length));
767 }
768 }
769 GRPC_ARES_RESOLVER_TRACE_LOG("resolver:%p Got %zu TXT records", ares_resolver,
770 result.size());
771 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_ares_resolver)) {
772 for (const auto& record : result) {
773 gpr_log(GPR_INFO, "%s", record.c_str());
774 }
775 }
776 // Clean up.
777 ares_free_data(reply);
778 ares_resolver->event_engine_->Run(
779 [callback = std::move(callback), result = std::move(result)]() mutable {
780 callback(std::move(result));
781 });
782 }
783
784 } // namespace experimental
785 } // namespace grpc_event_engine
786
noop_inject_channel_config(ares_channel *)787 void noop_inject_channel_config(ares_channel* /*channel*/) {}
788
789 void (*event_engine_grpc_ares_test_only_inject_config)(ares_channel* channel) =
790 noop_inject_channel_config;
791
792 bool g_event_engine_grpc_ares_test_only_force_tcp = false;
793
794 #endif // GRPC_ARES == 1
795