xref: /aosp_15_r20/external/grpc-grpc/test/core/xds/xds_client_fuzzer.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <map>
18 #include <memory>
19 #include <set>
20 #include <string>
21 #include <utility>
22 
23 #include "absl/status/status.h"
24 #include "absl/status/statusor.h"
25 #include "absl/strings/str_cat.h"
26 #include "absl/strings/string_view.h"
27 #include "absl/time/time.h"
28 #include "absl/types/optional.h"
29 
30 #include <grpc/grpc.h>
31 #include <grpc/support/log.h>
32 
33 #include "src/core/ext/xds/xds_bootstrap.h"
34 #include "src/core/ext/xds/xds_bootstrap_grpc.h"
35 #include "src/core/ext/xds/xds_client.h"
36 #include "src/core/ext/xds/xds_cluster.h"
37 #include "src/core/ext/xds/xds_endpoint.h"
38 #include "src/core/ext/xds/xds_listener.h"
39 #include "src/core/ext/xds/xds_route_config.h"
40 #include "src/core/lib/event_engine/default_event_engine.h"
41 #include "src/core/lib/gprpp/orphanable.h"
42 #include "src/core/lib/gprpp/ref_counted_ptr.h"
43 #include "src/libfuzzer/libfuzzer_macro.h"
44 #include "src/proto/grpc/testing/xds/v3/discovery.pb.h"
45 #include "test/core/xds/xds_client_fuzzer.pb.h"
46 #include "test/core/xds/xds_client_test_peer.h"
47 #include "test/core/xds/xds_transport_fake.h"
48 
49 namespace grpc_core {
50 
51 class Fuzzer {
52  public:
Fuzzer(absl::string_view bootstrap_json)53   explicit Fuzzer(absl::string_view bootstrap_json) {
54     auto bootstrap = GrpcXdsBootstrap::Create(bootstrap_json);
55     if (!bootstrap.ok()) {
56       gpr_log(GPR_ERROR, "error creating bootstrap: %s",
57               bootstrap.status().ToString().c_str());
58       // Leave xds_client_ unset, so Act() will be a no-op.
59       return;
60     }
61     auto transport_factory = MakeOrphanable<FakeXdsTransportFactory>(
62         []() { Crash("Multiple concurrent reads"); });
63     transport_factory->SetAutoCompleteMessagesFromClient(false);
64     transport_factory->SetAbortOnUndrainedMessages(false);
65     transport_factory_ = transport_factory.get();
66     xds_client_ = MakeRefCounted<XdsClient>(
67         std::move(*bootstrap), std::move(transport_factory),
68         grpc_event_engine::experimental::GetDefaultEventEngine(),
69         /*metrics_reporter=*/nullptr, "foo agent", "foo version");
70   }
71 
Act(const xds_client_fuzzer::Action & action)72   void Act(const xds_client_fuzzer::Action& action) {
73     if (xds_client_ == nullptr) return;
74     switch (action.action_type_case()) {
75       case xds_client_fuzzer::Action::kStartWatch:
76         switch (action.start_watch().resource_type().resource_type_case()) {
77           case xds_client_fuzzer::ResourceType::kListener:
78             StartWatch(&listener_watchers_,
79                        action.start_watch().resource_name());
80             break;
81           case xds_client_fuzzer::ResourceType::kRouteConfig:
82             StartWatch(&route_config_watchers_,
83                        action.start_watch().resource_name());
84             break;
85           case xds_client_fuzzer::ResourceType::kCluster:
86             StartWatch(&cluster_watchers_,
87                        action.start_watch().resource_name());
88             break;
89           case xds_client_fuzzer::ResourceType::kEndpoint:
90             StartWatch(&endpoint_watchers_,
91                        action.start_watch().resource_name());
92             break;
93           case xds_client_fuzzer::ResourceType::RESOURCE_TYPE_NOT_SET:
94             break;
95         }
96         break;
97       case xds_client_fuzzer::Action::kStopWatch:
98         switch (action.stop_watch().resource_type().resource_type_case()) {
99           case xds_client_fuzzer::ResourceType::kListener:
100             StopWatch(&listener_watchers_, action.stop_watch().resource_name());
101             break;
102           case xds_client_fuzzer::ResourceType::kRouteConfig:
103             StopWatch(&route_config_watchers_,
104                       action.stop_watch().resource_name());
105             break;
106           case xds_client_fuzzer::ResourceType::kCluster:
107             StopWatch(&cluster_watchers_, action.stop_watch().resource_name());
108             break;
109           case xds_client_fuzzer::ResourceType::kEndpoint:
110             StopWatch(&endpoint_watchers_, action.stop_watch().resource_name());
111             break;
112           case xds_client_fuzzer::ResourceType::RESOURCE_TYPE_NOT_SET:
113             break;
114         }
115         break;
116       case xds_client_fuzzer::Action::kDumpCsdsData:
117         testing::XdsClientTestPeer(xds_client_.get()).TestDumpClientConfig();
118         break;
119       case xds_client_fuzzer::Action::kReportResourceCounts:
120         testing::XdsClientTestPeer(xds_client_.get())
121             .TestReportResourceCounts(
122                 [](const testing::XdsClientTestPeer::ResourceCountLabels&
123                        labels,
124                    uint64_t count) {
125                   gpr_log(GPR_INFO,
126                           "xds_authority=\"%s\", resource_type=\"%s\", "
127                           "cache_state=\"%s\" count=%" PRIu64,
128                           std::string(labels.xds_authority).c_str(),
129                           std::string(labels.resource_type).c_str(),
130                           std::string(labels.cache_state).c_str(), count);
131                 });
132         break;
133       case xds_client_fuzzer::Action::kReportServerConnections:
134         testing::XdsClientTestPeer(xds_client_.get())
135             .TestReportServerConnections(
136                 [](absl::string_view xds_server, bool connected) {
137                   gpr_log(GPR_INFO, "xds_server=\"%s\" connected=%d",
138                           std::string(xds_server).c_str(), connected);
139                 });
140         break;
141       case xds_client_fuzzer::Action::kTriggerConnectionFailure:
142         TriggerConnectionFailure(
143             action.trigger_connection_failure().authority(),
144             ToAbslStatus(action.trigger_connection_failure().status()));
145         break;
146       case xds_client_fuzzer::Action::kReadMessageFromClient:
147         ReadMessageFromClient(action.read_message_from_client().stream_id(),
148                               action.read_message_from_client().ok());
149         break;
150       case xds_client_fuzzer::Action::kSendMessageToClient:
151         SendMessageToClient(action.send_message_to_client().stream_id(),
152                             action.send_message_to_client().response());
153         break;
154       case xds_client_fuzzer::Action::kSendStatusToClient:
155         SendStatusToClient(
156             action.send_status_to_client().stream_id(),
157             ToAbslStatus(action.send_status_to_client().status()));
158         break;
159       case xds_client_fuzzer::Action::ACTION_TYPE_NOT_SET:
160         break;
161     }
162   }
163 
164  private:
165   template <typename ResourceTypeType>
166   class Watcher : public ResourceTypeType::WatcherInterface {
167    public:
168     using ResourceType = ResourceTypeType;
169 
Watcher(std::string resource_name)170     explicit Watcher(std::string resource_name)
171         : resource_name_(std::move(resource_name)) {}
172 
OnResourceChanged(std::shared_ptr<const typename ResourceType::ResourceType> resource,RefCountedPtr<XdsClient::ReadDelayHandle>)173     void OnResourceChanged(
174         std::shared_ptr<const typename ResourceType::ResourceType> resource,
175         RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */)
176         override {
177       gpr_log(GPR_INFO, "==> OnResourceChanged(%s %s): %s",
178               std::string(ResourceType::Get()->type_url()).c_str(),
179               resource_name_.c_str(), resource->ToString().c_str());
180     }
181 
OnError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle>)182     void OnError(
183         absl::Status status,
184         RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */)
185         override {
186       gpr_log(GPR_INFO, "==> OnError(%s %s): %s",
187               std::string(ResourceType::Get()->type_url()).c_str(),
188               resource_name_.c_str(), status.ToString().c_str());
189     }
190 
OnResourceDoesNotExist(RefCountedPtr<XdsClient::ReadDelayHandle>)191     void OnResourceDoesNotExist(
192         RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */)
193         override {
194       gpr_log(GPR_INFO, "==> OnResourceDoesNotExist(%s %s)",
195               std::string(ResourceType::Get()->type_url()).c_str(),
196               resource_name_.c_str());
197     }
198 
199    private:
200     std::string resource_name_;
201   };
202 
203   using ListenerWatcher = Watcher<XdsListenerResourceType>;
204   using RouteConfigWatcher = Watcher<XdsRouteConfigResourceType>;
205   using ClusterWatcher = Watcher<XdsClusterResourceType>;
206   using EndpointWatcher = Watcher<XdsEndpointResourceType>;
207 
208   template <typename WatcherType>
StartWatch(std::map<std::string,std::set<WatcherType * >> * watchers,std::string resource_name)209   void StartWatch(std::map<std::string, std::set<WatcherType*>>* watchers,
210                   std::string resource_name) {
211     gpr_log(GPR_INFO, "### StartWatch(%s %s)",
212             std::string(WatcherType::ResourceType::Get()->type_url()).c_str(),
213             resource_name.c_str());
214     auto watcher = MakeRefCounted<WatcherType>(resource_name);
215     (*watchers)[resource_name].insert(watcher.get());
216     WatcherType::ResourceType::Get()->StartWatch(
217         xds_client_.get(), resource_name, std::move(watcher));
218   }
219 
220   template <typename WatcherType>
StopWatch(std::map<std::string,std::set<WatcherType * >> * watchers,std::string resource_name)221   void StopWatch(std::map<std::string, std::set<WatcherType*>>* watchers,
222                  std::string resource_name) {
223     gpr_log(GPR_INFO, "### StopWatch(%s %s)",
224             std::string(WatcherType::ResourceType::Get()->type_url()).c_str(),
225             resource_name.c_str());
226     auto& watchers_set = (*watchers)[resource_name];
227     auto it = watchers_set.begin();
228     if (it == watchers_set.end()) return;
229     WatcherType::ResourceType::Get()->CancelWatch(xds_client_.get(),
230                                                   resource_name, *it);
231     watchers_set.erase(it);
232   }
233 
ToAbslStatus(const xds_client_fuzzer::Status & status)234   static absl::Status ToAbslStatus(const xds_client_fuzzer::Status& status) {
235     return absl::Status(static_cast<absl::StatusCode>(status.code()),
236                         status.message());
237   }
238 
GetServer(const std::string & authority)239   const XdsBootstrap::XdsServer* GetServer(const std::string& authority) {
240     const GrpcXdsBootstrap& bootstrap =
241         static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap());
242     if (authority.empty()) return bootstrap.servers().front();
243     const auto* authority_entry =
244         static_cast<const GrpcXdsBootstrap::GrpcAuthority*>(
245             bootstrap.LookupAuthority(authority));
246     if (authority_entry == nullptr) return nullptr;
247     if (!authority_entry->servers().empty()) {
248       return authority_entry->servers().front();
249     }
250     return bootstrap.servers().front();
251   }
252 
TriggerConnectionFailure(const std::string & authority,absl::Status status)253   void TriggerConnectionFailure(const std::string& authority,
254                                 absl::Status status) {
255     gpr_log(GPR_INFO, "### TriggerConnectionFailure(%s): %s", authority.c_str(),
256             status.ToString().c_str());
257     const auto* xds_server = GetServer(authority);
258     if (xds_server == nullptr) return;
259     transport_factory_->TriggerConnectionFailure(*xds_server,
260                                                  std::move(status));
261   }
262 
StreamIdMethod(const xds_client_fuzzer::StreamId & stream_id)263   static const char* StreamIdMethod(
264       const xds_client_fuzzer::StreamId& stream_id) {
265     switch (stream_id.method_case()) {
266       case xds_client_fuzzer::StreamId::kAds:
267         return FakeXdsTransportFactory::kAdsMethod;
268       case xds_client_fuzzer::StreamId::kLrs:
269         return FakeXdsTransportFactory::kLrsMethod;
270       case xds_client_fuzzer::StreamId::METHOD_NOT_SET:
271         return nullptr;
272     }
273   }
274 
GetStream(const xds_client_fuzzer::StreamId & stream_id)275   RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> GetStream(
276       const xds_client_fuzzer::StreamId& stream_id) {
277     const auto* xds_server = GetServer(stream_id.authority());
278     if (xds_server == nullptr) return nullptr;
279     const char* method = StreamIdMethod(stream_id);
280     if (method == nullptr) return nullptr;
281     return transport_factory_->WaitForStream(*xds_server, method,
282                                              absl::ZeroDuration());
283   }
284 
StreamIdString(const xds_client_fuzzer::StreamId & stream_id)285   static std::string StreamIdString(
286       const xds_client_fuzzer::StreamId& stream_id) {
287     return absl::StrCat("{authority=\"", stream_id.authority(),
288                         "\", method=", StreamIdMethod(stream_id), "}");
289   }
290 
ReadMessageFromClient(const xds_client_fuzzer::StreamId & stream_id,bool ok)291   void ReadMessageFromClient(const xds_client_fuzzer::StreamId& stream_id,
292                              bool ok) {
293     gpr_log(GPR_INFO, "### ReadMessageFromClient(%s): %s",
294             StreamIdString(stream_id).c_str(), ok ? "true" : "false");
295     auto stream = GetStream(stream_id);
296     if (stream == nullptr) return;
297     gpr_log(GPR_INFO, "    stream=%p", stream.get());
298     auto message = stream->WaitForMessageFromClient(absl::ZeroDuration());
299     if (message.has_value()) {
300       gpr_log(GPR_INFO, "    completing send_message");
301       stream->CompleteSendMessageFromClient(ok);
302     }
303   }
304 
SendMessageToClient(const xds_client_fuzzer::StreamId & stream_id,const envoy::service::discovery::v3::DiscoveryResponse & response)305   void SendMessageToClient(
306       const xds_client_fuzzer::StreamId& stream_id,
307       const envoy::service::discovery::v3::DiscoveryResponse& response) {
308     gpr_log(GPR_INFO, "### SendMessageToClient(%s)",
309             StreamIdString(stream_id).c_str());
310     auto stream = GetStream(stream_id);
311     if (stream == nullptr) return;
312     gpr_log(GPR_INFO, "    stream=%p", stream.get());
313     stream->SendMessageToClient(response.SerializeAsString());
314   }
315 
SendStatusToClient(const xds_client_fuzzer::StreamId & stream_id,absl::Status status)316   void SendStatusToClient(const xds_client_fuzzer::StreamId& stream_id,
317                           absl::Status status) {
318     gpr_log(GPR_INFO, "### SendStatusToClient(%s): %s",
319             StreamIdString(stream_id).c_str(), status.ToString().c_str());
320     auto stream = GetStream(stream_id);
321     if (stream == nullptr) return;
322     gpr_log(GPR_INFO, "    stream=%p", stream.get());
323     stream->MaybeSendStatusToClient(std::move(status));
324   }
325 
326   RefCountedPtr<XdsClient> xds_client_;
327   FakeXdsTransportFactory* transport_factory_;
328 
329   // Maps of currently active watchers for each resource type, keyed by
330   // resource name.
331   std::map<std::string, std::set<ListenerWatcher*>> listener_watchers_;
332   std::map<std::string, std::set<RouteConfigWatcher*>> route_config_watchers_;
333   std::map<std::string, std::set<ClusterWatcher*>> cluster_watchers_;
334   std::map<std::string, std::set<EndpointWatcher*>> endpoint_watchers_;
335 };
336 
337 }  // namespace grpc_core
338 
339 bool squelch = true;
340 
DEFINE_PROTO_FUZZER(const xds_client_fuzzer::Message & message)341 DEFINE_PROTO_FUZZER(const xds_client_fuzzer::Message& message) {
342   grpc_init();
343   grpc_core::Fuzzer fuzzer(message.bootstrap());
344   for (int i = 0; i < message.actions_size(); i++) {
345     fuzzer.Act(message.actions(i));
346   }
347   grpc_shutdown();
348 }
349