1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <ruby/ruby.h>
20
21 #include "rb_server.h"
22
23 #include "rb_byte_buffer.h"
24 #include "rb_call.h"
25 #include "rb_channel_args.h"
26 #include "rb_completion_queue.h"
27 #include "rb_grpc.h"
28 #include "rb_grpc_imports.generated.h"
29 #include "rb_server_credentials.h"
30 #include "rb_xds_server_credentials.h"
31
32 #include <grpc/grpc.h>
33 #include <grpc/grpc_security.h>
34 #include <grpc/support/atm.h>
35 #include <grpc/support/log.h>
36
37 /* grpc_rb_cServer is the ruby class that proxies grpc_server. */
38 static VALUE grpc_rb_cServer = Qnil;
39
40 /* id_at is the constructor method of the ruby standard Time class. */
41 static ID id_at;
42
43 /* id_insecure_server is used to indicate that a server is insecure */
44 static VALUE id_insecure_server;
45
46 /* grpc_rb_server wraps a grpc_server. */
47 typedef struct grpc_rb_server {
48 /* The actual server */
49 grpc_server* wrapped;
50 grpc_completion_queue* queue;
51 int shutdown_and_notify_done;
52 int destroy_done;
53 } grpc_rb_server;
54
/* Shuts the wrapped server down, at most once per grpc_rb_server.
 *
 * Requests shutdown and waits on the server's completion queue until
 * `deadline`. If that wait times out, every call is cancelled and the wait is
 * repeated with no deadline. Any final event other than GRPC_OP_COMPLETE is
 * logged. Does nothing if shutdown was already attempted or the server is
 * not set. */
static void grpc_rb_server_maybe_shutdown_and_notify(grpc_rb_server* server,
                                                     gpr_timespec deadline) {
  grpc_event event;
  /* The address of the local event doubles as a unique tag for the pluck. */
  void* shutdown_tag = &event;

  if (server->shutdown_and_notify_done) {
    return;
  }
  server->shutdown_and_notify_done = 1;

  if (server->wrapped == NULL) {
    return;
  }

  grpc_server_shutdown_and_notify(server->wrapped, server->queue,
                                  shutdown_tag);
  event =
      rb_completion_queue_pluck(server->queue, shutdown_tag, deadline, NULL);

  if (event.type == GRPC_QUEUE_TIMEOUT) {
    /* Graceful shutdown did not finish in time: force it. */
    grpc_server_cancel_all_calls(server->wrapped);
    event = rb_completion_queue_pluck(server->queue, shutdown_tag,
                                      gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
  }

  if (event.type != GRPC_OP_COMPLETE) {
    gpr_log(GPR_INFO,
            "GRPC_RUBY: bad grpc_server_shutdown_and_notify result:%d",
            event.type);
  }
}
77
/* Destroys the wrapped server and its completion queue, at most once.
 *
 * Destruction can be started by the app or implicitly by GC; the
 * destroy_done flag keeps the second caller from repeating the work. */
static void grpc_rb_server_maybe_destroy(grpc_rb_server* server) {
  if (server->destroy_done) {
    return;
  }
  server->destroy_done = 1;

  if (server->wrapped == NULL) {
    return;
  }

  grpc_server_destroy(server->wrapped);
  grpc_rb_completion_queue_destroy(server->queue);
  /* Clear both pointers so later callers see the server as destroyed. */
  server->wrapped = NULL;
  server->queue = NULL;
}
90
grpc_rb_server_free_internal(void * p)91 static void grpc_rb_server_free_internal(void* p) {
92 grpc_rb_server* svr = NULL;
93 gpr_timespec deadline;
94 if (p == NULL) {
95 return;
96 };
97 svr = (grpc_rb_server*)p;
98
99 deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
100 gpr_time_from_seconds(2, GPR_TIMESPAN));
101
102 grpc_rb_server_maybe_shutdown_and_notify(svr, deadline);
103 grpc_rb_server_maybe_destroy(svr);
104
105 xfree(p);
106 }
107
/* Free callback for server instances; forwards to
 * grpc_rb_server_free_internal. */
