1 // Copyright 2016 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_TEST_CPP_END2END_CONNECTION_ATTEMPT_INJECTOR_H 16 #define GRPC_TEST_CPP_END2END_CONNECTION_ATTEMPT_INJECTOR_H 17 18 #include <memory> 19 20 #include "src/core/lib/event_engine/channel_args_endpoint_config.h" 21 #include "src/core/lib/gprpp/time.h" 22 #include "src/core/lib/iomgr/tcp_client.h" 23 24 namespace grpc { 25 namespace testing { 26 27 // Allows injecting connection-establishment delays into C-core. 28 // Typical usage: 29 // 30 // // At grpc_init() time. 31 // ConnectionAttemptInjector::Init(); 32 // 33 // // Instantiate when injection is desired. 34 // ConnectionAttemptInjector injector; 35 // 36 // // To inject a hold for the next connection attempt for a given port. 37 // auto hold = injector.AddHold(port); 38 // hold.Wait(); 39 // // ...do stuff... 40 // hold.Resume(); // Or hold.Fail() if you want to force a failure. 41 // 42 // // Inject a fixed delay for all connection attempts. 43 // injector.SetDelay(grpc_core::Duration::Seconds(10)); 44 // 45 // The injection is global, so there must be only one ConnectionAttemptInjector 46 // object at any one time. 47 // 48 // Note: This must be "final" to avoid tsan problems in both the ctor 49 // and dtor related to initializing the vtable. 50 class ConnectionAttemptInjector final { 51 private: 52 // Forward declarations. 53 class QueuedAttempt; 54 55 grpc_core::Mutex mu_; 56 57 public: 58 class Hold { 59 public: 60 // Do not instantiate directly -- must be created via AddHold(). 61 Hold(ConnectionAttemptInjector* injector, int port, 62 bool intercept_completion); 63 64 // Waits for the connection attempt to start. 65 // After this returns, exactly one of Resume() or Fail() must be called. 66 void Wait(); 67 68 // Resumes a connection attempt. Must be called after Wait(). 69 void Resume(); 70 71 // Fails a connection attempt. Must be called after Wait(). 72 void Fail(grpc_error_handle error); 73 74 // If the hold was created with intercept_completion=true, then this 75 // can be called after Resume() to wait for the connection attempt 76 // to complete. 77 void WaitForCompletion(); 78 79 // Returns true if the connection attempt has been started. 80 bool IsStarted(); 81 82 private: 83 friend class ConnectionAttemptInjector; 84 85 static void OnComplete(void* arg, grpc_error_handle error); 86 87 ConnectionAttemptInjector* injector_; 88 const int port_; 89 const bool intercept_completion_; 90 std::unique_ptr<QueuedAttempt> queued_attempt_ 91 ABSL_GUARDED_BY(&ConnectionAttemptInjector::mu_); 92 grpc_core::CondVar start_cv_; 93 grpc_closure on_complete_; 94 grpc_closure* original_on_complete_; 95 grpc_core::CondVar complete_cv_; 96 }; 97 98 // Global initializer. Replaces the iomgr TCP client vtable. 99 // Must be called exactly once after grpc_init() but before any TCP 100 // connections are established. 101 static void Init(); 102 103 ConnectionAttemptInjector(); 104 ~ConnectionAttemptInjector(); 105 106 // Adds a hold for a given port. The caller may then use Wait() on 107 // the resulting Hold object to wait for the connection attempt to start. 108 // If intercept_completion is true, the caller can use WaitForCompletion() 109 // on the resulting Hold object. 110 std::unique_ptr<Hold> AddHold(int port, bool intercept_completion = false); 111 112 // Set a fixed delay for all RPCs. Will be used only if there is no 113 // hold for the connection attempt. 114 void SetDelay(grpc_core::Duration delay); 115 116 private: 117 static grpc_tcp_client_vtable kDelayedConnectVTable; 118 119 // Represents a queued attempt. 120 // The caller must invoke either Resume() or Fail() before destroying. 121 class QueuedAttempt { 122 public: 123 QueuedAttempt(grpc_closure* closure, grpc_endpoint** ep, 124 grpc_pollset_set* interested_parties, 125 const grpc_event_engine::experimental::EndpointConfig& config, 126 const grpc_resolved_address* addr, 127 grpc_core::Timestamp deadline); 128 ~QueuedAttempt(); 129 130 // Caller must invoke this from a thread with an ExecCtx. 131 void Resume(); 132 133 // Caller must invoke this from a thread with an ExecCtx. 134 void Fail(grpc_error_handle error); 135 136 private: 137 grpc_closure* closure_; 138 grpc_endpoint** endpoint_; 139 grpc_pollset_set* interested_parties_; 140 grpc_event_engine::experimental::ChannelArgsEndpointConfig config_; 141 grpc_resolved_address address_; 142 grpc_core::Timestamp deadline_; 143 }; 144 145 // Injects a delay before continuing a connection attempt. 146 class InjectedDelay { 147 public: 148 virtual ~InjectedDelay() = default; 149 150 InjectedDelay(grpc_core::Duration duration, grpc_closure* closure, 151 grpc_endpoint** ep, grpc_pollset_set* interested_parties, 152 const grpc_event_engine::experimental::EndpointConfig& config, 153 const grpc_resolved_address* addr, 154 grpc_core::Timestamp deadline); 155 156 private: 157 void TimerCallback(); 158 159 QueuedAttempt attempt_; 160 }; 161 162 // Invoked for every TCP connection attempt. 163 void HandleConnection( 164 grpc_closure* closure, grpc_endpoint** ep, 165 grpc_pollset_set* interested_parties, 166 const grpc_event_engine::experimental::EndpointConfig& config, 167 const grpc_resolved_address* addr, grpc_core::Timestamp deadline); 168 169 static void AttemptConnection( 170 grpc_closure* closure, grpc_endpoint** ep, 171 grpc_pollset_set* interested_parties, 172 const grpc_event_engine::experimental::EndpointConfig& config, 173 const grpc_resolved_address* addr, grpc_core::Timestamp deadline); 174 175 // Replacement iomgr tcp_connect vtable functions that use the current 176 // ConnectionAttemptInjector object. 177 static int64_t TcpConnect( 178 grpc_closure* closure, grpc_endpoint** ep, 179 grpc_pollset_set* interested_parties, 180 const grpc_event_engine::experimental::EndpointConfig& config, 181 const grpc_resolved_address* addr, grpc_core::Timestamp deadline); 182 static bool TcpConnectCancel(int64_t connection_handle); 183 184 std::vector<Hold*> holds_ ABSL_GUARDED_BY(&mu_); 185 absl::optional<grpc_core::Duration> delay_ ABSL_GUARDED_BY(&mu_); 186 }; 187 188 } // namespace testing 189 } // namespace grpc 190 191 #endif // GRPC_TEST_CPP_END2END_CONNECTION_ATTEMPT_INJECTOR_H 192