xref: /aosp_15_r20/external/grpc-grpc/test/cpp/end2end/connection_attempt_injector.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2016 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_TEST_CPP_END2END_CONNECTION_ATTEMPT_INJECTOR_H
16 #define GRPC_TEST_CPP_END2END_CONNECTION_ATTEMPT_INJECTOR_H
17 
18 #include <memory>
19 
20 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
21 #include "src/core/lib/gprpp/time.h"
22 #include "src/core/lib/iomgr/tcp_client.h"
23 
24 namespace grpc {
25 namespace testing {
26 
27 // Allows injecting connection-establishment delays into C-core.
28 // Typical usage:
29 //
30 //  // At grpc_init() time.
31 //  ConnectionAttemptInjector::Init();
32 //
33 //  // Instantiate when injection is desired.
34 //  ConnectionAttemptInjector injector;
35 //
36 //  // To inject a hold for the next connection attempt for a given port.
37 //  auto hold = injector.AddHold(port);
38 //  hold.Wait();
39 //  // ...do stuff...
40 //  hold.Resume();  // Or hold.Fail() if you want to force a failure.
41 //
42 //  // Inject a fixed delay for all connection attempts.
43 //  injector.SetDelay(grpc_core::Duration::Seconds(10));
44 //
45 // The injection is global, so there must be only one ConnectionAttemptInjector
46 // object at any one time.
47 //
48 // Note: This must be "final" to avoid tsan problems in both the ctor
49 // and dtor related to initializing the vtable.
50 class ConnectionAttemptInjector final {
51  private:
52   // Forward declarations.
53   class QueuedAttempt;
54 
55   grpc_core::Mutex mu_;
56 
57  public:
58   class Hold {
59    public:
60     // Do not instantiate directly -- must be created via AddHold().
61     Hold(ConnectionAttemptInjector* injector, int port,
62          bool intercept_completion);
63 
64     // Waits for the connection attempt to start.
65     // After this returns, exactly one of Resume() or Fail() must be called.
66     void Wait();
67 
68     // Resumes a connection attempt.  Must be called after Wait().
69     void Resume();
70 
71     // Fails a connection attempt.  Must be called after Wait().
72     void Fail(grpc_error_handle error);
73 
74     // If the hold was created with intercept_completion=true, then this
75     // can be called after Resume() to wait for the connection attempt
76     // to complete.
77     void WaitForCompletion();
78 
79     // Returns true if the connection attempt has been started.
80     bool IsStarted();
81 
82    private:
83     friend class ConnectionAttemptInjector;
84 
85     static void OnComplete(void* arg, grpc_error_handle error);
86 
87     ConnectionAttemptInjector* injector_;
88     const int port_;
89     const bool intercept_completion_;
90     std::unique_ptr<QueuedAttempt> queued_attempt_
91         ABSL_GUARDED_BY(&ConnectionAttemptInjector::mu_);
92     grpc_core::CondVar start_cv_;
93     grpc_closure on_complete_;
94     grpc_closure* original_on_complete_;
95     grpc_core::CondVar complete_cv_;
96   };
97 
98   // Global initializer.  Replaces the iomgr TCP client vtable.
99   // Must be called exactly once after grpc_init() but before any TCP
100   // connections are established.
101   static void Init();
102 
103   ConnectionAttemptInjector();
104   ~ConnectionAttemptInjector();
105 
106   // Adds a hold for a given port.  The caller may then use Wait() on
107   // the resulting Hold object to wait for the connection attempt to start.
108   // If intercept_completion is true, the caller can use WaitForCompletion()
109   // on the resulting Hold object.
110   std::unique_ptr<Hold> AddHold(int port, bool intercept_completion = false);
111 
112   // Set a fixed delay for all RPCs.  Will be used only if there is no
113   // hold for the connection attempt.
114   void SetDelay(grpc_core::Duration delay);
115 
116  private:
117   static grpc_tcp_client_vtable kDelayedConnectVTable;
118 
119   // Represents a queued attempt.
120   // The caller must invoke either Resume() or Fail() before destroying.
121   class QueuedAttempt {
122    public:
123     QueuedAttempt(grpc_closure* closure, grpc_endpoint** ep,
124                   grpc_pollset_set* interested_parties,
125                   const grpc_event_engine::experimental::EndpointConfig& config,
126                   const grpc_resolved_address* addr,
127                   grpc_core::Timestamp deadline);
128     ~QueuedAttempt();
129 
130     // Caller must invoke this from a thread with an ExecCtx.
131     void Resume();
132 
133     // Caller must invoke this from a thread with an ExecCtx.
134     void Fail(grpc_error_handle error);
135 
136    private:
137     grpc_closure* closure_;
138     grpc_endpoint** endpoint_;
139     grpc_pollset_set* interested_parties_;
140     grpc_event_engine::experimental::ChannelArgsEndpointConfig config_;
141     grpc_resolved_address address_;
142     grpc_core::Timestamp deadline_;
143   };
144 
145   // Injects a delay before continuing a connection attempt.
146   class InjectedDelay {
147    public:
148     virtual ~InjectedDelay() = default;
149 
150     InjectedDelay(grpc_core::Duration duration, grpc_closure* closure,
151                   grpc_endpoint** ep, grpc_pollset_set* interested_parties,
152                   const grpc_event_engine::experimental::EndpointConfig& config,
153                   const grpc_resolved_address* addr,
154                   grpc_core::Timestamp deadline);
155 
156    private:
157     void TimerCallback();
158 
159     QueuedAttempt attempt_;
160   };
161 
162   // Invoked for every TCP connection attempt.
163   void HandleConnection(
164       grpc_closure* closure, grpc_endpoint** ep,
165       grpc_pollset_set* interested_parties,
166       const grpc_event_engine::experimental::EndpointConfig& config,
167       const grpc_resolved_address* addr, grpc_core::Timestamp deadline);
168 
169   static void AttemptConnection(
170       grpc_closure* closure, grpc_endpoint** ep,
171       grpc_pollset_set* interested_parties,
172       const grpc_event_engine::experimental::EndpointConfig& config,
173       const grpc_resolved_address* addr, grpc_core::Timestamp deadline);
174 
175   // Replacement iomgr tcp_connect vtable functions that use the current
176   // ConnectionAttemptInjector object.
177   static int64_t TcpConnect(
178       grpc_closure* closure, grpc_endpoint** ep,
179       grpc_pollset_set* interested_parties,
180       const grpc_event_engine::experimental::EndpointConfig& config,
181       const grpc_resolved_address* addr, grpc_core::Timestamp deadline);
182   static bool TcpConnectCancel(int64_t connection_handle);
183 
184   std::vector<Hold*> holds_ ABSL_GUARDED_BY(&mu_);
185   absl::optional<grpc_core::Duration> delay_ ABSL_GUARDED_BY(&mu_);
186 };
187 
188 }  // namespace testing
189 }  // namespace grpc
190 
191 #endif  // GRPC_TEST_CPP_END2END_CONNECTION_ATTEMPT_INJECTOR_H
192