xref: /aosp_15_r20/external/grpc-grpc/src/ruby/ext/grpc/rb_call.c (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <ruby/ruby.h>
20 
21 #include "rb_call.h"
22 
23 #include "rb_byte_buffer.h"
24 #include "rb_call_credentials.h"
25 #include "rb_completion_queue.h"
26 #include "rb_grpc.h"
27 #include "rb_grpc_imports.generated.h"
28 
29 #include <grpc/grpc.h>
30 #include <grpc/impl/codegen/compression_types.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 
/* grpc_rb_cCall is the Call class whose instances proxy grpc_call. */
static VALUE grpc_rb_cCall;

/* grpc_rb_eCallError is the ruby class of the exception thrown during call
   operations. */
VALUE grpc_rb_eCallError = Qnil;

/* grpc_rb_eOutOfTime is the ruby class of the exception thrown to indicate
   a timeout. */
static VALUE grpc_rb_eOutOfTime = Qnil;

/* grpc_rb_sBatchResult is the struct class used to hold the results of a
 * batch call. */
static VALUE grpc_rb_sBatchResult;

/* grpc_rb_cMdAry is the MetadataArray class whose instances proxy
 * grpc_metadata_array. */
VALUE grpc_rb_cMdAry;

/* id_credentials is the name of the hidden ivar that preserves the value
 * of the credentials added to the call. */
static ID id_credentials;

/* id_metadata is the name of the attribute used to access the metadata hash
 * received by the call and subsequently saved on it. */
static ID id_metadata;

/* id_trailing_metadata is the name of the attribute used to access the
 * trailing metadata hash received by the call and subsequently saved on it. */
static ID id_trailing_metadata;

/* id_status is the name of the attribute used to access the status object
 * received by the call and subsequently saved on it. */
static ID id_status;

/* id_write_flag is the name of the attribute used to access the write_flag
 * saved on the call. */
static ID id_write_flag;

/* sym_* are the symbols for the attributes of grpc_rb_sBatchResult. */
static VALUE sym_send_message;
static VALUE sym_send_metadata;
static VALUE sym_send_close;
static VALUE sym_send_status;
static VALUE sym_message;
static VALUE sym_status;
static VALUE sym_cancelled;
81 
/* grpc_rb_call is the native payload stored inside a Ruby Call object. */
typedef struct grpc_rb_call {
  /* The core call being proxied; set to NULL once destroy_call has run. */
  grpc_call* wrapped;
  /* Completion queue used to pluck this call's batch events; destroyed
   * together with the call. */
  grpc_completion_queue* queue;
} grpc_rb_call;
86 
/* Releases the core call and the completion queue held by `call`.
 * Safe to invoke repeatedly: once `wrapped` has been cleared, further
 * invocations do nothing. */
static void destroy_call(grpc_rb_call* call) {
  if (call->wrapped == NULL) {
    return; /* already torn down */
  }

  grpc_call_unref(call->wrapped);
  call->wrapped = NULL;

  grpc_rb_completion_queue_destroy(call->queue);
  call->queue = NULL;
}
96 
97 /* Destroys a Call. */
grpc_rb_call_destroy(void * p)98 static void grpc_rb_call_destroy(void* p) {
99   if (p == NULL) {
100     return;
101   }
102   destroy_call((grpc_rb_call*)p);
103   xfree(p);
104 }
105 
/* Describes grpc_metadata_array for RTypedData. The mark and free slots are
 * filled with the GRPC_RB_GC_NOT_MARKED / GRPC_RB_GC_DONT_FREE placeholders
 * (defined outside this file; presumably the wrapper never frees the wrapped
 * array -- confirm against rb_grpc.h). */
const rb_data_type_t grpc_rb_md_ary_data_type = {
    "grpc_metadata_array",
    {GRPC_RB_GC_NOT_MARKED,
     GRPC_RB_GC_DONT_FREE,
     GRPC_RB_MEMSIZE_UNAVAILABLE,
     {NULL, NULL}},
    NULL,
    NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY
    /* RUBY_TYPED_FREE_IMMEDIATELY is deliberately not specified here.
     * The historical rationale was that grpc_rb_call_destroy touches a hash
     * object.
     * NOTE(review): grpc_rb_call_destroy is not the free function of this
     * type (GRPC_RB_GC_DONT_FREE is), so that rationale looks stale -- confirm.
     * TODO(yugui) Directly use st_table and call the free function earlier?
     */
    0,
#endif
};
123 
/* Describes the grpc_rb_call struct for RTypedData: instances are not marked,
 * are released through grpc_rb_call_destroy, and request
 * RUBY_TYPED_FREE_IMMEDIATELY when the Ruby headers define it. */
static const rb_data_type_t grpc_call_data_type = {"grpc_call",
                                                   {GRPC_RB_GC_NOT_MARKED,
                                                    grpc_rb_call_destroy,
                                                    GRPC_RB_MEMSIZE_UNAVAILABLE,
                                                    {NULL, NULL}},
                                                   NULL,
                                                   NULL,
#ifdef RUBY_TYPED_FREE_IMMEDIATELY
                                                   RUBY_TYPED_FREE_IMMEDIATELY
#endif
};
136 
/* rb_error_code_details is a hash that maps call error codes to text strings
 * describing them */
138 VALUE rb_error_code_details;
139 
140 /* Obtains the error detail string for given error code */
grpc_call_error_detail_of(grpc_call_error err)141 const char* grpc_call_error_detail_of(grpc_call_error err) {
142   VALUE detail_ref = rb_hash_aref(rb_error_code_details, UINT2NUM(err));
143   const char* detail = "unknown error code!";
144   if (detail_ref != Qnil) {
145     detail = StringValueCStr(detail_ref);
146   }
147   return detail;
148 }
149 
150 /* Called by clients to cancel an RPC on the server.
151    Can be called multiple times, from any thread. */
grpc_rb_call_cancel(VALUE self)152 static VALUE grpc_rb_call_cancel(VALUE self) {
153   grpc_rb_call* call = NULL;
154   grpc_call_error err;
155   if (RTYPEDDATA_DATA(self) == NULL) {
156     // This call has been closed
157     return Qnil;
158   }
159 
160   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
161   err = grpc_call_cancel(call->wrapped, NULL);
162   if (err != GRPC_CALL_OK) {
163     rb_raise(grpc_rb_eCallError, "cancel failed: %s (code=%d)",
164              grpc_call_error_detail_of(err), err);
165   }
166 
167   return Qnil;
168 }
169 
170 /* TODO: expose this as part of the surface API if needed.
171  * This is meant for internal usage by the "write thread" of grpc-ruby
172  * client-side bidi calls. It provides a way for the background write-thread
173  * to propagate failures to the main read-thread and give the user an error
174  * message. */
grpc_rb_call_cancel_with_status(VALUE self,VALUE status_code,VALUE details)175 static VALUE grpc_rb_call_cancel_with_status(VALUE self, VALUE status_code,
176                                              VALUE details) {
177   grpc_rb_call* call = NULL;
178   grpc_call_error err;
179   if (RTYPEDDATA_DATA(self) == NULL) {
180     // This call has been closed
181     return Qnil;
182   }
183 
184   if (TYPE(details) != T_STRING || TYPE(status_code) != T_FIXNUM) {
185     rb_raise(rb_eTypeError,
186              "Bad parameter type error for cancel with status. Want Fixnum, "
187              "String.");
188     return Qnil;
189   }
190 
191   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
192   err = grpc_call_cancel_with_status(call->wrapped, NUM2LONG(status_code),
193                                      StringValueCStr(details), NULL);
194   if (err != GRPC_CALL_OK) {
195     rb_raise(grpc_rb_eCallError, "cancel with status failed: %s (code=%d)",
196              grpc_call_error_detail_of(err), err);
197   }
198 
199   return Qnil;
200 }
201 
202 /* Releases the c-level resources associated with a call
203    Once a call has been closed, no further requests can be
204    processed.
205 */
grpc_rb_call_close(VALUE self)206 static VALUE grpc_rb_call_close(VALUE self) {
207   grpc_rb_call* call = NULL;
208   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
209   if (call != NULL) {
210     destroy_call(call);
211     xfree(RTYPEDDATA_DATA(self));
212     RTYPEDDATA_DATA(self) = NULL;
213   }
214   return Qnil;
215 }
216 
217 /* Called to obtain the peer that this call is connected to. */
grpc_rb_call_get_peer(VALUE self)218 static VALUE grpc_rb_call_get_peer(VALUE self) {
219   VALUE res = Qnil;
220   grpc_rb_call* call = NULL;
221   char* peer = NULL;
222   if (RTYPEDDATA_DATA(self) == NULL) {
223     rb_raise(grpc_rb_eCallError, "Cannot get peer value on closed call");
224     return Qnil;
225   }
226   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
227   peer = grpc_call_get_peer(call->wrapped);
228   res = rb_str_new2(peer);
229   gpr_free(peer);
230 
231   return res;
232 }
233 
234 /* Called to obtain the x509 cert of an authenticated peer. */
grpc_rb_call_get_peer_cert(VALUE self)235 static VALUE grpc_rb_call_get_peer_cert(VALUE self) {
236   grpc_rb_call* call = NULL;
237   VALUE res = Qnil;
238   grpc_auth_context* ctx = NULL;
239   if (RTYPEDDATA_DATA(self) == NULL) {
240     rb_raise(grpc_rb_eCallError, "Cannot get peer cert on closed call");
241     return Qnil;
242   }
243   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
244 
245   ctx = grpc_call_auth_context(call->wrapped);
246 
247   if (!ctx || !grpc_auth_context_peer_is_authenticated(ctx)) {
248     return Qnil;
249   }
250 
251   {
252     grpc_auth_property_iterator it = grpc_auth_context_find_properties_by_name(
253         ctx, GRPC_X509_PEM_CERT_PROPERTY_NAME);
254     const grpc_auth_property* prop = grpc_auth_property_iterator_next(&it);
255     if (prop == NULL) {
256       return Qnil;
257     }
258 
259     res = rb_str_new2(prop->value);
260   }
261 
262   grpc_auth_context_release(ctx);
263 
264   return res;
265 }
266 
267 /*
268   call-seq:
269   status = call.status
270 
271   Gets the status object saved the call.  */
grpc_rb_call_get_status(VALUE self)272 static VALUE grpc_rb_call_get_status(VALUE self) {
273   return rb_ivar_get(self, id_status);
274 }
275 
276 /*
277   call-seq:
278   call.status = status
279 
280   Saves a status object on the call.  */
grpc_rb_call_set_status(VALUE self,VALUE status)281 static VALUE grpc_rb_call_set_status(VALUE self, VALUE status) {
282   if (!NIL_P(status) && rb_obj_class(status) != grpc_rb_sStatus) {
283     rb_raise(rb_eTypeError, "bad status: got:<%s> want: <Struct::Status>",
284              rb_obj_classname(status));
285     return Qnil;
286   }
287 
288   return rb_ivar_set(self, id_status, status);
289 }
290 
291 /*
292   call-seq:
293   metadata = call.metadata
294 
295   Gets the metadata object saved the call.  */
grpc_rb_call_get_metadata(VALUE self)296 static VALUE grpc_rb_call_get_metadata(VALUE self) {
297   return rb_ivar_get(self, id_metadata);
298 }
299 
300 /*
301   call-seq:
302   call.metadata = metadata
303 
304   Saves the metadata hash on the call.  */
grpc_rb_call_set_metadata(VALUE self,VALUE metadata)305 static VALUE grpc_rb_call_set_metadata(VALUE self, VALUE metadata) {
306   if (!NIL_P(metadata) && TYPE(metadata) != T_HASH) {
307     rb_raise(rb_eTypeError, "bad metadata: got:<%s> want: <Hash>",
308              rb_obj_classname(metadata));
309     return Qnil;
310   }
311 
312   return rb_ivar_set(self, id_metadata, metadata);
313 }
314 
315 /*
316   call-seq:
317   trailing_metadata = call.trailing_metadata
318 
319   Gets the trailing metadata object saved on the call */
grpc_rb_call_get_trailing_metadata(VALUE self)320 static VALUE grpc_rb_call_get_trailing_metadata(VALUE self) {
321   return rb_ivar_get(self, id_trailing_metadata);
322 }
323 
324 /*
325   call-seq:
326   call.trailing_metadata = trailing_metadata
327 
328   Saves the trailing metadata hash on the call. */
grpc_rb_call_set_trailing_metadata(VALUE self,VALUE metadata)329 static VALUE grpc_rb_call_set_trailing_metadata(VALUE self, VALUE metadata) {
330   if (!NIL_P(metadata) && TYPE(metadata) != T_HASH) {
331     rb_raise(rb_eTypeError, "bad metadata: got:<%s> want: <Hash>",
332              rb_obj_classname(metadata));
333     return Qnil;
334   }
335 
336   return rb_ivar_set(self, id_trailing_metadata, metadata);
337 }
338 
339 /*
340   call-seq:
341   write_flag = call.write_flag
342 
343   Gets the write_flag value saved the call.  */
grpc_rb_call_get_write_flag(VALUE self)344 static VALUE grpc_rb_call_get_write_flag(VALUE self) {
345   return rb_ivar_get(self, id_write_flag);
346 }
347 
348 /*
349   call-seq:
350   call.write_flag = write_flag
351 
352   Saves the write_flag on the call.  */
grpc_rb_call_set_write_flag(VALUE self,VALUE write_flag)353 static VALUE grpc_rb_call_set_write_flag(VALUE self, VALUE write_flag) {
354   if (!NIL_P(write_flag) && TYPE(write_flag) != T_FIXNUM) {
355     rb_raise(rb_eTypeError, "bad write_flag: got:<%s> want: <Fixnum>",
356              rb_obj_classname(write_flag));
357     return Qnil;
358   }
359 
360   return rb_ivar_set(self, id_write_flag, write_flag);
361 }
362 
363 /*
364   call-seq:
365   call.set_credentials call_credentials
366 
367   Sets credentials on a call */
grpc_rb_call_set_credentials(VALUE self,VALUE credentials)368 static VALUE grpc_rb_call_set_credentials(VALUE self, VALUE credentials) {
369   grpc_rb_call* call = NULL;
370   grpc_call_credentials* creds;
371   grpc_call_error err;
372   if (RTYPEDDATA_DATA(self) == NULL) {
373     rb_raise(grpc_rb_eCallError, "Cannot set credentials of closed call");
374     return Qnil;
375   }
376   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
377   creds = grpc_rb_get_wrapped_call_credentials(credentials);
378   err = grpc_call_set_credentials(call->wrapped, creds);
379   if (err != GRPC_CALL_OK) {
380     rb_raise(grpc_rb_eCallError,
381              "grpc_call_set_credentials failed with %s (code=%d)",
382              grpc_call_error_detail_of(err), err);
383   }
384   /* We need the credentials to be alive for as long as the call is alive,
385      but we don't care about destruction order. */
386   rb_ivar_set(self, id_credentials, credentials);
387   return Qnil;
388 }
389 
390 /* grpc_rb_md_ary_fill_hash_cb is the hash iteration callback used
391    to fill grpc_metadata_array.
392 
393    it's capacity should have been computed via a prior call to
394    grpc_rb_md_ary_capacity_hash_cb
395 */
grpc_rb_md_ary_fill_hash_cb(VALUE key,VALUE val,VALUE md_ary_obj)396 static int grpc_rb_md_ary_fill_hash_cb(VALUE key, VALUE val, VALUE md_ary_obj) {
397   grpc_metadata_array* md_ary = NULL;
398   long array_length;
399   long i;
400   grpc_slice key_slice;
401   grpc_slice value_slice;
402   char* tmp_str = NULL;
403 
404   if (TYPE(key) == T_SYMBOL) {
405     key_slice = grpc_slice_from_static_string(rb_id2name(SYM2ID(key)));
406   } else if (TYPE(key) == T_STRING) {
407     key_slice =
408         grpc_slice_from_copied_buffer(RSTRING_PTR(key), RSTRING_LEN(key));
409   } else {
410     rb_raise(rb_eTypeError,
411              "grpc_rb_md_ary_fill_hash_cb: bad type for key parameter");
412     return ST_STOP;
413   }
414 
415   if (!grpc_header_key_is_legal(key_slice)) {
416     tmp_str = grpc_slice_to_c_string(key_slice);
417     rb_raise(rb_eArgError,
418              "'%s' is an invalid header key, must match [a-z0-9-_.]+", tmp_str);
419     return ST_STOP;
420   }
421 
422   /* Construct a metadata object from key and value and add it */
423   TypedData_Get_Struct(md_ary_obj, grpc_metadata_array,
424                        &grpc_rb_md_ary_data_type, md_ary);
425 
426   if (TYPE(val) == T_ARRAY) {
427     array_length = RARRAY_LEN(val);
428     /* If the value is an array, add capacity for each value in the array */
429     for (i = 0; i < array_length; i++) {
430       value_slice = grpc_slice_from_copied_buffer(
431           RSTRING_PTR(rb_ary_entry(val, i)), RSTRING_LEN(rb_ary_entry(val, i)));
432       if (!grpc_is_binary_header(key_slice) &&
433           !grpc_header_nonbin_value_is_legal(value_slice)) {
434         // The value has invalid characters
435         tmp_str = grpc_slice_to_c_string(value_slice);
436         rb_raise(rb_eArgError, "Header value '%s' has invalid characters",
437                  tmp_str);
438         return ST_STOP;
439       }
440       GPR_ASSERT(md_ary->count < md_ary->capacity);
441       md_ary->metadata[md_ary->count].key = key_slice;
442       md_ary->metadata[md_ary->count].value = value_slice;
443       md_ary->count += 1;
444     }
445   } else if (TYPE(val) == T_STRING) {
446     value_slice =
447         grpc_slice_from_copied_buffer(RSTRING_PTR(val), RSTRING_LEN(val));
448     if (!grpc_is_binary_header(key_slice) &&
449         !grpc_header_nonbin_value_is_legal(value_slice)) {
450       // The value has invalid characters
451       tmp_str = grpc_slice_to_c_string(value_slice);
452       rb_raise(rb_eArgError, "Header value '%s' has invalid characters",
453                tmp_str);
454       return ST_STOP;
455     }
456     GPR_ASSERT(md_ary->count < md_ary->capacity);
457     md_ary->metadata[md_ary->count].key = key_slice;
458     md_ary->metadata[md_ary->count].value = value_slice;
459     md_ary->count += 1;
460   } else {
461     rb_raise(rb_eArgError, "Header values must be of type string or array");
462     return ST_STOP;
463   }
464   return ST_CONTINUE;
465 }
466 
467 /* grpc_rb_md_ary_capacity_hash_cb is the hash iteration callback used
468    to pre-compute the capacity a grpc_metadata_array.
469 */
grpc_rb_md_ary_capacity_hash_cb(VALUE key,VALUE val,VALUE md_ary_obj)470 static int grpc_rb_md_ary_capacity_hash_cb(VALUE key, VALUE val,
471                                            VALUE md_ary_obj) {
472   grpc_metadata_array* md_ary = NULL;
473 
474   (void)key;
475 
476   /* Construct a metadata object from key and value and add it */
477   TypedData_Get_Struct(md_ary_obj, grpc_metadata_array,
478                        &grpc_rb_md_ary_data_type, md_ary);
479 
480   if (TYPE(val) == T_ARRAY) {
481     /* If the value is an array, add capacity for each value in the array */
482     md_ary->capacity += RARRAY_LEN(val);
483   } else {
484     md_ary->capacity += 1;
485   }
486 
487   return ST_CONTINUE;
488 }
489 
490 /* grpc_rb_md_ary_convert converts a ruby metadata hash into
491    a grpc_metadata_array.
492    Note that this function may throw exceptions.
493 */
grpc_rb_md_ary_convert(VALUE md_ary_hash,grpc_metadata_array * md_ary)494 void grpc_rb_md_ary_convert(VALUE md_ary_hash, grpc_metadata_array* md_ary) {
495   VALUE md_ary_obj = Qnil;
496   if (md_ary_hash == Qnil) {
497     return; /* Do nothing if the expected has value is nil */
498   }
499   if (TYPE(md_ary_hash) != T_HASH) {
500     rb_raise(rb_eTypeError, "md_ary_convert: got <%s>, want <Hash>",
501              rb_obj_classname(md_ary_hash));
502     return;
503   }
504 
505   /* Initialize the array, compute it's capacity, then fill it. */
506   grpc_metadata_array_init(md_ary);
507   md_ary_obj =
508       TypedData_Wrap_Struct(grpc_rb_cMdAry, &grpc_rb_md_ary_data_type, md_ary);
509   rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_capacity_hash_cb, md_ary_obj);
510   md_ary->metadata = gpr_zalloc(md_ary->capacity * sizeof(grpc_metadata));
511   rb_hash_foreach(md_ary_hash, grpc_rb_md_ary_fill_hash_cb, md_ary_obj);
512 }
513 
514 /* Converts a metadata array to a hash. */
grpc_rb_md_ary_to_h(grpc_metadata_array * md_ary)515 VALUE grpc_rb_md_ary_to_h(grpc_metadata_array* md_ary) {
516   VALUE key = Qnil;
517   VALUE new_ary = Qnil;
518   VALUE value = Qnil;
519   VALUE result = rb_hash_new();
520   size_t i;
521 
522   for (i = 0; i < md_ary->count; i++) {
523     key = grpc_rb_slice_to_ruby_string(md_ary->metadata[i].key);
524     value = rb_hash_aref(result, key);
525     if (value == Qnil) {
526       value = grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value);
527       rb_hash_aset(result, key, value);
528     } else if (TYPE(value) == T_ARRAY) {
529       /* Add the string to the returned array */
530       rb_ary_push(value,
531                   grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value));
532     } else {
533       /* Add the current value with this key and the new one to an array */
534       new_ary = rb_ary_new();
535       rb_ary_push(new_ary, value);
536       rb_ary_push(new_ary,
537                   grpc_rb_slice_to_ruby_string(md_ary->metadata[i].value));
538       rb_hash_aset(result, key, new_ary);
539     }
540   }
541   return result;
542 }
543 
544 /* grpc_rb_call_check_op_keys_hash_cb is a hash iteration func that checks
545    each key of an ops hash is valid.
546 */
grpc_rb_call_check_op_keys_hash_cb(VALUE key,VALUE val,VALUE ops_ary)547 static int grpc_rb_call_check_op_keys_hash_cb(VALUE key, VALUE val,
548                                               VALUE ops_ary) {
549   (void)val;
550   /* Update the capacity; the value is an array, add capacity for each value in
551    * the array */
552   if (TYPE(key) != T_FIXNUM) {
553     rb_raise(rb_eTypeError, "invalid operation : got <%s>, want <Fixnum>",
554              rb_obj_classname(key));
555     return ST_STOP;
556   }
557   switch (NUM2INT(key)) {
558     case GRPC_OP_SEND_INITIAL_METADATA:
559     case GRPC_OP_SEND_MESSAGE:
560     case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
561     case GRPC_OP_SEND_STATUS_FROM_SERVER:
562     case GRPC_OP_RECV_INITIAL_METADATA:
563     case GRPC_OP_RECV_MESSAGE:
564     case GRPC_OP_RECV_STATUS_ON_CLIENT:
565     case GRPC_OP_RECV_CLOSE_ON_SERVER:
566       rb_ary_push(ops_ary, key);
567       return ST_CONTINUE;
568     default:
569       rb_raise(rb_eTypeError, "invalid operation : bad value %d", NUM2INT(key));
570   };
571   return ST_STOP;
572 }
573 
574 /* grpc_rb_op_update_status_from_server adds the values in a ruby status
575    struct to the 'send_status_from_server' portion of an op.
576 */
grpc_rb_op_update_status_from_server(grpc_op * op,grpc_metadata_array * md_ary,grpc_slice * send_status_details,VALUE status)577 static void grpc_rb_op_update_status_from_server(
578     grpc_op* op, grpc_metadata_array* md_ary, grpc_slice* send_status_details,
579     VALUE status) {
580   VALUE code = rb_struct_aref(status, sym_code);
581   VALUE details = rb_struct_aref(status, sym_details);
582   VALUE metadata_hash = rb_struct_aref(status, sym_metadata);
583 
584   /* TODO: add check to ensure status is the correct struct type */
585   if (TYPE(code) != T_FIXNUM) {
586     rb_raise(rb_eTypeError, "invalid code : got <%s>, want <Fixnum>",
587              rb_obj_classname(code));
588     return;
589   }
590   if (TYPE(details) != T_STRING) {
591     rb_raise(rb_eTypeError, "invalid details : got <%s>, want <String>",
592              rb_obj_classname(code));
593     return;
594   }
595 
596   *send_status_details =
597       grpc_slice_from_copied_buffer(RSTRING_PTR(details), RSTRING_LEN(details));
598 
599   op->data.send_status_from_server.status = NUM2INT(code);
600   op->data.send_status_from_server.status_details = send_status_details;
601   grpc_rb_md_ary_convert(metadata_hash, md_ary);
602   op->data.send_status_from_server.trailing_metadata_count = md_ary->count;
603   op->data.send_status_from_server.trailing_metadata = md_ary->metadata;
604 }
605 
/* run_batch_stack holds various values used by the
 * grpc_rb_call_run_batch function: the ops handed to the core plus the
 * storage those ops point into. */