static void grpc_rb_server_free(void* p) {
  grpc_rb_server_free_internal(p);
}
110
111 static const rb_data_type_t grpc_rb_server_data_type = {
112 "grpc_server",
113 {GRPC_RB_GC_NOT_MARKED,
114 grpc_rb_server_free,
115 GRPC_RB_MEMSIZE_UNAVAILABLE,
116 {NULL, NULL}},
117 NULL,
118 NULL,
119 #ifdef RUBY_TYPED_FREE_IMMEDIATELY
120 /* It is unsafe to specify RUBY_TYPED_FREE_IMMEDIATELY because the free
121 * function would block and we might want to unlock GVL
122 * TODO(yugui) Unlock GVL?
123 */
124 0,
125 #endif
126 };
127
128 /* Allocates grpc_rb_server instances. */
grpc_rb_server_alloc(VALUE cls)129 static VALUE grpc_rb_server_alloc(VALUE cls) {
130 grpc_ruby_init();
131 grpc_rb_server* wrapper = ALLOC(grpc_rb_server);
132 wrapper->wrapped = NULL;
133 wrapper->destroy_done = 0;
134 wrapper->shutdown_and_notify_done = 0;
135 return TypedData_Wrap_Struct(cls, &grpc_rb_server_data_type, wrapper);
136 }
137
138 /*
139 call-seq:
140 server = Server.new({'arg1': 'value1'})
141
142 Initializes server instances. */
grpc_rb_server_init(VALUE self,VALUE channel_args)143 static VALUE grpc_rb_server_init(VALUE self, VALUE channel_args) {
144 grpc_completion_queue* cq = NULL;
145 grpc_rb_server* wrapper = NULL;
146 grpc_server* srv = NULL;
147 grpc_channel_args args;
148 MEMZERO(&args, grpc_channel_args, 1);
149
150 cq = grpc_completion_queue_create_for_pluck(NULL);
151 TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
152 wrapper);
153 grpc_rb_hash_convert_to_channel_args(channel_args, &args);
154 srv = grpc_server_create(&args, NULL);
155 grpc_rb_channel_args_destroy(&args);
156 if (srv == NULL) {
157 rb_raise(rb_eRuntimeError, "could not create a gRPC server, not sure why");
158 }
159 grpc_server_register_completion_queue(srv, cq, NULL);
160 wrapper->wrapped = srv;
161 wrapper->queue = cq;
162
163 return self;
164 }
165
166 /* request_call_stack holds various values used by the
167 * grpc_rb_server_request_call function */
168 typedef struct request_call_stack {
169 grpc_call_details details;
170 grpc_metadata_array md_ary;
171 } request_call_stack;
172
173 /* grpc_request_call_stack_init ensures the request_call_stack is properly
174 * initialized */
grpc_request_call_stack_init(request_call_stack * st)175 static void grpc_request_call_stack_init(request_call_stack* st) {
176 MEMZERO(st, request_call_stack, 1);
177 grpc_metadata_array_init(&st->md_ary);
178 grpc_call_details_init(&st->details);
179 }
180
181 /* grpc_request_call_stack_cleanup ensures the request_call_stack is properly
182 * cleaned up */
grpc_request_call_stack_cleanup(request_call_stack * st)183 static void grpc_request_call_stack_cleanup(request_call_stack* st) {
184 grpc_metadata_array_destroy(&st->md_ary);
185 grpc_call_details_destroy(&st->details);
186 }
187
188 struct server_request_call_args {
189 grpc_rb_server* server;
190 grpc_completion_queue* call_queue;
191 request_call_stack st;
192 };
193
/* Body of request_call, run under rb_ensure.
 *
 * Creates a per-call completion queue, requests a call from the server and
 * waits without a deadline for it to arrive on the server queue. On success
 * returns a NewServerRpc struct holding the method, host, deadline, metadata
 * and wrapped call. Raises CallError if the request cannot be issued or its
 * completion reports failure.
 *
 * value_args is a struct server_request_call_args* cast to VALUE. */
