1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <map>
18 #include <memory>
19 #include <set>
20 #include <string>
21 #include <utility>
22
23 #include "absl/status/status.h"
24 #include "absl/status/statusor.h"
25 #include "absl/strings/str_cat.h"
26 #include "absl/strings/string_view.h"
27 #include "absl/time/time.h"
28 #include "absl/types/optional.h"
29
30 #include <grpc/grpc.h>
31 #include <grpc/support/log.h>
32
33 #include "src/core/ext/xds/xds_bootstrap.h"
34 #include "src/core/ext/xds/xds_bootstrap_grpc.h"
35 #include "src/core/ext/xds/xds_client.h"
36 #include "src/core/ext/xds/xds_cluster.h"
37 #include "src/core/ext/xds/xds_endpoint.h"
38 #include "src/core/ext/xds/xds_listener.h"
39 #include "src/core/ext/xds/xds_route_config.h"
40 #include "src/core/lib/event_engine/default_event_engine.h"
41 #include "src/core/lib/gprpp/orphanable.h"
42 #include "src/core/lib/gprpp/ref_counted_ptr.h"
43 #include "src/libfuzzer/libfuzzer_macro.h"
44 #include "src/proto/grpc/testing/xds/v3/discovery.pb.h"
45 #include "test/core/xds/xds_client_fuzzer.pb.h"
46 #include "test/core/xds/xds_client_test_peer.h"
47 #include "test/core/xds/xds_transport_fake.h"
48
49 namespace grpc_core {
50
51 class Fuzzer {
52 public:
Fuzzer(absl::string_view bootstrap_json)53 explicit Fuzzer(absl::string_view bootstrap_json) {
54 auto bootstrap = GrpcXdsBootstrap::Create(bootstrap_json);
55 if (!bootstrap.ok()) {
56 gpr_log(GPR_ERROR, "error creating bootstrap: %s",
57 bootstrap.status().ToString().c_str());
58 // Leave xds_client_ unset, so Act() will be a no-op.
59 return;
60 }
61 auto transport_factory = MakeOrphanable<FakeXdsTransportFactory>(
62 []() { Crash("Multiple concurrent reads"); });
63 transport_factory->SetAutoCompleteMessagesFromClient(false);
64 transport_factory->SetAbortOnUndrainedMessages(false);
65 transport_factory_ = transport_factory.get();
66 xds_client_ = MakeRefCounted<XdsClient>(
67 std::move(*bootstrap), std::move(transport_factory),
68 grpc_event_engine::experimental::GetDefaultEventEngine(),
69 /*metrics_reporter=*/nullptr, "foo agent", "foo version");
70 }
71
Act(const xds_client_fuzzer::Action & action)72 void Act(const xds_client_fuzzer::Action& action) {
73 if (xds_client_ == nullptr) return;
74 switch (action.action_type_case()) {
75 case xds_client_fuzzer::Action::kStartWatch:
76 switch (action.start_watch().resource_type().resource_type_case()) {
77 case xds_client_fuzzer::ResourceType::kListener:
78 StartWatch(&listener_watchers_,
79 action.start_watch().resource_name());
80 break;
81 case xds_client_fuzzer::ResourceType::kRouteConfig:
82 StartWatch(&route_config_watchers_,
83 action.start_watch().resource_name());
84 break;
85 case xds_client_fuzzer::ResourceType::kCluster:
86 StartWatch(&cluster_watchers_,
87 action.start_watch().resource_name());
88 break;
89 case xds_client_fuzzer::ResourceType::kEndpoint:
90 StartWatch(&endpoint_watchers_,
91 action.start_watch().resource_name());
92 break;
93 case xds_client_fuzzer::ResourceType::RESOURCE_TYPE_NOT_SET:
94 break;
95 }
96 break;
97 case xds_client_fuzzer::Action::kStopWatch:
98 switch (action.stop_watch().resource_type().resource_type_case()) {
99 case xds_client_fuzzer::ResourceType::kListener:
100 StopWatch(&listener_watchers_, action.stop_watch().resource_name());
101 break;
102 case xds_client_fuzzer::ResourceType::kRouteConfig:
103 StopWatch(&route_config_watchers_,
104 action.stop_watch().resource_name());
105 break;
106 case xds_client_fuzzer::ResourceType::kCluster:
107 StopWatch(&cluster_watchers_, action.stop_watch().resource_name());
108 break;
109 case xds_client_fuzzer::ResourceType::kEndpoint:
110 StopWatch(&endpoint_watchers_, action.stop_watch().resource_name());
111 break;
112 case xds_client_fuzzer::ResourceType::RESOURCE_TYPE_NOT_SET:
113 break;
114 }
115 break;
116 case xds_client_fuzzer::Action::kDumpCsdsData:
117 testing::XdsClientTestPeer(xds_client_.get()).TestDumpClientConfig();
118 break;
119 case xds_client_fuzzer::Action::kReportResourceCounts:
120 testing::XdsClientTestPeer(xds_client_.get())
121 .TestReportResourceCounts(
122 [](const testing::XdsClientTestPeer::ResourceCountLabels&
123 labels,
124 uint64_t count) {
125 gpr_log(GPR_INFO,
126 "xds_authority=\"%s\", resource_type=\"%s\", "
127 "cache_state=\"%s\" count=%" PRIu64,
128 std::string(labels.xds_authority).c_str(),
129 std::string(labels.resource_type).c_str(),
130 std::string(labels.cache_state).c_str(), count);
131 });
132 break;
133 case xds_client_fuzzer::Action::kReportServerConnections:
134 testing::XdsClientTestPeer(xds_client_.get())
135 .TestReportServerConnections(
136 [](absl::string_view xds_server, bool connected) {
137 gpr_log(GPR_INFO, "xds_server=\"%s\" connected=%d",
138 std::string(xds_server).c_str(), connected);
139 });
140 break;
141 case xds_client_fuzzer::Action::kTriggerConnectionFailure:
142 TriggerConnectionFailure(
143 action.trigger_connection_failure().authority(),
144 ToAbslStatus(action.trigger_connection_failure().status()));
145 break;
146 case xds_client_fuzzer::Action::kReadMessageFromClient:
147 ReadMessageFromClient(action.read_message_from_client().stream_id(),
148 action.read_message_from_client().ok());
149 break;
150 case xds_client_fuzzer::Action::kSendMessageToClient:
151 SendMessageToClient(action.send_message_to_client().stream_id(),
152 action.send_message_to_client().response());
153 break;
154 case xds_client_fuzzer::Action::kSendStatusToClient:
155 SendStatusToClient(
156 action.send_status_to_client().stream_id(),
157 ToAbslStatus(action.send_status_to_client().status()));
158 break;
159 case xds_client_fuzzer::Action::ACTION_TYPE_NOT_SET:
160 break;
161 }
162 }
163
164 private:
165 template <typename ResourceTypeType>
166 class Watcher : public ResourceTypeType::WatcherInterface {
167 public:
168 using ResourceType = ResourceTypeType;
169
Watcher(std::string resource_name)170 explicit Watcher(std::string resource_name)
171 : resource_name_(std::move(resource_name)) {}
172
OnResourceChanged(std::shared_ptr<const typename ResourceType::ResourceType> resource,RefCountedPtr<XdsClient::ReadDelayHandle>)173 void OnResourceChanged(
174 std::shared_ptr<const typename ResourceType::ResourceType> resource,
175 RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */)
176 override {
177 gpr_log(GPR_INFO, "==> OnResourceChanged(%s %s): %s",
178 std::string(ResourceType::Get()->type_url()).c_str(),
179 resource_name_.c_str(), resource->ToString().c_str());
180 }
181
OnError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle>)182 void OnError(
183 absl::Status status,
184 RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */)
185 override {
186 gpr_log(GPR_INFO, "==> OnError(%s %s): %s",
187 std::string(ResourceType::Get()->type_url()).c_str(),
188 resource_name_.c_str(), status.ToString().c_str());
189 }
190
OnResourceDoesNotExist(RefCountedPtr<XdsClient::ReadDelayHandle>)191 void OnResourceDoesNotExist(
192 RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */)
193 override {
194 gpr_log(GPR_INFO, "==> OnResourceDoesNotExist(%s %s)",
195 std::string(ResourceType::Get()->type_url()).c_str(),
196 resource_name_.c_str());
197 }
198
199 private:
200 std::string resource_name_;
201 };
202
203 using ListenerWatcher = Watcher<XdsListenerResourceType>;
204 using RouteConfigWatcher = Watcher<XdsRouteConfigResourceType>;
205 using ClusterWatcher = Watcher<XdsClusterResourceType>;
206 using EndpointWatcher = Watcher<XdsEndpointResourceType>;
207
208 template <typename WatcherType>
StartWatch(std::map<std::string,std::set<WatcherType * >> * watchers,std::string resource_name)209 void StartWatch(std::map<std::string, std::set<WatcherType*>>* watchers,
210 std::string resource_name) {
211 gpr_log(GPR_INFO, "### StartWatch(%s %s)",
212 std::string(WatcherType::ResourceType::Get()->type_url()).c_str(),
213 resource_name.c_str());
214 auto watcher = MakeRefCounted<WatcherType>(resource_name);
215 (*watchers)[resource_name].insert(watcher.get());
216 WatcherType::ResourceType::Get()->StartWatch(
217 xds_client_.get(), resource_name, std::move(watcher));
218 }
219
220 template <typename WatcherType>
StopWatch(std::map<std::string,std::set<WatcherType * >> * watchers,std::string resource_name)221 void StopWatch(std::map<std::string, std::set<WatcherType*>>* watchers,
222 std::string resource_name) {
223 gpr_log(GPR_INFO, "### StopWatch(%s %s)",
224 std::string(WatcherType::ResourceType::Get()->type_url()).c_str(),
225 resource_name.c_str());
226 auto& watchers_set = (*watchers)[resource_name];
227 auto it = watchers_set.begin();
228 if (it == watchers_set.end()) return;
229 WatcherType::ResourceType::Get()->CancelWatch(xds_client_.get(),
230 resource_name, *it);
231 watchers_set.erase(it);
232 }
233
ToAbslStatus(const xds_client_fuzzer::Status & status)234 static absl::Status ToAbslStatus(const xds_client_fuzzer::Status& status) {
235 return absl::Status(static_cast<absl::StatusCode>(status.code()),
236 status.message());
237 }
238
GetServer(const std::string & authority)239 const XdsBootstrap::XdsServer* GetServer(const std::string& authority) {
240 const GrpcXdsBootstrap& bootstrap =
241 static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap());
242 if (authority.empty()) return bootstrap.servers().front();
243 const auto* authority_entry =
244 static_cast<const GrpcXdsBootstrap::GrpcAuthority*>(
245 bootstrap.LookupAuthority(authority));
246 if (authority_entry == nullptr) return nullptr;
247 if (!authority_entry->servers().empty()) {
248 return authority_entry->servers().front();
249 }
250 return bootstrap.servers().front();
251 }
252
TriggerConnectionFailure(const std::string & authority,absl::Status status)253 void TriggerConnectionFailure(const std::string& authority,
254 absl::Status status) {
255 gpr_log(GPR_INFO, "### TriggerConnectionFailure(%s): %s", authority.c_str(),
256 status.ToString().c_str());
257 const auto* xds_server = GetServer(authority);
258 if (xds_server == nullptr) return;
259 transport_factory_->TriggerConnectionFailure(*xds_server,
260 std::move(status));
261 }
262
StreamIdMethod(const xds_client_fuzzer::StreamId & stream_id)263 static const char* StreamIdMethod(
264 const xds_client_fuzzer::StreamId& stream_id) {
265 switch (stream_id.method_case()) {
266 case xds_client_fuzzer::StreamId::kAds:
267 return FakeXdsTransportFactory::kAdsMethod;
268 case xds_client_fuzzer::StreamId::kLrs:
269 return FakeXdsTransportFactory::kLrsMethod;
270 case xds_client_fuzzer::StreamId::METHOD_NOT_SET:
271 return nullptr;
272 }
273 }
274
GetStream(const xds_client_fuzzer::StreamId & stream_id)275 RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> GetStream(
276 const xds_client_fuzzer::StreamId& stream_id) {
277 const auto* xds_server = GetServer(stream_id.authority());
278 if (xds_server == nullptr) return nullptr;
279 const char* method = StreamIdMethod(stream_id);
280 if (method == nullptr) return nullptr;
281 return transport_factory_->WaitForStream(*xds_server, method,
282 absl::ZeroDuration());
283 }
284
StreamIdString(const xds_client_fuzzer::StreamId & stream_id)285 static std::string StreamIdString(
286 const xds_client_fuzzer::StreamId& stream_id) {
287 return absl::StrCat("{authority=\"", stream_id.authority(),
288 "\", method=", StreamIdMethod(stream_id), "}");
289 }
290
ReadMessageFromClient(const xds_client_fuzzer::StreamId & stream_id,bool ok)291 void ReadMessageFromClient(const xds_client_fuzzer::StreamId& stream_id,
292 bool ok) {
293 gpr_log(GPR_INFO, "### ReadMessageFromClient(%s): %s",
294 StreamIdString(stream_id).c_str(), ok ? "true" : "false");
295 auto stream = GetStream(stream_id);
296 if (stream == nullptr) return;
297 gpr_log(GPR_INFO, " stream=%p", stream.get());
298 auto message = stream->WaitForMessageFromClient(absl::ZeroDuration());
299 if (message.has_value()) {
300 gpr_log(GPR_INFO, " completing send_message");
301 stream->CompleteSendMessageFromClient(ok);
302 }
303 }
304
SendMessageToClient(const xds_client_fuzzer::StreamId & stream_id,const envoy::service::discovery::v3::DiscoveryResponse & response)305 void SendMessageToClient(
306 const xds_client_fuzzer::StreamId& stream_id,
307 const envoy::service::discovery::v3::DiscoveryResponse& response) {
308 gpr_log(GPR_INFO, "### SendMessageToClient(%s)",
309 StreamIdString(stream_id).c_str());
310 auto stream = GetStream(stream_id);
311 if (stream == nullptr) return;
312 gpr_log(GPR_INFO, " stream=%p", stream.get());
313 stream->SendMessageToClient(response.SerializeAsString());
314 }
315
SendStatusToClient(const xds_client_fuzzer::StreamId & stream_id,absl::Status status)316 void SendStatusToClient(const xds_client_fuzzer::StreamId& stream_id,
317 absl::Status status) {
318 gpr_log(GPR_INFO, "### SendStatusToClient(%s): %s",
319 StreamIdString(stream_id).c_str(), status.ToString().c_str());
320 auto stream = GetStream(stream_id);
321 if (stream == nullptr) return;
322 gpr_log(GPR_INFO, " stream=%p", stream.get());
323 stream->MaybeSendStatusToClient(std::move(status));
324 }
325
326 RefCountedPtr<XdsClient> xds_client_;
327 FakeXdsTransportFactory* transport_factory_;
328
329 // Maps of currently active watchers for each resource type, keyed by
330 // resource name.
331 std::map<std::string, std::set<ListenerWatcher*>> listener_watchers_;
332 std::map<std::string, std::set<RouteConfigWatcher*>> route_config_watchers_;
333 std::map<std::string, std::set<ClusterWatcher*>> cluster_watchers_;
334 std::map<std::string, std::set<EndpointWatcher*>> endpoint_watchers_;
335 };
336
337 } // namespace grpc_core
338
// NOTE(review): not read anywhere in this file; presumably consumed by the
// fuzzing harness to suppress log output — confirm against the harness.
bool squelch{true};
340
// Fuzzer entry point: initializes gRPC, builds a Fuzzer from the message's
// bootstrap config, replays every action in order, then shuts gRPC down.
DEFINE_PROTO_FUZZER(const xds_client_fuzzer::Message& message) {
  grpc_init();
  {
    // Keep the Fuzzer in its own scope so that it, and the XdsClient it
    // holds, is destroyed before grpc_shutdown() runs; gRPC objects must not
    // outlive the library shutdown.
    grpc_core::Fuzzer fuzzer(message.bootstrap());
    for (const auto& action : message.actions()) {
      fuzzer.Act(action);
    }
  }
  grpc_shutdown();
}
349