typedef struct run_batch_stack {
  /* The batch ops */
  grpc_op ops[8]; /* 8 is the maximum number of operations */
  size_t op_num;  /* number of ops filled in so far (index of the next one) */

  /* Data being sent */
  grpc_metadata_array send_metadata;
  grpc_metadata_array send_trailing_metadata;

  /* Data being received */
  grpc_byte_buffer* recv_message;
  grpc_metadata_array recv_metadata;
  grpc_metadata_array recv_trailing_metadata;
  int recv_cancelled;
  grpc_status_code recv_status;
  grpc_slice recv_status_details;
  const char* recv_status_debug_error_string;
  /* Flags applied to the SEND_MESSAGE op */
  unsigned write_flag;
  /* Backing storage for the details of a SEND_STATUS_FROM_SERVER op */
  grpc_slice send_status_details;
} run_batch_stack;
628 
629 /* grpc_run_batch_stack_init ensures the run_batch_stack is properly
630  * initialized */
grpc_run_batch_stack_init(run_batch_stack * st,unsigned write_flag)631 static void grpc_run_batch_stack_init(run_batch_stack* st,
632                                       unsigned write_flag) {
633   MEMZERO(st, run_batch_stack, 1);
634   grpc_metadata_array_init(&st->send_metadata);
635   grpc_metadata_array_init(&st->send_trailing_metadata);
636   grpc_metadata_array_init(&st->recv_metadata);
637   grpc_metadata_array_init(&st->recv_trailing_metadata);
638   st->op_num = 0;
639   st->write_flag = write_flag;
640 }
641 
/* Unrefs the key and value slices of every entry in `array`, then destroys
 * the array itself. */