static VALUE grpc_rb_server_request_call_try(VALUE value_args) {
  grpc_rb_fork_unsafe_begin();
  struct server_request_call_args* args =
      (struct server_request_call_args*)value_args;

  grpc_call* call = NULL;
  void* tag = (void*)&args->st;

  args->call_queue = grpc_completion_queue_create_for_pluck(NULL);
  grpc_request_call_stack_init(&args->st);

  /* call grpc_server_request_call, then wait for it to complete using
   * pluck_event */
  grpc_call_error err = grpc_server_request_call(
      args->server->wrapped, &call, &args->st.details, &args->st.md_ary,
      args->call_queue, args->server->queue, tag);
  if (err != GRPC_CALL_OK) {
    rb_raise(grpc_rb_eCallError,
             "grpc_server_request_call failed: %s (code=%d)",
             grpc_call_error_detail_of(err), err);
  }

  grpc_event ev = rb_completion_queue_pluck(
      args->server->queue, tag, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
  if (!ev.success) {
    rb_raise(grpc_rb_eCallError, "request_call completion failed");
  }

  /* Build the pieces of the NewServerRpc struct one at a time. Each of these
   * conversions may raise, so they all run while the ensure handler is still
   * responsible for call_queue. */
  gpr_timespec deadline =
      gpr_convert_clock_type(args->st.details.deadline, GPR_CLOCK_REALTIME);
  VALUE rb_method = grpc_rb_slice_to_ruby_string(args->st.details.method);
  VALUE rb_host = grpc_rb_slice_to_ruby_string(args->st.details.host);
  /* NOTE(review): INT2NUM narrows its argument to int; if deadline.tv_sec is
   * wider than int this truncates large deadlines -- confirm. */
  VALUE rb_deadline =
      rb_funcall(rb_cTime, id_at, 2, INT2NUM(deadline.tv_sec),
                 INT2NUM(deadline.tv_nsec / 1000));
  VALUE rb_metadata = grpc_rb_md_ary_to_h(&args->st.md_ary);

  /* Wrap the call last and give up call_queue immediately afterwards, so the
   * queue is never both held by the wrapped call and destroyed by the ensure
   * handler. */
  VALUE rb_call = grpc_rb_wrap_call(call, args->call_queue);
  args->call_queue = NULL;

  return rb_struct_new(grpc_rb_sNewServerRpc, rb_method, rb_host, rb_deadline,
                       rb_metadata, rb_call, NULL);
}
236
/* Cleanup half of request_call, registered as the rb_ensure handler.
 *
 * Ends the fork-unsafe section, destroys the per-call completion queue if the
 * try half still holds one, and releases the call details and metadata.
 *
 * value_args is a struct server_request_call_args* cast to VALUE. */
static VALUE grpc_rb_server_request_call_ensure(VALUE value_args) {
  grpc_rb_fork_unsafe_end();
  struct server_request_call_args* request =
      (struct server_request_call_args*)value_args;

  /* A non-NULL queue here means the try half did not hand it over. */
  if (request->call_queue != NULL) {
    grpc_rb_completion_queue_destroy(request->call_queue);
  }
  grpc_request_call_stack_cleanup(&request->st);

  return Qnil;
}
250
251 /* call-seq:
252 server.request_call
253
254 Requests notification of a new call on a server. */
grpc_rb_server_request_call(VALUE self)255 static VALUE grpc_rb_server_request_call(VALUE self) {
256 grpc_rb_server* s;
257 TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
258 grpc_ruby_fork_guard();
259 if (s->wrapped == NULL) {
260 rb_raise(rb_eRuntimeError, "destroyed!");
261 }
262 struct server_request_call_args args = {.server = s, .call_queue = NULL};
263 return rb_ensure(grpc_rb_server_request_call_try, (VALUE)&args,
264 grpc_rb_server_request_call_ensure, (VALUE)&args);
265 }
266
/* Starts the wrapped server. Raises RuntimeError if the server has already
 * been destroyed. Always returns nil. */
static VALUE grpc_rb_server_start(VALUE self) {
  grpc_rb_server* server = NULL;
  TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
                       server);
  grpc_ruby_fork_guard();
  if (server->wrapped == NULL) {
    rb_raise(rb_eRuntimeError, "destroyed!");
  }
  grpc_server_start(server->wrapped);
  return Qnil;
}
278
/* Shuts the server down, waiting until `timeout` for it to finish.
 *
 * A nil timeout means wait forever; any other value is converted with
 * grpc_rb_time_timeval. Always returns nil. */
