xref: /aosp_15_r20/external/grpc-grpc/test/core/event_engine/event_engine_test_utils.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_TEST_CORE_EVENT_ENGINE_EVENT_ENGINE_TEST_UTILS_H
16 #define GRPC_TEST_CORE_EVENT_ENGINE_EVENT_ENGINE_TEST_UTILS_H
17 
18 #include <functional>
19 #include <map>
20 #include <memory>
21 #include <string>
22 #include <tuple>
23 #include <utility>
24 #include <vector>
25 
26 #include "absl/status/status.h"
27 #include "absl/status/statusor.h"
28 #include "absl/strings/string_view.h"
29 
30 #include <grpc/event_engine/event_engine.h>
31 #include <grpc/event_engine/slice_buffer.h>
32 
33 #include "src/core/lib/gprpp/notification.h"
34 #include "src/core/lib/gprpp/sync.h"
35 #include "src/core/lib/resource_quota/memory_quota.h"
36 
37 using EventEngineFactory = std::function<
38     std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>;
39 
40 namespace grpc_event_engine {
41 namespace experimental {
42 
43 std::string ExtractSliceBufferIntoString(SliceBuffer* buf);
44 
45 // Returns a random message with bounded length.
46 std::string GetNextSendMessage();
47 
48 // Waits until the use_count of the EventEngine shared_ptr has reached 1
49 // and returns.
50 // Callers must give up their ref, or this method will block forever.
51 // Usage: WaitForSingleOwner(std::move(engine))
52 void WaitForSingleOwner(std::shared_ptr<EventEngine> engine);
53 
54 // A helper method to exchange data between two endpoints. It is assumed
55 // that both endpoints are connected. The data (specified as a string) is
56 // written by the sender_endpoint and read by the receiver_endpoint. It
57 // returns OK status only if data written == data read. It also blocks the
58 // calling thread until said Write and Read operations are complete.
59 absl::Status SendValidatePayload(absl::string_view data,
60                                  EventEngine::Endpoint* send_endpoint,
61                                  EventEngine::Endpoint* receive_endpoint);
62 
63 // A helper class to create clients/listeners and connections between them.
64 // The clients and listeners can be created by the oracle EventEngine
65 // or the EventEngine under test. The class provides handles into the
66 // connections that are created. Inidividual tests can test expected behavior by
67 // exchanging arbitrary data over these connections.
68 class ConnectionManager {
69  public:
ConnectionManager(std::unique_ptr<EventEngine> test_event_engine,std::unique_ptr<EventEngine> oracle_event_engine)70   ConnectionManager(std::unique_ptr<EventEngine> test_event_engine,
71                     std::unique_ptr<EventEngine> oracle_event_engine)
72       : memory_quota_(std::make_unique<grpc_core::MemoryQuota>("foo")),
73         test_event_engine_(std::move(test_event_engine)),
74         oracle_event_engine_(std::move(oracle_event_engine)) {}
75   ~ConnectionManager() = default;
76 
77   // It creates and starts a listener bound to all the specified list of
78   //  addresses.  If successful, return OK status. The type of the listener is
79   //  determined by the 2nd argument.
80   absl::Status BindAndStartListener(const std::vector<std::string>& addrs,
81                                     bool listener_type_oracle = true);
82 
83   // If connection is successful, returns a tuple containing:
84   //    1. a pointer to the client side endpoint of the connection.
85   //    2. a pointer to the server side endpoint of the connection.
86   // If un-successful it returns a non-OK  status containing the error
87   // encountered.
88   absl::StatusOr<std::tuple<std::unique_ptr<EventEngine::Endpoint>,
89                             std::unique_ptr<EventEngine::Endpoint>>>
90   CreateConnection(std::string target_addr, EventEngine::Duration timeout,
91                    bool client_type_oracle);
92 
93  private:
94   class Connection {
95    public:
96     Connection() = default;
97     ~Connection() = default;
98 
SetClientEndpoint(std::unique_ptr<EventEngine::Endpoint> && client_endpoint)99     void SetClientEndpoint(
100         std::unique_ptr<EventEngine::Endpoint>&& client_endpoint) {
101       client_endpoint_ = std::move(client_endpoint);
102       client_signal_.Notify();
103     }
SetServerEndpoint(std::unique_ptr<EventEngine::Endpoint> && server_endpoint)104     void SetServerEndpoint(
105         std::unique_ptr<EventEngine::Endpoint>&& server_endpoint) {
106       server_endpoint_ = std::move(server_endpoint);
107       server_signal_.Notify();
108     }
GetClientEndpoint()109     std::unique_ptr<EventEngine::Endpoint> GetClientEndpoint() {
110       auto client_endpoint = std::move(client_endpoint_);
111       client_endpoint_.reset();
112       return client_endpoint;
113     }
GetServerEndpoint()114     std::unique_ptr<EventEngine::Endpoint> GetServerEndpoint() {
115       auto server_endpoint = std::move(server_endpoint_);
116       server_endpoint_.reset();
117       return server_endpoint;
118     }
119 
120    private:
121     std::unique_ptr<EventEngine::Endpoint> client_endpoint_;
122     std::unique_ptr<EventEngine::Endpoint> server_endpoint_;
123     grpc_core::Notification client_signal_;
124     grpc_core::Notification server_signal_;
125   };
126 
127   grpc_core::Mutex mu_;
128   std::unique_ptr<grpc_core::MemoryQuota> memory_quota_;
129   int num_processed_connections_ = 0;
130   Connection last_in_progress_connection_;
131   std::map<std::string, std::shared_ptr<EventEngine::Listener>> listeners_;
132   std::unique_ptr<EventEngine> test_event_engine_;
133   std::unique_ptr<EventEngine> oracle_event_engine_;
134 };
135 
136 void AppendStringToSliceBuffer(SliceBuffer* buf, absl::string_view data);
137 
138 class NotifyOnDelete {
139  public:
NotifyOnDelete(grpc_core::Notification * signal)140   explicit NotifyOnDelete(grpc_core::Notification* signal) : signal_(signal) {}
141   NotifyOnDelete(const NotifyOnDelete&) = delete;
142   NotifyOnDelete& operator=(const NotifyOnDelete&) = delete;
NotifyOnDelete(NotifyOnDelete && other)143   NotifyOnDelete(NotifyOnDelete&& other) noexcept {
144     signal_ = other.signal_;
145     other.signal_ = nullptr;
146   }
147   NotifyOnDelete& operator=(NotifyOnDelete&& other) noexcept {
148     signal_ = other.signal_;
149     other.signal_ = nullptr;
150     return *this;
151   }
~NotifyOnDelete()152   ~NotifyOnDelete() {
153     if (signal_ != nullptr) {
154       signal_->Notify();
155     }
156   }
157 
158  private:
159   grpc_core::Notification* signal_;
160 };
161 
162 }  // namespace experimental
163 }  // namespace grpc_event_engine
164 
165 #endif  // GRPC_TEST_CORE_EVENT_ENGINE_EVENT_ENGINE_TEST_UTILS_H
166