void grpc_rb_metadata_array_destroy_including_entries(
    grpc_metadata_array* array) {
  if (array->metadata != NULL) {
    grpc_metadata* entry = array->metadata;
    grpc_metadata* end = array->metadata + array->count;
    for (; entry != end; ++entry) {
      grpc_slice_unref(entry->key);
      grpc_slice_unref(entry->value);
    }
  }
  grpc_metadata_array_destroy(array);
}
653 
654 /* grpc_run_batch_stack_cleanup ensures the run_batch_stack is properly
655  * cleaned up */
grpc_run_batch_stack_cleanup(run_batch_stack * st)656 static void grpc_run_batch_stack_cleanup(run_batch_stack* st) {
657   size_t i = 0;
658 
659   grpc_rb_metadata_array_destroy_including_entries(&st->send_metadata);
660   grpc_rb_metadata_array_destroy_including_entries(&st->send_trailing_metadata);
661   grpc_metadata_array_destroy(&st->recv_metadata);
662   grpc_metadata_array_destroy(&st->recv_trailing_metadata);
663 
664   if (GRPC_SLICE_START_PTR(st->send_status_details) != NULL) {
665     grpc_slice_unref(st->send_status_details);
666   }
667 
668   if (GRPC_SLICE_START_PTR(st->recv_status_details) != NULL) {
669     grpc_slice_unref(st->recv_status_details);
670   }
671 
672   if (st->recv_message != NULL) {
673     grpc_byte_buffer_destroy(st->recv_message);
674   }
675 
676   for (i = 0; i < st->op_num; i++) {
677     if (st->ops[i].op == GRPC_OP_SEND_MESSAGE) {
678       grpc_byte_buffer_destroy(st->ops[i].data.send_message.send_message);
679     }
680   }
681 }
682 
683 /* grpc_run_batch_stack_fill_ops fills the run_batch_stack ops array from
684  * ops_hash */
grpc_run_batch_stack_fill_ops(run_batch_stack * st,VALUE ops_hash)685 static void grpc_run_batch_stack_fill_ops(run_batch_stack* st, VALUE ops_hash) {
686   VALUE this_op = Qnil;
687   VALUE this_value = Qnil;
688   VALUE ops_ary = rb_ary_new();
689   size_t i = 0;
690 
691   /* Create a ruby array with just the operation keys */
692   rb_hash_foreach(ops_hash, grpc_rb_call_check_op_keys_hash_cb, ops_ary);
693 
694   /* Fill the ops array */
695   for (i = 0; i < (size_t)RARRAY_LEN(ops_ary); i++) {
696     this_op = rb_ary_entry(ops_ary, i);
697     this_value = rb_hash_aref(ops_hash, this_op);
698     st->ops[st->op_num].flags = 0;
699     switch (NUM2INT(this_op)) {
700       case GRPC_OP_SEND_INITIAL_METADATA:
701         grpc_rb_md_ary_convert(this_value, &st->send_metadata);
702         st->ops[st->op_num].data.send_initial_metadata.count =
703             st->send_metadata.count;
704         st->ops[st->op_num].data.send_initial_metadata.metadata =
705             st->send_metadata.metadata;
706         break;
707       case GRPC_OP_SEND_MESSAGE:
708         st->ops[st->op_num].data.send_message.send_message =
709             grpc_rb_s_to_byte_buffer(RSTRING_PTR(this_value),
710                                      RSTRING_LEN(this_value));
711         st->ops[st->op_num].flags = st->write_flag;
712         break;
713       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
714         break;
715       case GRPC_OP_SEND_STATUS_FROM_SERVER:
716         grpc_rb_op_update_status_from_server(
717             &st->ops[st->op_num], &st->send_trailing_metadata,
718             &st->send_status_details, this_value);
719         break;
720       case GRPC_OP_RECV_INITIAL_METADATA:
721         st->ops[st->op_num].data.recv_initial_metadata.recv_initial_metadata =
722             &st->recv_metadata;
723         break;
724       case GRPC_OP_RECV_MESSAGE:
725         st->ops[st->op_num].data.recv_message.recv_message = &st->recv_message;
726         break;
727       case GRPC_OP_RECV_STATUS_ON_CLIENT:
728         st->ops[st->op_num].data.recv_status_on_client.trailing_metadata =
729             &st->recv_trailing_metadata;
730         st->ops[st->op_num].data.recv_status_on_client.status =
731             &st->recv_status;
732         st->ops[st->op_num].data.recv_status_on_client.status_details =
733             &st->recv_status_details;
734         st->ops[st->op_num].data.recv_status_on_client.error_string =
735             &st->recv_status_debug_error_string;
736         break;
737       case GRPC_OP_RECV_CLOSE_ON_SERVER:
738         st->ops[st->op_num].data.recv_close_on_server.cancelled =
739             &st->recv_cancelled;
740         break;
741       default:
742         grpc_run_batch_stack_cleanup(st);
743         rb_raise(rb_eTypeError, "invalid operation : bad value %d",
744                  NUM2INT(this_op));
745     };
746     st->ops[st->op_num].op = (grpc_op_type)NUM2INT(this_op);
747     st->ops[st->op_num].reserved = NULL;
748     st->op_num++;
749   }
750 }
751 
752 /* grpc_run_batch_stack_build_result fills constructs a ruby BatchResult struct
753    after the results have run */
grpc_run_batch_stack_build_result(run_batch_stack * st)754 static VALUE grpc_run_batch_stack_build_result(run_batch_stack* st) {
755   size_t i = 0;
756   VALUE result = rb_struct_new(grpc_rb_sBatchResult, Qnil, Qnil, Qnil, Qnil,
757                                Qnil, Qnil, Qnil, Qnil, NULL);
758   for (i = 0; i < st->op_num; i++) {
759     switch (st->ops[i].op) {
760       case GRPC_OP_SEND_INITIAL_METADATA:
761         rb_struct_aset(result, sym_send_metadata, Qtrue);
762         break;
763       case GRPC_OP_SEND_MESSAGE:
764         rb_struct_aset(result, sym_send_message, Qtrue);
765         break;
766       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
767         rb_struct_aset(result, sym_send_close, Qtrue);
768         break;
769       case GRPC_OP_SEND_STATUS_FROM_SERVER:
770         rb_struct_aset(result, sym_send_status, Qtrue);
771         break;
772       case GRPC_OP_RECV_INITIAL_METADATA:
773         rb_struct_aset(result, sym_metadata,
774                        grpc_rb_md_ary_to_h(&st->recv_metadata));
775       case GRPC_OP_RECV_MESSAGE:
776         rb_struct_aset(result, sym_message,
777                        grpc_rb_byte_buffer_to_s(st->recv_message));
778         break;
779       case GRPC_OP_RECV_STATUS_ON_CLIENT:
780         rb_struct_aset(
781             result, sym_status,
782             rb_struct_new(
783                 grpc_rb_sStatus, UINT2NUM(st->recv_status),
784                 (GRPC_SLICE_START_PTR(st->recv_status_details) == NULL
785                      ? Qnil
786                      : grpc_rb_slice_to_ruby_string(st->recv_status_details)),
787                 grpc_rb_md_ary_to_h(&st->recv_trailing_metadata),
788                 st->recv_status_debug_error_string == NULL
789                     ? Qnil
790                     : rb_str_new_cstr(st->recv_status_debug_error_string),
791                 NULL));
792         gpr_free((void*)st->recv_status_debug_error_string);
793         break;
794       case GRPC_OP_RECV_CLOSE_ON_SERVER:
795         rb_struct_aset(result, sym_send_close, Qtrue);
796         break;
797       default:
798         break;
799     }
800   }
801   return result;
802 }
803 
/* call_run_batch_args bundles the values shared between
 * grpc_rb_call_run_batch_try and grpc_rb_call_run_batch_ensure; a pointer to
 * it is passed to both through their VALUE argument. */
