1 // Copyright 2022 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_TEST_CORE_EVENT_ENGINE_EVENT_ENGINE_TEST_UTILS_H 16 #define GRPC_TEST_CORE_EVENT_ENGINE_EVENT_ENGINE_TEST_UTILS_H 17 18 #include <functional> 19 #include <map> 20 #include <memory> 21 #include <string> 22 #include <tuple> 23 #include <utility> 24 #include <vector> 25 26 #include "absl/status/status.h" 27 #include "absl/status/statusor.h" 28 #include "absl/strings/string_view.h" 29 30 #include <grpc/event_engine/event_engine.h> 31 #include <grpc/event_engine/slice_buffer.h> 32 33 #include "src/core/lib/gprpp/notification.h" 34 #include "src/core/lib/gprpp/sync.h" 35 #include "src/core/lib/resource_quota/memory_quota.h" 36 37 using EventEngineFactory = std::function< 38 std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>; 39 40 namespace grpc_event_engine { 41 namespace experimental { 42 43 std::string ExtractSliceBufferIntoString(SliceBuffer* buf); 44 45 // Returns a random message with bounded length. 46 std::string GetNextSendMessage(); 47 48 // Waits until the use_count of the EventEngine shared_ptr has reached 1 49 // and returns. 50 // Callers must give up their ref, or this method will block forever. 51 // Usage: WaitForSingleOwner(std::move(engine)) 52 void WaitForSingleOwner(std::shared_ptr<EventEngine> engine); 53 54 // A helper method to exchange data between two endpoints. It is assumed 55 // that both endpoints are connected. The data (specified as a string) is 56 // written by the sender_endpoint and read by the receiver_endpoint. It 57 // returns OK status only if data written == data read. It also blocks the 58 // calling thread until said Write and Read operations are complete. 59 absl::Status SendValidatePayload(absl::string_view data, 60 EventEngine::Endpoint* send_endpoint, 61 EventEngine::Endpoint* receive_endpoint); 62 63 // A helper class to create clients/listeners and connections between them. 64 // The clients and listeners can be created by the oracle EventEngine 65 // or the EventEngine under test. The class provides handles into the 66 // connections that are created. Inidividual tests can test expected behavior by 67 // exchanging arbitrary data over these connections. 68 class ConnectionManager { 69 public: ConnectionManager(std::unique_ptr<EventEngine> test_event_engine,std::unique_ptr<EventEngine> oracle_event_engine)70 ConnectionManager(std::unique_ptr<EventEngine> test_event_engine, 71 std::unique_ptr<EventEngine> oracle_event_engine) 72 : memory_quota_(std::make_unique<grpc_core::MemoryQuota>("foo")), 73 test_event_engine_(std::move(test_event_engine)), 74 oracle_event_engine_(std::move(oracle_event_engine)) {} 75 ~ConnectionManager() = default; 76 77 // It creates and starts a listener bound to all the specified list of 78 // addresses. If successful, return OK status. The type of the listener is 79 // determined by the 2nd argument. 80 absl::Status BindAndStartListener(const std::vector<std::string>& addrs, 81 bool listener_type_oracle = true); 82 83 // If connection is successful, returns a tuple containing: 84 // 1. a pointer to the client side endpoint of the connection. 85 // 2. a pointer to the server side endpoint of the connection. 86 // If un-successful it returns a non-OK status containing the error 87 // encountered. 88 absl::StatusOr<std::tuple<std::unique_ptr<EventEngine::Endpoint>, 89 std::unique_ptr<EventEngine::Endpoint>>> 90 CreateConnection(std::string target_addr, EventEngine::Duration timeout, 91 bool client_type_oracle); 92 93 private: 94 class Connection { 95 public: 96 Connection() = default; 97 ~Connection() = default; 98 SetClientEndpoint(std::unique_ptr<EventEngine::Endpoint> && client_endpoint)99 void SetClientEndpoint( 100 std::unique_ptr<EventEngine::Endpoint>&& client_endpoint) { 101 client_endpoint_ = std::move(client_endpoint); 102 client_signal_.Notify(); 103 } SetServerEndpoint(std::unique_ptr<EventEngine::Endpoint> && server_endpoint)104 void SetServerEndpoint( 105 std::unique_ptr<EventEngine::Endpoint>&& server_endpoint) { 106 server_endpoint_ = std::move(server_endpoint); 107 server_signal_.Notify(); 108 } GetClientEndpoint()109 std::unique_ptr<EventEngine::Endpoint> GetClientEndpoint() { 110 auto client_endpoint = std::move(client_endpoint_); 111 client_endpoint_.reset(); 112 return client_endpoint; 113 } GetServerEndpoint()114 std::unique_ptr<EventEngine::Endpoint> GetServerEndpoint() { 115 auto server_endpoint = std::move(server_endpoint_); 116 server_endpoint_.reset(); 117 return server_endpoint; 118 } 119 120 private: 121 std::unique_ptr<EventEngine::Endpoint> client_endpoint_; 122 std::unique_ptr<EventEngine::Endpoint> server_endpoint_; 123 grpc_core::Notification client_signal_; 124 grpc_core::Notification server_signal_; 125 }; 126 127 grpc_core::Mutex mu_; 128 std::unique_ptr<grpc_core::MemoryQuota> memory_quota_; 129 int num_processed_connections_ = 0; 130 Connection last_in_progress_connection_; 131 std::map<std::string, std::shared_ptr<EventEngine::Listener>> listeners_; 132 std::unique_ptr<EventEngine> test_event_engine_; 133 std::unique_ptr<EventEngine> oracle_event_engine_; 134 }; 135 136 void AppendStringToSliceBuffer(SliceBuffer* buf, absl::string_view data); 137 138 class NotifyOnDelete { 139 public: NotifyOnDelete(grpc_core::Notification * signal)140 explicit NotifyOnDelete(grpc_core::Notification* signal) : signal_(signal) {} 141 NotifyOnDelete(const NotifyOnDelete&) = delete; 142 NotifyOnDelete& operator=(const NotifyOnDelete&) = delete; NotifyOnDelete(NotifyOnDelete && other)143 NotifyOnDelete(NotifyOnDelete&& other) noexcept { 144 signal_ = other.signal_; 145 other.signal_ = nullptr; 146 } 147 NotifyOnDelete& operator=(NotifyOnDelete&& other) noexcept { 148 signal_ = other.signal_; 149 other.signal_ = nullptr; 150 return *this; 151 } ~NotifyOnDelete()152 ~NotifyOnDelete() { 153 if (signal_ != nullptr) { 154 signal_->Notify(); 155 } 156 } 157 158 private: 159 grpc_core::Notification* signal_; 160 }; 161 162 } // namespace experimental 163 } // namespace grpc_event_engine 164 165 #endif // GRPC_TEST_CORE_EVENT_ENGINE_EVENT_ENGINE_TEST_UTILS_H 166