static VALUE grpc_rb_server_shutdown_and_notify(VALUE self, VALUE timeout) {
  grpc_rb_server* server = NULL;
  gpr_timespec deadline;

  TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
                       server);
  deadline = NIL_P(timeout)
                 ? gpr_inf_future(GPR_CLOCK_REALTIME)
                 : grpc_rb_time_timeval(timeout, /* absolute time*/ 0);

  grpc_rb_server_maybe_shutdown_and_notify(server, deadline);

  return Qnil;
}
294
295 /*
296 call-seq:
297 server = Server.new({'arg1': 'value1'})
298 ... // do stuff with server
299 ...
300 ... // initiate server shutdown
301 server.shutdown_and_notify(timeout)
302 ... // to shutdown the server
303 server.destroy()
304
305 Destroys server instances. */
grpc_rb_server_destroy(VALUE self)306 static VALUE grpc_rb_server_destroy(VALUE self) {
307 grpc_rb_server* s = NULL;
308 TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
309 grpc_rb_server_maybe_destroy(s);
310 return Qnil;
311 }
312
313 /*
314 call-seq:
315 // insecure port
316 insecure_server = Server.new(cq, {'arg1': 'value1'})
317 insecure_server.add_http2_port('mydomain:50051', :this_port_is_insecure)
318
319 // secure port
320 server_creds = ...
321 secure_server = Server.new(cq, {'arg1': 'value1'})
322 secure_server.add_http_port('mydomain:50051', server_creds)
323
324 Adds a http2 port to server */
grpc_rb_server_add_http2_port(VALUE self,VALUE port,VALUE rb_creds)325 static VALUE grpc_rb_server_add_http2_port(VALUE self, VALUE port,
326 VALUE rb_creds) {
327 grpc_rb_server* s = NULL;
328 grpc_server_credentials* creds = NULL;
329 int recvd_port = 0;
330
331 TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type, s);
332 if (s->wrapped == NULL) {
333 rb_raise(rb_eRuntimeError, "destroyed!");
334 return Qnil;
335 } else if (TYPE(rb_creds) == T_SYMBOL) {
336 if (id_insecure_server != SYM2ID(rb_creds)) {
337 rb_raise(rb_eTypeError, "bad creds symbol, want :this_port_is_insecure");
338 return Qnil;
339 }
340 grpc_server_credentials* insecure_creds =
341 grpc_insecure_server_credentials_create();
342 recvd_port = grpc_server_add_http2_port(s->wrapped, StringValueCStr(port),
343 insecure_creds);
344 grpc_server_credentials_release(insecure_creds);
345 if (recvd_port == 0) {
346 rb_raise(rb_eRuntimeError,
347 "could not add port %s to server, not sure why",
348 StringValueCStr(port));
349 }
350 } else {
351 // TODO: create a common parent class for all server-side credentials,
352 // then we can have a single method to retrieve the underlying
353 // grpc_server_credentials object, and avoid the need for this reflection
354 if (grpc_rb_is_server_credentials(rb_creds)) {
355 creds = grpc_rb_get_wrapped_server_credentials(rb_creds);
356 } else if (grpc_rb_is_xds_server_credentials(rb_creds)) {
357 creds = grpc_rb_get_wrapped_xds_server_credentials(rb_creds);
358 } else {
359 rb_raise(rb_eTypeError,
360 "failed to create server because credentials parameter has an "
361 "invalid type, want ServerCredentials or XdsServerCredentials");
362 }
363 recvd_port =
364 grpc_server_add_http2_port(s->wrapped, StringValueCStr(port), creds);
365 if (recvd_port == 0) {
366 rb_raise(rb_eRuntimeError,
367 "could not add secure port %s to server, not sure why",
368 StringValueCStr(port));
369 }
370 }
371 return INT2NUM(recvd_port);
372 }
373
Init_grpc_server()374 void Init_grpc_server() {
375 grpc_rb_cServer =
376 rb_define_class_under(grpc_rb_mGrpcCore, "Server", rb_cObject);
377
378 /* Allocates an object managed by the ruby runtime */
379 rb_define_alloc_func(grpc_rb_cServer, grpc_rb_server_alloc);
380
381 /* Provides a ruby constructor and support for dup/clone. */
382 rb_define_method(grpc_rb_cServer, "initialize", grpc_rb_server_init, 1);
383 rb_define_method(grpc_rb_cServer, "initialize_copy", grpc_rb_cannot_init_copy,
384 1);
385
386 /* Add the server methods. */
387 rb_define_method(grpc_rb_cServer, "request_call", grpc_rb_server_request_call,
388 0);
389 rb_define_method(grpc_rb_cServer, "start", grpc_rb_server_start, 0);
390 rb_define_method(grpc_rb_cServer, "shutdown_and_notify",
391 grpc_rb_server_shutdown_and_notify, 1);
392 rb_define_method(grpc_rb_cServer, "destroy", grpc_rb_server_destroy, 0);
393 rb_define_alias(grpc_rb_cServer, "close", "destroy");
394 rb_define_method(grpc_rb_cServer, "add_http2_port",
395 grpc_rb_server_add_http2_port, 2);
396 id_at = rb_intern("at");
397 id_insecure_server = rb_intern("this_port_is_insecure");
398 }
399
400 /* Gets the wrapped server from the ruby wrapper */
grpc_rb_get_wrapped_server(VALUE v)401 grpc_server* grpc_rb_get_wrapped_server(VALUE v) {
402 grpc_rb_server* wrapper = NULL;
403 TypedData_Get_Struct(v, grpc_rb_server, &grpc_rb_server_data_type, wrapper);
404 return wrapper->wrapped;
405 }
406