1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <ruby/ruby.h>
20
21 #include "rb_event_thread.h"
22
23 #include <ruby/thread.h>
24 #include <stdbool.h>
25
26 #include "rb_grpc.h"
27 #include "rb_grpc_imports.generated.h"
28
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/log.h>
31 #include <grpc/support/sync.h>
32 #include <grpc/support/time.h>
33
34 typedef struct grpc_rb_event {
35 // callback will be called with argument while holding the GVL
36 void (*callback)(void*);
37 void* argument;
38
39 struct grpc_rb_event* next;
40 } grpc_rb_event;
41
42 typedef struct grpc_rb_event_queue {
43 grpc_rb_event* head;
44 grpc_rb_event* tail;
45
46 gpr_mu mu;
47 gpr_cv cv;
48
49 // Indicates that the thread should stop waiting
50 bool abort;
51 } grpc_rb_event_queue;
52
53 static grpc_rb_event_queue event_queue;
54 static VALUE g_event_thread = Qnil;
55 static bool g_one_time_init_done = false;
56
grpc_rb_event_queue_enqueue(void (* callback)(void *),void * argument)57 void grpc_rb_event_queue_enqueue(void (*callback)(void*), void* argument) {
58 grpc_rb_event* event = gpr_malloc(sizeof(grpc_rb_event));
59 event->callback = callback;
60 event->argument = argument;
61 event->next = NULL;
62 gpr_mu_lock(&event_queue.mu);
63 if (event_queue.tail == NULL) {
64 event_queue.head = event_queue.tail = event;
65 } else {
66 event_queue.tail->next = event;
67 event_queue.tail = event;
68 }
69 gpr_cv_signal(&event_queue.cv);
70 gpr_mu_unlock(&event_queue.mu);
71 }
72
grpc_rb_event_queue_dequeue()73 static grpc_rb_event* grpc_rb_event_queue_dequeue() {
74 grpc_rb_event* event;
75 if (event_queue.head == NULL) {
76 event = NULL;
77 } else {
78 event = event_queue.head;
79 if (event_queue.head->next == NULL) {
80 event_queue.head = event_queue.tail = NULL;
81 } else {
82 event_queue.head = event_queue.head->next;
83 }
84 }
85 return event;
86 }
87
grpc_rb_event_queue_destroy()88 static void grpc_rb_event_queue_destroy() {
89 gpr_mu_destroy(&event_queue.mu);
90 gpr_cv_destroy(&event_queue.cv);
91 }
92
grpc_rb_wait_for_event_no_gil(void * param)93 static void* grpc_rb_wait_for_event_no_gil(void* param) {
94 grpc_rb_event* event = NULL;
95 (void)param;
96 gpr_mu_lock(&event_queue.mu);
97 while (!event_queue.abort) {
98 if ((event = grpc_rb_event_queue_dequeue()) != NULL) {
99 gpr_mu_unlock(&event_queue.mu);
100 return event;
101 }
102 gpr_cv_wait(&event_queue.cv, &event_queue.mu,
103 gpr_inf_future(GPR_CLOCK_REALTIME));
104 }
105 gpr_mu_unlock(&event_queue.mu);
106 return NULL;
107 }
108
grpc_rb_event_unblocking_func_wrapper(void * arg)109 static void* grpc_rb_event_unblocking_func_wrapper(void* arg) {
110 (void)arg;
111 gpr_mu_lock(&event_queue.mu);
112 event_queue.abort = true;
113 gpr_cv_signal(&event_queue.cv);
114 gpr_mu_unlock(&event_queue.mu);
115 return NULL;
116 }
117
grpc_rb_event_unblocking_func(void * arg)118 static void grpc_rb_event_unblocking_func(void* arg) {
119 grpc_rb_event_unblocking_func_wrapper(arg);
120 }
121
122 /* This is the implementation of the thread that handles auth metadata plugin
123 * events */
grpc_rb_event_thread(void * arg)124 static VALUE grpc_rb_event_thread(void* arg) {
125 grpc_rb_event* event;
126 (void)arg;
127 while (true) {
128 event = (grpc_rb_event*)rb_thread_call_without_gvl(
129 grpc_rb_wait_for_event_no_gil, NULL, grpc_rb_event_unblocking_func,
130 NULL);
131 if (event == NULL) {
132 // Indicates that the thread needs to shut down
133 break;
134 } else {
135 event->callback(event->argument);
136 gpr_free(event);
137 }
138 }
139 grpc_rb_event_queue_destroy();
140 return Qnil;
141 }
142
grpc_rb_event_queue_thread_start()143 void grpc_rb_event_queue_thread_start() {
144 if (!g_one_time_init_done) {
145 g_one_time_init_done = true;
146 gpr_mu_init(&event_queue.mu);
147 gpr_cv_init(&event_queue.cv);
148 rb_global_variable(&g_event_thread);
149 event_queue.head = event_queue.tail = NULL;
150 }
151 event_queue.abort = false;
152 GPR_ASSERT(!RTEST(g_event_thread));
153 g_event_thread = rb_thread_create(grpc_rb_event_thread, NULL);
154 }
155
grpc_rb_event_queue_thread_stop()156 void grpc_rb_event_queue_thread_stop() {
157 GPR_ASSERT(g_one_time_init_done);
158 if (!RTEST(g_event_thread)) {
159 gpr_log(GPR_ERROR,
160 "GRPC_RUBY: call credentials thread stop: thread not running");
161 return;
162 }
163 rb_thread_call_without_gvl(grpc_rb_event_unblocking_func_wrapper, NULL, NULL,
164 NULL);
165 rb_funcall(g_event_thread, rb_intern("join"), 0);
166 g_event_thread = Qnil;
167 }
168