struct call_run_batch_args {
  grpc_rb_call* call;  /* the call the batch runs on */
  unsigned write_flag; /* flags for the SEND_MESSAGE op */
  VALUE ops_hash;      /* op code => op value */
  run_batch_stack* st; /* allocated by the try step, freed by the ensure step */
};
810 
/* Body of a batch run: allocates and fills the run_batch_stack, starts the
 * batch on the call, waits for its completion event and converts the outcome
 * into a BatchResult. value_args is a struct call_run_batch_args* passed as
 * a VALUE. Raises CallError when the batch cannot be started or when the
 * completion event reports failure. The stack is stored in args->st and is
 * not freed here. */
static VALUE grpc_rb_call_run_batch_try(VALUE value_args) {
  struct call_run_batch_args* batch = NULL;
  run_batch_stack* st = NULL;
  void* tag = NULL;
  grpc_call_error rc;
  grpc_event event;

  grpc_rb_fork_unsafe_begin();

  batch = (struct call_run_batch_args*)value_args;
  /* The address of the st field identifies this batch on the queue. */
  tag = (void*)&batch->st;

  /* Publish the stack through batch->st before anything can raise. */
  st = gpr_malloc(sizeof(run_batch_stack));
  batch->st = st;
  grpc_run_batch_stack_init(st, batch->write_flag);
  grpc_run_batch_stack_fill_ops(st, batch->ops_hash);

  /* Start the batch, then wait for it to complete by plucking its tag */
  rc = grpc_call_start_batch(batch->call->wrapped, st->ops, st->op_num, tag,
                             NULL);
  if (rc != GRPC_CALL_OK) {
    rb_raise(grpc_rb_eCallError,
             "grpc_call_start_batch failed with %s (code=%d)",
             grpc_call_error_detail_of(rc), rc);
  }

  event = rb_completion_queue_pluck(batch->call->queue, tag,
                                    gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
  if (!event.success) {
    rb_raise(grpc_rb_eCallError, "call#run_batch failed somehow");
  }

  /* Build and return the BatchResult struct result,
     if there is an error, it's reflected in the status */
  return grpc_run_batch_stack_build_result(batch->st);
}
841 
/* "ensure" half of Call#run_batch, invoked through rb_ensure.
 *
 * value_args is the same struct call_run_batch_args* (cast to VALUE) that was
 * given to the try callback. Ends the fork-unsafe section and, if a batch
 * stack was allocated, cleans it up and frees it. Always returns Qnil. */
static VALUE grpc_rb_call_run_batch_ensure(VALUE value_args) {
  grpc_rb_fork_unsafe_end();
  struct call_run_batch_args* batch = (struct call_run_batch_args*)value_args;
  run_batch_stack* stack = batch->st;

  if (stack != NULL) {
    grpc_run_batch_stack_cleanup(stack);
    gpr_free(stack);
  }

  return Qnil;
}
853 
854 /* call-seq:
855    ops = {
856      GRPC::Core::CallOps::SEND_INITIAL_METADATA => <op_value>,
857      GRPC::Core::CallOps::SEND_MESSAGE => <op_value>,
858      ...
859    }
860    tag = Object.new
861    timeout = 10
862    call.start_batch(tag, timeout, ops)
863 
864    Start a batch of operations defined in the array ops; when complete, post a
865    completion of type 'tag' to the completion queue bound to the call.
866 
867    Also waits for the batch to complete, until timeout is reached.
868    The order of ops specified in the batch has no significance.
869    Only one operation of each type can be active at once in any given
870    batch */
grpc_rb_call_run_batch(VALUE self,VALUE ops_hash)871 static VALUE grpc_rb_call_run_batch(VALUE self, VALUE ops_hash) {
872   grpc_ruby_fork_guard();
873   if (RTYPEDDATA_DATA(self) == NULL) {
874     rb_raise(grpc_rb_eCallError, "Cannot run batch on closed call");
875   }
876 
877   grpc_rb_call* call = NULL;
878   TypedData_Get_Struct(self, grpc_rb_call, &grpc_call_data_type, call);
879 
880   /* Validate the ops args, adding them to a ruby array */
881   if (TYPE(ops_hash) != T_HASH) {
882     rb_raise(rb_eTypeError, "call#run_batch: ops hash should be a hash");
883   }
884 
885   VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
886 
887   struct call_run_batch_args args = {
888       .call = call,
889       .write_flag = rb_write_flag == Qnil ? 0 : NUM2UINT(rb_write_flag),
890       .ops_hash = ops_hash,
891       .st = NULL};
892 
893   return rb_ensure(grpc_rb_call_run_batch_try, (VALUE)&args,
894                    grpc_rb_call_run_batch_ensure, (VALUE)&args);
895 }
896 
Init_grpc_write_flags()897 static void Init_grpc_write_flags() {
898   /* Constants representing the write flags in grpc.h */
899   VALUE grpc_rb_mWriteFlags =
900       rb_define_module_under(grpc_rb_mGrpcCore, "WriteFlags");
901   rb_define_const(grpc_rb_mWriteFlags, "BUFFER_HINT",
902                   UINT2NUM(GRPC_WRITE_BUFFER_HINT));
903   rb_define_const(grpc_rb_mWriteFlags, "NO_COMPRESS",
904                   UINT2NUM(GRPC_WRITE_NO_COMPRESS));
905 }
906 
Init_grpc_error_codes()907 static void Init_grpc_error_codes() {
908   /* Constants representing the error codes of grpc_call_error in grpc.h */
909   VALUE grpc_rb_mRpcErrors =
910       rb_define_module_under(grpc_rb_mGrpcCore, "RpcErrors");
911   rb_define_const(grpc_rb_mRpcErrors, "OK", UINT2NUM(GRPC_CALL_OK));
912   rb_define_const(grpc_rb_mRpcErrors, "ERROR", UINT2NUM(GRPC_CALL_ERROR));
913   rb_define_const(grpc_rb_mRpcErrors, "NOT_ON_SERVER",
914                   UINT2NUM(GRPC_CALL_ERROR_NOT_ON_SERVER));
915   rb_define_const(grpc_rb_mRpcErrors, "NOT_ON_CLIENT",
916                   UINT2NUM(GRPC_CALL_ERROR_NOT_ON_CLIENT));
917   rb_define_const(grpc_rb_mRpcErrors, "ALREADY_ACCEPTED",
918                   UINT2NUM(GRPC_CALL_ERROR_ALREADY_ACCEPTED));
919   rb_define_const(grpc_rb_mRpcErrors, "ALREADY_INVOKED",
920                   UINT2NUM(GRPC_CALL_ERROR_ALREADY_INVOKED));
921   rb_define_const(grpc_rb_mRpcErrors, "NOT_INVOKED",
922                   UINT2NUM(GRPC_CALL_ERROR_NOT_INVOKED));
923   rb_define_const(grpc_rb_mRpcErrors, "ALREADY_FINISHED",
924                   UINT2NUM(GRPC_CALL_ERROR_ALREADY_FINISHED));
925   rb_define_const(grpc_rb_mRpcErrors, "TOO_MANY_OPERATIONS",
926                   UINT2NUM(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS));
927   rb_define_const(grpc_rb_mRpcErrors, "INVALID_FLAGS",
928                   UINT2NUM(GRPC_CALL_ERROR_INVALID_FLAGS));
929 
930   /* Hint the GC that this is a global and shouldn't be sweeped. */
931   rb_global_variable(&rb_error_code_details);
932 
933   /* Add the detail strings to a Hash */
934   rb_error_code_details = rb_hash_new();
935   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_OK),
936                rb_str_new2("ok"));
937   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR),
938                rb_str_new2("unknown error"));
939   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_ON_SERVER),
940                rb_str_new2("not available on a server"));
941   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_ON_CLIENT),
942                rb_str_new2("not available on a client"));
943   rb_hash_aset(rb_error_code_details,
944                UINT2NUM(GRPC_CALL_ERROR_ALREADY_ACCEPTED),
945                rb_str_new2("call is already accepted"));
946   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_ALREADY_INVOKED),
947                rb_str_new2("call is already invoked"));
948   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_NOT_INVOKED),
949                rb_str_new2("call is not yet invoked"));
950   rb_hash_aset(rb_error_code_details,
951                UINT2NUM(GRPC_CALL_ERROR_ALREADY_FINISHED),
952                rb_str_new2("call is already finished"));
953   rb_hash_aset(rb_error_code_details,
954                UINT2NUM(GRPC_CALL_ERROR_TOO_MANY_OPERATIONS),
955                rb_str_new2("outstanding read or write present"));
956   rb_hash_aset(rb_error_code_details, UINT2NUM(GRPC_CALL_ERROR_INVALID_FLAGS),
957                rb_str_new2("a bad flag was given"));
958   rb_define_const(grpc_rb_mRpcErrors, "ErrorMessages", rb_error_code_details);
959   rb_obj_freeze(rb_error_code_details);
960 }
961 
Init_grpc_op_codes()962 static void Init_grpc_op_codes() {
963   /* Constants representing operation type codes in grpc.h */
964   VALUE grpc_rb_mCallOps = rb_define_module_under(grpc_rb_mGrpcCore, "CallOps");
965   rb_define_const(grpc_rb_mCallOps, "SEND_INITIAL_METADATA",
966                   UINT2NUM(GRPC_OP_SEND_INITIAL_METADATA));
967   rb_define_const(grpc_rb_mCallOps, "SEND_MESSAGE",
968                   UINT2NUM(GRPC_OP_SEND_MESSAGE));
969   rb_define_const(grpc_rb_mCallOps, "SEND_CLOSE_FROM_CLIENT",
970                   UINT2NUM(GRPC_OP_SEND_CLOSE_FROM_CLIENT));
971   rb_define_const(grpc_rb_mCallOps, "SEND_STATUS_FROM_SERVER",
972                   UINT2NUM(GRPC_OP_SEND_STATUS_FROM_SERVER));
973   rb_define_const(grpc_rb_mCallOps, "RECV_INITIAL_METADATA",
974                   UINT2NUM(GRPC_OP_RECV_INITIAL_METADATA));
975   rb_define_const(grpc_rb_mCallOps, "RECV_MESSAGE",
976                   UINT2NUM(GRPC_OP_RECV_MESSAGE));
977   rb_define_const(grpc_rb_mCallOps, "RECV_STATUS_ON_CLIENT",
978                   UINT2NUM(GRPC_OP_RECV_STATUS_ON_CLIENT));
979   rb_define_const(grpc_rb_mCallOps, "RECV_CLOSE_ON_SERVER",
980                   UINT2NUM(GRPC_OP_RECV_CLOSE_ON_SERVER));
981 }
982 
Init_grpc_metadata_keys()983 static void Init_grpc_metadata_keys() {
984   VALUE grpc_rb_mMetadataKeys =
985       rb_define_module_under(grpc_rb_mGrpcCore, "MetadataKeys");
986   rb_define_const(grpc_rb_mMetadataKeys, "COMPRESSION_REQUEST_ALGORITHM",
987                   rb_str_new2(GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY));
988 }
989 
Init_grpc_call()990 void Init_grpc_call() {
991   /* CallError inherits from Exception to signal that it is non-recoverable */
992   grpc_rb_eCallError =
993       rb_define_class_under(grpc_rb_mGrpcCore, "CallError", rb_eException);
994   grpc_rb_eOutOfTime =
995       rb_define_class_under(grpc_rb_mGrpcCore, "OutOfTime", rb_eException);
996   grpc_rb_cCall = rb_define_class_under(grpc_rb_mGrpcCore, "Call", rb_cObject);
997   grpc_rb_cMdAry =
998       rb_define_class_under(grpc_rb_mGrpcCore, "MetadataArray", rb_cObject);
999   rb_undef_alloc_func(grpc_rb_cMdAry);
1000 
1001   /* Prevent allocation or inialization of the Call class */
1002   rb_define_alloc_func(grpc_rb_cCall, grpc_rb_cannot_alloc);
1003   rb_define_method(grpc_rb_cCall, "initialize", grpc_rb_cannot_init, 0);
1004   rb_define_method(grpc_rb_cCall, "initialize_copy", grpc_rb_cannot_init_copy,
1005                    1);
1006 
1007   /* Add ruby analogues of the Call methods. */
1008   rb_define_method(grpc_rb_cCall, "run_batch", grpc_rb_call_run_batch, 1);
1009   rb_define_method(grpc_rb_cCall, "cancel", grpc_rb_call_cancel, 0);
1010   rb_define_method(grpc_rb_cCall, "cancel_with_status",
1011                    grpc_rb_call_cancel_with_status, 2);
1012   rb_define_method(grpc_rb_cCall, "close", grpc_rb_call_close, 0);
1013   rb_define_method(grpc_rb_cCall, "peer", grpc_rb_call_get_peer, 0);
1014   rb_define_method(grpc_rb_cCall, "peer_cert", grpc_rb_call_get_peer_cert, 0);
1015   rb_define_method(grpc_rb_cCall, "status", grpc_rb_call_get_status, 0);
1016   rb_define_method(grpc_rb_cCall, "status=", grpc_rb_call_set_status, 1);
1017   rb_define_method(grpc_rb_cCall, "metadata", grpc_rb_call_get_metadata, 0);
1018   rb_define_method(grpc_rb_cCall, "metadata=", grpc_rb_call_set_metadata, 1);
1019   rb_define_method(grpc_rb_cCall, "trailing_metadata",
1020                    grpc_rb_call_get_trailing_metadata, 0);
1021   rb_define_method(grpc_rb_cCall,
1022                    "trailing_metadata=", grpc_rb_call_set_trailing_metadata, 1);
1023   rb_define_method(grpc_rb_cCall, "write_flag", grpc_rb_call_get_write_flag, 0);
1024   rb_define_method(grpc_rb_cCall, "write_flag=", grpc_rb_call_set_write_flag,
1025                    1);
1026   rb_define_method(grpc_rb_cCall, "set_credentials!",
1027                    grpc_rb_call_set_credentials, 1);
1028 
1029   /* Ids used to support call attributes */
1030   id_metadata = rb_intern("metadata");
1031   id_trailing_metadata = rb_intern("trailing_metadata");
1032   id_status = rb_intern("status");
1033   id_write_flag = rb_intern("write_flag");
1034 
1035   /* Ids used by the c wrapping internals. */
1036   id_credentials = rb_intern("__credentials");
1037 
1038   /* Ids used in constructing the batch result. */
1039   sym_send_message = ID2SYM(rb_intern("send_message"));
1040   sym_send_metadata = ID2SYM(rb_intern("send_metadata"));
1041   sym_send_close = ID2SYM(rb_intern("send_close"));
1042   sym_send_status = ID2SYM(rb_intern("send_status"));
1043   sym_message = ID2SYM(rb_intern("message"));
1044   sym_status = ID2SYM(rb_intern("status"));
1045   sym_cancelled = ID2SYM(rb_intern("cancelled"));
1046 
1047   /* The Struct used to return the run_batch result. */
1048   grpc_rb_sBatchResult = rb_struct_define(
1049       "BatchResult", "send_message", "send_metadata", "send_close",
1050       "send_status", "message", "metadata", "status", "cancelled", NULL);
1051 
1052   Init_grpc_error_codes();
1053   Init_grpc_op_codes();
1054   Init_grpc_write_flags();
1055   Init_grpc_metadata_keys();
1056 }
1057 
1058 /* Gets the call from the ruby object */
grpc_rb_get_wrapped_call(VALUE v)1059 grpc_call* grpc_rb_get_wrapped_call(VALUE v) {
1060   grpc_rb_call* call = NULL;
1061   TypedData_Get_Struct(v, grpc_rb_call, &grpc_call_data_type, call);
1062   return call->wrapped;
1063 }
1064 
1065 /* Obtains the wrapped object for a given call */
grpc_rb_wrap_call(grpc_call * c,grpc_completion_queue * q)1066 VALUE grpc_rb_wrap_call(grpc_call* c, grpc_completion_queue* q) {
1067   grpc_rb_call* wrapper;
1068   if (c == NULL || q == NULL) {
1069     return Qnil;
1070   }
1071   wrapper = ALLOC(grpc_rb_call);
1072   wrapper->wrapped = c;
1073   wrapper->queue = q;
1074   return TypedData_Wrap_Struct(grpc_rb_cCall, &grpc_call_data_type, wrapper);
1075 }
1076