1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/core/util/port_server_client.h"
20
21 #include <stdint.h>
22 #include <stdlib.h>
23 #include <string.h>
24
25 #include <cmath>
26 #include <memory>
27 #include <string>
28 #include <utility>
29
30 #include "absl/status/statusor.h"
31 #include "absl/strings/str_format.h"
32
33 #include <grpc/grpc.h>
34 #include <grpc/grpc_security.h>
35 #include <grpc/support/alloc.h>
36 #include <grpc/support/log.h>
37 #include <grpc/support/port_platform.h>
38 #include <grpc/support/sync.h>
39 #include <grpc/support/time.h>
40
41 #include "src/core/lib/gprpp/orphanable.h"
42 #include "src/core/lib/gprpp/ref_counted_ptr.h"
43 #include "src/core/lib/gprpp/status_helper.h"
44 #include "src/core/lib/gprpp/time.h"
45 #include "src/core/lib/http/httpcli.h"
46 #include "src/core/lib/http/parser.h"
47 #include "src/core/lib/iomgr/closure.h"
48 #include "src/core/lib/iomgr/error.h"
49 #include "src/core/lib/iomgr/exec_ctx.h"
50 #include "src/core/lib/iomgr/iomgr_fwd.h"
51 #include "src/core/lib/iomgr/polling_entity.h"
52 #include "src/core/lib/iomgr/pollset.h"
53 #include "src/core/lib/security/credentials/credentials.h"
54 #include "src/core/lib/uri/uri_parser.h"
55
56 typedef struct freereq {
57 gpr_mu* mu = nullptr;
58 grpc_polling_entity pops = {};
59 int done = 0;
60 } freereq;
61
destroy_pops_and_shutdown(void * p,grpc_error_handle)62 static void destroy_pops_and_shutdown(void* p, grpc_error_handle /*error*/) {
63 grpc_pollset* pollset =
64 grpc_polling_entity_pollset(static_cast<grpc_polling_entity*>(p));
65 grpc_pollset_destroy(pollset);
66 gpr_free(pollset);
67 }
68
freed_port_from_server(void * arg,grpc_error_handle)69 static void freed_port_from_server(void* arg, grpc_error_handle /*error*/) {
70 freereq* pr = static_cast<freereq*>(arg);
71 gpr_mu_lock(pr->mu);
72 pr->done = 1;
73 GRPC_LOG_IF_ERROR(
74 "pollset_kick",
75 grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
76 gpr_mu_unlock(pr->mu);
77 }
78
grpc_free_port_using_server(int port)79 void grpc_free_port_using_server(int port) {
80 grpc_http_request req;
81 grpc_http_response rsp;
82 freereq pr;
83 grpc_closure* shutdown_closure;
84
85 grpc_init();
86 {
87 grpc_core::ExecCtx exec_ctx;
88
89 pr = {};
90 memset(&req, 0, sizeof(req));
91 rsp = {};
92
93 grpc_pollset* pollset =
94 static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
95 grpc_pollset_init(pollset, &pr.mu);
96 pr.pops = grpc_polling_entity_create_from_pollset(pollset);
97 shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops,
98 grpc_schedule_on_exec_ctx);
99
100 std::string path = absl::StrFormat("/drop/%d", port);
101 auto uri = grpc_core::URI::Create("https", GRPC_PORT_SERVER_ADDRESS, path,
102 {} /* query params */, "" /* fragment */);
103 GPR_ASSERT(uri.ok());
104 auto http_request = grpc_core::HttpRequest::Get(
105 std::move(*uri), nullptr /* channel args */, &pr.pops, &req,
106 grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(30),
107 GRPC_CLOSURE_CREATE(freed_port_from_server, &pr,
108 grpc_schedule_on_exec_ctx),
109 &rsp,
110 grpc_core::RefCountedPtr<grpc_channel_credentials>(
111 grpc_insecure_credentials_create()));
112 http_request->Start();
113 grpc_core::ExecCtx::Get()->Flush();
114 gpr_mu_lock(pr.mu);
115 while (!pr.done) {
116 grpc_pollset_worker* worker = nullptr;
117 if (!GRPC_LOG_IF_ERROR(
118 "pollset_work",
119 grpc_pollset_work(grpc_polling_entity_pollset(&pr.pops), &worker,
120 grpc_core::Timestamp::Now() +
121 grpc_core::Duration::Seconds(1)))) {
122 pr.done = 1;
123 }
124 }
125 gpr_mu_unlock(pr.mu);
126
127 grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops),
128 shutdown_closure);
129
130 grpc_http_response_destroy(&rsp);
131 }
132 grpc_shutdown();
133 }
134
135 typedef struct portreq {
136 gpr_mu* mu = nullptr;
137 grpc_polling_entity pops = {};
138 int port = 0;
139 int retries = 0;
140 char* server = nullptr;
141 grpc_http_response response = {};
142 grpc_core::OrphanablePtr<grpc_core::HttpRequest> http_request;
143 } portreq;
144
got_port_from_server(void * arg,grpc_error_handle error)145 static void got_port_from_server(void* arg, grpc_error_handle error) {
146 size_t i;
147 int port = 0;
148 portreq* pr = static_cast<portreq*>(arg);
149 pr->http_request.reset();
150 int failed = 0;
151 grpc_http_response* response = &pr->response;
152
153 if (!error.ok()) {
154 failed = 1;
155 gpr_log(GPR_DEBUG, "failed port pick from server: retrying [%s]",
156 grpc_core::StatusToString(error).c_str());
157 } else if (response->status != 200) {
158 failed = 1;
159 gpr_log(GPR_DEBUG, "failed port pick from server: status=%d",
160 response->status);
161 }
162
163 if (failed) {
164 grpc_http_request req;
165 memset(&req, 0, sizeof(req));
166 if (pr->retries >= 5) {
167 gpr_mu_lock(pr->mu);
168 pr->port = 0;
169 GRPC_LOG_IF_ERROR(
170 "pollset_kick",
171 grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
172 gpr_mu_unlock(pr->mu);
173 return;
174 }
175 GPR_ASSERT(pr->retries < 10);
176 gpr_sleep_until(gpr_time_add(
177 gpr_now(GPR_CLOCK_REALTIME),
178 gpr_time_from_millis(
179 static_cast<int64_t>(
180 1000.0 * (1 + pow(1.3, pr->retries) * rand() / RAND_MAX)),
181 GPR_TIMESPAN)));
182 pr->retries++;
183 grpc_http_response_destroy(&pr->response);
184 pr->response = {};
185 auto uri = grpc_core::URI::Create("http", pr->server, "/get",
186 {} /* query params */, "" /* fragment */);
187 GPR_ASSERT(uri.ok());
188 pr->http_request = grpc_core::HttpRequest::Get(
189 std::move(*uri), nullptr /* channel args */, &pr->pops, &req,
190 grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(30),
191 GRPC_CLOSURE_CREATE(got_port_from_server, pr,
192 grpc_schedule_on_exec_ctx),
193 &pr->response,
194 grpc_core::RefCountedPtr<grpc_channel_credentials>(
195 grpc_insecure_credentials_create()));
196 pr->http_request->Start();
197 return;
198 }
199 GPR_ASSERT(response);
200 GPR_ASSERT(response->status == 200);
201 for (i = 0; i < response->body_length; i++) {
202 GPR_ASSERT(response->body[i] >= '0' && response->body[i] <= '9');
203 port = port * 10 + response->body[i] - '0';
204 }
205 GPR_ASSERT(port > 1024);
206 gpr_mu_lock(pr->mu);
207 pr->port = port;
208 GRPC_LOG_IF_ERROR(
209 "pollset_kick",
210 grpc_pollset_kick(grpc_polling_entity_pollset(&pr->pops), nullptr));
211 gpr_mu_unlock(pr->mu);
212 }
213
grpc_pick_port_using_server(void)214 int grpc_pick_port_using_server(void) {
215 grpc_http_request req;
216 portreq pr;
217 grpc_closure* shutdown_closure;
218
219 grpc_init();
220 {
221 grpc_core::ExecCtx exec_ctx;
222 pr = {};
223 memset(&req, 0, sizeof(req));
224 grpc_pollset* pollset =
225 static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
226 grpc_pollset_init(pollset, &pr.mu);
227 pr.pops = grpc_polling_entity_create_from_pollset(pollset);
228 shutdown_closure = GRPC_CLOSURE_CREATE(destroy_pops_and_shutdown, &pr.pops,
229 grpc_schedule_on_exec_ctx);
230 pr.port = -1;
231 pr.server = const_cast<char*>(GRPC_PORT_SERVER_ADDRESS);
232 auto uri = grpc_core::URI::Create("http", GRPC_PORT_SERVER_ADDRESS, "/get",
233 {} /* query params */, "" /* fragment */);
234 GPR_ASSERT(uri.ok());
235 auto http_request = grpc_core::HttpRequest::Get(
236 std::move(*uri), nullptr /* channel args */, &pr.pops, &req,
237 grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(30),
238 GRPC_CLOSURE_CREATE(got_port_from_server, &pr,
239 grpc_schedule_on_exec_ctx),
240 &pr.response,
241 grpc_core::RefCountedPtr<grpc_channel_credentials>(
242 grpc_insecure_credentials_create()));
243 http_request->Start();
244 grpc_core::ExecCtx::Get()->Flush();
245 gpr_mu_lock(pr.mu);
246 while (pr.port == -1) {
247 grpc_pollset_worker* worker = nullptr;
248 if (!GRPC_LOG_IF_ERROR(
249 "pollset_work",
250 grpc_pollset_work(grpc_polling_entity_pollset(&pr.pops), &worker,
251 grpc_core::Timestamp::Now() +
252 grpc_core::Duration::Seconds(1)))) {
253 pr.port = 0;
254 }
255 }
256 gpr_mu_unlock(pr.mu);
257
258 grpc_http_response_destroy(&pr.response);
259 grpc_pollset_shutdown(grpc_polling_entity_pollset(&pr.pops),
260 shutdown_closure);
261
262 grpc_core::ExecCtx::Get()->Flush();
263 }
264 grpc_shutdown();
265
266 return pr.port;
267 }
268