//
//
// Copyright 2017 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>

#include "src/core/ext/transport/inproc/legacy_inproc_transport.h"

#include <stddef.h>
#include <stdint.h>

#include <algorithm>
#include <memory>
#include <new>
#include <string>
#include <utility>

#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"

#include <grpc/grpc.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/impl/connectivity_state.h>
#include <grpc/status.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>

#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_args_preconditioning.h"
#include "src/core/lib/channel/channelz.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/status_helper.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/channel_create.h"
#include "src/core/lib/surface/channel_stack_type.h"
#include "src/core/lib/surface/server.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"

#define INPROC_LOG(...)                               \
  do {                                                \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) { \
      gpr_log(__VA_ARGS__);                           \
    }                                                 \
  } while (0)
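
// Note: the INPROC_LOG statements throughout this file are gated on the
// inproc trace flag, which is typically enabled at runtime via the GRPC_TRACE
// environment variable (e.g. GRPC_TRACE=inproc).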

namespace {
struct inproc_stream;
bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error);
void maybe_process_ops_locked(inproc_stream* s, grpc_error_handle error);
void op_state_machine_locked(inproc_stream* s, grpc_error_handle error);
void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
                  bool is_initial);
void fill_in_metadata(inproc_stream* s, const grpc_metadata_batch* metadata,
                      grpc_metadata_batch* out_md, bool* markfilled);

void ResetSendMessage(grpc_transport_stream_op_batch* batch) {
  std::exchange(batch->payload->send_message.send_message, nullptr)->Clear();
}

struct shared_mu {
  shared_mu() {
    // Share one lock between both sides since both sides get affected
    gpr_mu_init(&mu);
    gpr_ref_init(&refs, 2);
  }

  ~shared_mu() { gpr_mu_destroy(&mu); }

  gpr_mu mu;
  gpr_refcount refs;
};
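
// Note: shared_mu::refs starts at 2, one per transport side; whichever
// ~inproc_transport() runs last destroys and frees the shared_mu.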

struct inproc_transport final : public grpc_core::Transport,
                                public grpc_core::FilterStackTransport {
  inproc_transport(shared_mu* mu, bool is_client)
      : mu(mu),
        is_client(is_client),
        state_tracker(is_client ? "inproc_client" : "inproc_server",
                      GRPC_CHANNEL_READY) {
    // Start each side of transport with 2 refs since they each have a ref
    // to the other
    gpr_ref_init(&refs, 2);
  }

  ~inproc_transport() override {
    if (gpr_unref(&mu->refs)) {
      mu->~shared_mu();
      gpr_free(mu);
    }
  }

  grpc_core::FilterStackTransport* filter_stack_transport() override {
    return this;
  }

  grpc_core::ClientTransport* client_transport() override { return nullptr; }
  grpc_core::ServerTransport* server_transport() override { return nullptr; }

  absl::string_view GetTransportName() const override;
  void InitStream(grpc_stream* gs, grpc_stream_refcount* refcount,
                  const void* server_data, grpc_core::Arena* arena) override;
  void SetPollset(grpc_stream* stream, grpc_pollset* pollset) override;
  void SetPollsetSet(grpc_stream* stream,
                     grpc_pollset_set* pollset_set) override;
  void PerformOp(grpc_transport_op* op) override;
  grpc_endpoint* GetEndpoint() override;

  size_t SizeOfStream() const override;
  bool HackyDisableStreamOpBatchCoalescingInConnectedChannel() const override {
    return true;
  }

  void PerformStreamOp(grpc_stream* gs,
                       grpc_transport_stream_op_batch* op) override;
  void DestroyStream(grpc_stream* gs,
                     grpc_closure* then_schedule_closure) override;

  void Orphan() override;

  void ref() {
    INPROC_LOG(GPR_INFO, "ref_transport %p", this);
    gpr_ref(&refs);
  }

  void unref() {
    INPROC_LOG(GPR_INFO, "unref_transport %p", this);
    if (!gpr_unref(&refs)) {
      return;
    }
    INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this);
    this->~inproc_transport();
    gpr_free(this);
  }

  shared_mu* mu;
  gpr_refcount refs;
  bool is_client;
  grpc_core::ConnectivityStateTracker state_tracker;
  void (*accept_stream_cb)(void* user_data, grpc_core::Transport* transport,
                           const void* server_data);
  void (*registered_method_matcher_cb)(
      void* user_data, grpc_core::ServerMetadata* metadata) = nullptr;
  void* accept_stream_data;
  bool is_closed = false;
  struct inproc_transport* other_side;
  struct inproc_stream* stream_list = nullptr;
};
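
// Note: the two transport sides are created together (see
// inproc_transports_create() near the bottom of this file) and each holds a
// raw pointer to its peer; the manual refcounting above keeps a side alive
// while its peer or any of its streams still references it.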

struct inproc_stream {
  inproc_stream(inproc_transport* t, grpc_stream_refcount* refcount,
                const void* server_data, grpc_core::Arena* arena)
      : t(t), refs(refcount), arena(arena) {
    // Ref this stream right now for ctor and list.
    ref("inproc_init_stream:init");
    ref("inproc_init_stream:list");

    stream_list_prev = nullptr;
    gpr_mu_lock(&t->mu->mu);
    stream_list_next = t->stream_list;
    if (t->stream_list) {
      t->stream_list->stream_list_prev = this;
    }
    t->stream_list = this;
    gpr_mu_unlock(&t->mu->mu);

    if (!server_data) {
      t->ref();
      inproc_transport* st = t->other_side;
      st->ref();
      other_side = nullptr;  // will get filled in soon
      // Pass the client-side stream address to the server-side for a ref
      ref("inproc_init_stream:clt");  // ref it now on behalf of server
                                      // side to avoid destruction
      INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p",
                 st->accept_stream_cb, st->accept_stream_data);
      (*st->accept_stream_cb)(st->accept_stream_data, t, this);
    } else {
      // This is the server-side and is being called through accept_stream_cb
      inproc_stream* cs = const_cast<inproc_stream*>(
          static_cast<const inproc_stream*>(server_data));
      other_side = cs;
      // Ref the server-side stream on behalf of the client now
      ref("inproc_init_stream:srv");

      // Now we are about to affect the other side, so lock the transport
      // to make sure that it doesn't get destroyed
      gpr_mu_lock(&t->mu->mu);
      cs->other_side = this;
      // Now transfer from the other side's write_buffer if any to the to_read
      // buffer
      if (cs->write_buffer_initial_md_filled) {
        fill_in_metadata(this, &cs->write_buffer_initial_md,
                         &to_read_initial_md, &to_read_initial_md_filled);
        deadline = std::min(deadline, cs->write_buffer_deadline);
        cs->write_buffer_initial_md.Clear();
        cs->write_buffer_initial_md_filled = false;
      }
      if (cs->write_buffer_trailing_md_filled) {
        fill_in_metadata(this, &cs->write_buffer_trailing_md,
                         &to_read_trailing_md, &to_read_trailing_md_filled);
        cs->write_buffer_trailing_md.Clear();
        cs->write_buffer_trailing_md_filled = false;
      }
      if (!cs->write_buffer_cancel_error.ok()) {
        cancel_other_error = cs->write_buffer_cancel_error;
        cs->write_buffer_cancel_error = absl::OkStatus();
        maybe_process_ops_locked(this, cancel_other_error);
      }

      gpr_mu_unlock(&t->mu->mu);
    }
  }

  ~inproc_stream() { t->unref(); }

#ifndef NDEBUG
#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason)
#else
#define STREAM_REF(refs, reason) grpc_stream_ref(refs)
#define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
#endif
  void ref(const char* reason) {
    INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason);
    STREAM_REF(refs, reason);
  }

  void unref(const char* reason) {
    INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason);
    STREAM_UNREF(refs, reason);
  }
#undef STREAM_REF
#undef STREAM_UNREF

  inproc_transport* t;
  grpc_stream_refcount* refs;
  grpc_core::Arena* arena;

  grpc_metadata_batch to_read_initial_md;
  bool to_read_initial_md_filled = false;
  grpc_metadata_batch to_read_trailing_md;
  bool to_read_trailing_md_filled = false;
  bool ops_needed = false;
  // Write buffer used only during gap at init time when client-side
  // stream is set up but server side stream is not yet set up
  grpc_metadata_batch write_buffer_initial_md;
  bool write_buffer_initial_md_filled = false;
  grpc_core::Timestamp write_buffer_deadline =
      grpc_core::Timestamp::InfFuture();
  grpc_metadata_batch write_buffer_trailing_md;
  bool write_buffer_trailing_md_filled = false;
  grpc_error_handle write_buffer_cancel_error;

  struct inproc_stream* other_side;
  bool other_side_closed = false;               // won't talk anymore
  bool write_buffer_other_side_closed = false;  // on hold

  grpc_transport_stream_op_batch* send_message_op = nullptr;
  grpc_transport_stream_op_batch* send_trailing_md_op = nullptr;
  grpc_transport_stream_op_batch* recv_initial_md_op = nullptr;
  grpc_transport_stream_op_batch* recv_message_op = nullptr;
  grpc_transport_stream_op_batch* recv_trailing_md_op = nullptr;

  bool initial_md_sent = false;
  bool trailing_md_sent = false;
  bool initial_md_recvd = false;
  bool trailing_md_recvd = false;
  // The following tracks if the server-side only pretends to have received
  // trailing metadata since it no longer cares about the RPC. If that is the
  // case, it is still ok for the client to send trailing metadata (in which
  // case it will be ignored).
  bool trailing_md_recvd_implicit_only = false;

  bool closed = false;

  grpc_error_handle cancel_self_error;
  grpc_error_handle cancel_other_error;

  grpc_core::Timestamp deadline = grpc_core::Timestamp::InfFuture();

  bool listed = true;
  struct inproc_stream* stream_list_prev;
  struct inproc_stream* stream_list_next;
};
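
// Note: data moves between the two sides by writing directly into the peer
// stream's to_read_* buffers, or into the local write_buffer_* fields during
// the brief window when the client-side stream exists but the server-side one
// does not yet; op_state_machine_locked() below drains those buffers.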

void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
                  bool is_initial) {
  std::string prefix = absl::StrCat(
      "INPROC:", is_initial ? "HDR:" : "TRL:", is_client ? "CLI:" : "SVR:");
  md_batch->Log([&prefix](absl::string_view key, absl::string_view value) {
    gpr_log(GPR_INFO, "%s", absl::StrCat(prefix, key, ": ", value).c_str());
  });
}

namespace {

class CopySink {
 public:
  explicit CopySink(grpc_metadata_batch* dst) : dst_(dst) {}

  void Encode(const grpc_core::Slice& key, const grpc_core::Slice& value) {
    dst_->Append(key.as_string_view(), value.AsOwned(),
                 [](absl::string_view, const grpc_core::Slice&) {});
  }

  template <class T, class V>
  void Encode(T trait, V value) {
    dst_->Set(trait, value);
  }

  template <class T>
  void Encode(T trait, const grpc_core::Slice& value) {
    dst_->Set(trait, value.AsOwned());
  }

 private:
  grpc_metadata_batch* dst_;
};

}  // namespace

void fill_in_metadata(inproc_stream* s, const grpc_metadata_batch* metadata,
                      grpc_metadata_batch* out_md, bool* markfilled) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
    log_metadata(metadata, s->t->is_client,
                 metadata->get_pointer(grpc_core::WaitForReady()) != nullptr);
  }

  if (markfilled != nullptr) {
    *markfilled = true;
  }

  // TODO(ctiller): copy the metadata batch, don't rely on a bespoke copy
  // function. Can only do this once mdelems are out of the way though, too
  // many edge cases otherwise.
  out_md->Clear();
  CopySink sink(out_md);
  metadata->Encode(&sink);
}

void inproc_transport::InitStream(grpc_stream* gs,
                                  grpc_stream_refcount* refcount,
                                  const void* server_data,
                                  grpc_core::Arena* arena) {
  INPROC_LOG(GPR_INFO, "init_stream %p %p %p", this, gs, server_data);
  new (gs) inproc_stream(this, refcount, server_data, arena);
}

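// Unlinks the stream from its transport's stream list (dropping the list's
// ref) and drops the stream's own closing ref; guarded by s->closed, so it is
// safe to call more than once.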
void close_stream_locked(inproc_stream* s) {
  if (!s->closed) {
    // Release the metadata that we would have written out
    s->write_buffer_initial_md.Clear();
    s->write_buffer_trailing_md.Clear();

    if (s->listed) {
      inproc_stream* p = s->stream_list_prev;
      inproc_stream* n = s->stream_list_next;
      if (p != nullptr) {
        p->stream_list_next = n;
      } else {
        s->t->stream_list = n;
      }
      if (n != nullptr) {
        n->stream_list_prev = p;
      }
      s->listed = false;
      s->unref("close_stream:list");
    }
    s->closed = true;
    s->unref("close_stream:closing");
  }
}

// Called when we are done talking/listening to the other side
void close_other_side_locked(inproc_stream* s, const char* reason) {
  if (s->other_side != nullptr) {
    // First release the metadata that came from the other side's arena
    s->to_read_initial_md.Clear();
    s->to_read_trailing_md.Clear();

    s->other_side->unref(reason);
    s->other_side_closed = true;
    s->other_side = nullptr;
  } else if (!s->other_side_closed) {
    s->write_buffer_other_side_closed = true;
  }
}

// Call the on_complete closure associated with this stream_op_batch if
// this stream_op_batch accounts for exactly one of the pending operations
// for this stream. This is called when one of the pending operations for
// the stream is done and about to be NULLed out
void complete_if_batch_end_locked(inproc_stream* s, grpc_error_handle error,
                                  grpc_transport_stream_op_batch* op,
                                  const char* msg) {
  int is_sm = static_cast<int>(op == s->send_message_op);
  int is_stm = static_cast<int>(op == s->send_trailing_md_op);
  // TODO(vjpai): We should not consider the recv ops here, since they
  // have their own callbacks. We should invoke a batch's on_complete
  // as soon as all of the batch's send ops are complete, even if there
  // are still recv ops pending.
  int is_rim = static_cast<int>(op == s->recv_initial_md_op);
  int is_rm = static_cast<int>(op == s->recv_message_op);
  int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);

  if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
    INPROC_LOG(GPR_INFO, "%s %p %p %p %s", msg, s, op, op->on_complete,
               grpc_core::StatusToString(error).c_str());
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, error);
  }
}

void maybe_process_ops_locked(inproc_stream* s, grpc_error_handle error) {
  if (s && (!error.ok() || s->ops_needed)) {
    s->ops_needed = false;
    op_state_machine_locked(s, error);
  }
}

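// Fails every op still pending on this stream with `error`. If trailing
// metadata has not been sent yet, an empty trailing-metadata batch is first
// pushed to the other side (or to the write buffer) as a cancellation signal.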
void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
  INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s);
  // If we're failing this side, we need to make sure that
  // we also send or have already sent trailing metadata
  if (!s->trailing_md_sent) {
    // Send trailing md to the other side indicating cancellation
    s->trailing_md_sent = true;

    grpc_metadata_batch fake_md;
    inproc_stream* other = s->other_side;
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    fill_in_metadata(s, &fake_md, dest, destfilled);

    if (other != nullptr) {
      if (other->cancel_other_error.ok()) {
        other->cancel_other_error = error;
      }
      maybe_process_ops_locked(other, error);
    } else if (s->write_buffer_cancel_error.ok()) {
      s->write_buffer_cancel_error = error;
    }
  }
  if (s->recv_initial_md_op) {
    grpc_error_handle err;
    if (!s->t->is_client) {
      // If this is a server, provide initial metadata with a path and
      // authority, since the server expects those; deliver it with no
      // error yet
      grpc_metadata_batch fake_md;
      fake_md.Set(grpc_core::HttpPathMetadata(),
                  grpc_core::Slice::FromStaticString("/"));
      fake_md.Set(grpc_core::HttpAuthorityMetadata(),
                  grpc_core::Slice::FromStaticString("inproc-fail"));

      fill_in_metadata(s, &fake_md,
                       s->recv_initial_md_op->payload->recv_initial_metadata
                           .recv_initial_metadata,
                       nullptr);
      err = absl::OkStatus();
    } else {
      err = error;
    }
    if (s->recv_initial_md_op->payload->recv_initial_metadata
            .trailing_metadata_available != nullptr) {
      // Set to true unconditionally, because we're failing the call, so even
      // if we haven't actually seen the send_trailing_metadata op from the
      // other side, we're going to return trailing metadata anyway.
      *s->recv_initial_md_op->payload->recv_initial_metadata
           .trailing_metadata_available = true;
    }
    INPROC_LOG(GPR_INFO,
               "fail_helper %p scheduling initial-metadata-ready %s %s", s,
               grpc_core::StatusToString(error).c_str(),
               grpc_core::StatusToString(err).c_str());
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_initial_md_op->payload->recv_initial_metadata
            .recv_initial_metadata_ready,
        err);
    // Last use of err so no need to REF and then UNREF it

    complete_if_batch_end_locked(
        s, error, s->recv_initial_md_op,
        "fail_helper scheduling recv-initial-metadata-on-complete");
    s->recv_initial_md_op = nullptr;
  }
  if (s->recv_message_op) {
    INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %s", s,
               grpc_core::StatusToString(error).c_str());
    if (s->recv_message_op->payload->recv_message
            .call_failed_before_recv_message != nullptr) {
      *s->recv_message_op->payload->recv_message
           .call_failed_before_recv_message = true;
    }
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_message_op->payload->recv_message.recv_message_ready, error);
    complete_if_batch_end_locked(
        s, error, s->recv_message_op,
        "fail_helper scheduling recv-message-on-complete");
    s->recv_message_op = nullptr;
  }
  if (s->send_message_op) {
    ResetSendMessage(s->send_message_op);
    complete_if_batch_end_locked(
        s, error, s->send_message_op,
        "fail_helper scheduling send-message-on-complete");
    s->send_message_op = nullptr;
  }
  if (s->send_trailing_md_op) {
    complete_if_batch_end_locked(
        s, error, s->send_trailing_md_op,
        "fail_helper scheduling send-trailing-md-on-complete");
    s->send_trailing_md_op = nullptr;
  }
  if (s->recv_trailing_md_op) {
    INPROC_LOG(GPR_INFO,
               "fail_helper %p scheduling trailing-metadata-ready %s", s,
               grpc_core::StatusToString(error).c_str());
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_trailing_md_op->payload->recv_trailing_metadata
            .recv_trailing_metadata_ready,
        error);
    INPROC_LOG(GPR_INFO,
               "fail_helper %p scheduling trailing-md-on-complete %s", s,
               grpc_core::StatusToString(error).c_str());
    complete_if_batch_end_locked(
        s, error, s->recv_trailing_md_op,
        "fail_helper scheduling recv-trailing-metadata-on-complete");
    s->recv_trailing_md_op = nullptr;
  }
  close_other_side_locked(s, "fail_helper:other_side");
  close_stream_locked(s);
}

// TODO(vjpai): It should not be necessary to drain the incoming byte
// stream and create a new one; instead, we should simply pass the byte
// stream from the sender directly to the receiver as-is.
//
// Note that fixing this will also avoid the assumption in this code
// that the incoming byte stream's next() call will always return
// synchronously. That assumption is true today but may not always be
// true in the future.
void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
  *receiver->recv_message_op->payload->recv_message.recv_message =
      std::move(*sender->send_message_op->payload->send_message.send_message);
  *receiver->recv_message_op->payload->recv_message.flags =
      sender->send_message_op->payload->send_message.flags;

  INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready",
             receiver);
  grpc_core::ExecCtx::Run(
      DEBUG_LOCATION,
      receiver->recv_message_op->payload->recv_message.recv_message_ready,
      absl::OkStatus());
  complete_if_batch_end_locked(
      sender, absl::OkStatus(), sender->send_message_op,
      "message_transfer scheduling sender on_complete");
  complete_if_batch_end_locked(
      receiver, absl::OkStatus(), receiver->recv_message_op,
      "message_transfer scheduling receiver on_complete");

  receiver->recv_message_op = nullptr;
  sender->send_message_op = nullptr;
}
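
// Note that message_transfer_locked moves the payload rather than copying it:
// the sender's slice buffer contents are transferred directly into the
// receiver's recv_message slot, so an in-process transfer should never need
// to serialize or duplicate the message bytes.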

void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
  // This function gets called when we have contents in the unprocessed reads
  // Get what we want based on our ops wanted
  // Schedule our appropriate closures
  // and then return to ops_needed state if still needed

  grpc_error_handle new_err;

  bool needs_close = false;

  INPROC_LOG(GPR_INFO, "op_state_machine %p", s);
  // cancellation takes precedence
  inproc_stream* other = s->other_side;

  if (!s->cancel_self_error.ok()) {
    fail_helper_locked(s, s->cancel_self_error);
    goto done;
  } else if (!s->cancel_other_error.ok()) {
    fail_helper_locked(s, s->cancel_other_error);
    goto done;
  } else if (!error.ok()) {
    fail_helper_locked(s, error);
    goto done;
  }

  if (s->send_message_op && other) {
    if (other->recv_message_op) {
      message_transfer_locked(s, other);
      maybe_process_ops_locked(other, absl::OkStatus());
    } else if (!s->t->is_client && s->trailing_md_sent) {
      // A server send will never be matched if the server already sent status
      ResetSendMessage(s->send_message_op);
      complete_if_batch_end_locked(
          s, absl::OkStatus(), s->send_message_op,
          "op_state_machine scheduling send-message-on-complete case 1");
      s->send_message_op = nullptr;
    }
  }
  // Pause a send trailing metadata if there is still an outstanding
  // send message unless we know that the send message will never get
  // matched to a receive. This happens on the client if the server has
  // already sent status or on the server if the client has requested
  // status
  if (s->send_trailing_md_op &&
      (!s->send_message_op ||
       (s->t->is_client &&
        (s->trailing_md_recvd || s->to_read_trailing_md_filled)) ||
       (!s->t->is_client && other &&
        (other->trailing_md_recvd || other->to_read_trailing_md_filled ||
         other->recv_trailing_md_op)))) {
    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    if (*destfilled || s->trailing_md_sent) {
      // The buffer is already in use; that's an error!
      INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s);
      new_err = GRPC_ERROR_CREATE("Extra trailing metadata");
      fail_helper_locked(s, new_err);
      goto done;
    } else {
      if (!other || !other->closed) {
        fill_in_metadata(
            s,
            s->send_trailing_md_op->payload->send_trailing_metadata
                .send_trailing_metadata,
            dest, destfilled);
      }
      s->trailing_md_sent = true;
      if (s->send_trailing_md_op->payload->send_trailing_metadata.sent) {
        *s->send_trailing_md_op->payload->send_trailing_metadata.sent = true;
      }
      if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling trailing-metadata-ready",
                   s);
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            s->recv_trailing_md_op->payload->recv_trailing_metadata
                .recv_trailing_metadata_ready,
            absl::OkStatus());
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p scheduling trailing-md-on-complete",
                   s);
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                                s->recv_trailing_md_op->on_complete,
                                absl::OkStatus());
        s->recv_trailing_md_op = nullptr;
        needs_close = true;
      }
    }
    maybe_process_ops_locked(other, absl::OkStatus());
    complete_if_batch_end_locked(
        s, absl::OkStatus(), s->send_trailing_md_op,
        "op_state_machine scheduling send-trailing-metadata-on-complete");
    s->send_trailing_md_op = nullptr;
  }
  if (s->recv_initial_md_op) {
    if (s->initial_md_recvd) {
      new_err = GRPC_ERROR_CREATE("Already recvd initial md");
      INPROC_LOG(
          GPR_INFO,
          "op_state_machine %p scheduling on_complete errors for already "
          "recvd initial md %s",
          s, grpc_core::StatusToString(new_err).c_str());
      fail_helper_locked(s, new_err);
      goto done;
    }

    if (s->to_read_initial_md_filled) {
      s->initial_md_recvd = true;
      fill_in_metadata(s, &s->to_read_initial_md,
                       s->recv_initial_md_op->payload->recv_initial_metadata
                           .recv_initial_metadata,
                       nullptr);
      if (s->deadline != grpc_core::Timestamp::InfFuture()) {
        s->recv_initial_md_op->payload->recv_initial_metadata
            .recv_initial_metadata->Set(grpc_core::GrpcTimeoutMetadata(),
                                        s->deadline);
      }
      if (s->recv_initial_md_op->payload->recv_initial_metadata
              .trailing_metadata_available != nullptr) {
        *s->recv_initial_md_op->payload->recv_initial_metadata
             .trailing_metadata_available =
            (other != nullptr && other->send_trailing_md_op != nullptr);
      }
      s->to_read_initial_md.Clear();
      s->to_read_initial_md_filled = false;
      if (s->t->registered_method_matcher_cb != nullptr) {
        s->t->registered_method_matcher_cb(
            s->t->accept_stream_data,
            s->recv_initial_md_op->payload->recv_initial_metadata
                .recv_initial_metadata);
      }
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          std::exchange(s->recv_initial_md_op->payload->recv_initial_metadata
                            .recv_initial_metadata_ready,
                        nullptr),
          absl::OkStatus());
      complete_if_batch_end_locked(
          s, absl::OkStatus(), s->recv_initial_md_op,
          "op_state_machine scheduling recv-initial-metadata-on-complete");
      s->recv_initial_md_op = nullptr;
    }
  }
  if (s->recv_message_op) {
    if (other && other->send_message_op) {
      message_transfer_locked(other, s);
      maybe_process_ops_locked(other, absl::OkStatus());
    }
  }
  if (s->to_read_trailing_md_filled) {
    if (s->trailing_md_recvd) {
      if (s->trailing_md_recvd_implicit_only) {
        INPROC_LOG(GPR_INFO,
                   "op_state_machine %p already implicitly received trailing "
                   "metadata, so ignoring new trailing metadata from client",
                   s);
        s->to_read_trailing_md.Clear();
        s->to_read_trailing_md_filled = false;
        s->trailing_md_recvd_implicit_only = false;
      } else {
        new_err = GRPC_ERROR_CREATE("Already recvd trailing md");
        INPROC_LOG(
            GPR_INFO,
            "op_state_machine %p scheduling on_complete errors for already "
            "recvd trailing md %s",
            s, grpc_core::StatusToString(new_err).c_str());
        fail_helper_locked(s, new_err);
        goto done;
      }
    }
    if (s->recv_message_op != nullptr) {
      // This message needs to be wrapped up because it will never be
      // satisfied
      s->recv_message_op->payload->recv_message.recv_message->reset();
      INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          s->recv_message_op->payload->recv_message.recv_message_ready,
          absl::OkStatus());
      complete_if_batch_end_locked(
          s, new_err, s->recv_message_op,
          "op_state_machine scheduling recv-message-on-complete");
      s->recv_message_op = nullptr;
    }
    if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
      // Nothing further will try to receive from this stream, so finish off
      // any outstanding send_message op
      ResetSendMessage(s->send_message_op);
      s->send_message_op->payload->send_message.stream_write_closed = true;
      complete_if_batch_end_locked(
          s, new_err, s->send_message_op,
          "op_state_machine scheduling send-message-on-complete case 2");
      s->send_message_op = nullptr;
    }
    if (s->recv_trailing_md_op != nullptr) {
      // We wanted trailing metadata and we got it
      s->trailing_md_recvd = true;
      fill_in_metadata(s, &s->to_read_trailing_md,
                       s->recv_trailing_md_op->payload->recv_trailing_metadata
                           .recv_trailing_metadata,
                       nullptr);
      s->to_read_trailing_md.Clear();
      s->to_read_trailing_md_filled = false;
      s->recv_trailing_md_op->payload->recv_trailing_metadata
          .recv_trailing_metadata->Set(grpc_core::GrpcStatusFromWire(), true);

      // We should schedule the recv_trailing_md_op completion if
      // 1. this stream is the client-side
      // 2. this stream is the server-side AND has already sent its trailing
      //    md (If the server hasn't already sent its trailing md, it doesn't
      //    have a final status, so don't mark this op complete)
      if (s->t->is_client || s->trailing_md_sent) {
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            s->recv_trailing_md_op->payload->recv_trailing_metadata
                .recv_trailing_metadata_ready,
            absl::OkStatus());
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                                s->recv_trailing_md_op->on_complete,
                                absl::OkStatus());
        s->recv_trailing_md_op = nullptr;
        needs_close = s->trailing_md_sent;
      }
    } else if (!s->trailing_md_recvd) {
      INPROC_LOG(
          GPR_INFO,
          "op_state_machine %p has trailing md but not yet waiting for it", s);
    }
  }
  if (!s->t->is_client && s->trailing_md_sent &&
      (s->recv_trailing_md_op != nullptr)) {
    // In this case, we don't care to receive the write-close from the client
    // because we have already sent status and the RPC is over as far as we
    // are concerned.
    INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling trailing-md-ready %s",
               s, grpc_core::StatusToString(new_err).c_str());
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_trailing_md_op->payload->recv_trailing_metadata
            .recv_trailing_metadata_ready,
        new_err);
    complete_if_batch_end_locked(
        s, new_err, s->recv_trailing_md_op,
        "op_state_machine scheduling recv-trailing-md-on-complete");
    s->trailing_md_recvd = true;
    s->recv_trailing_md_op = nullptr;
    // Since we are only pretending to have received the trailing MD, it would
    // be ok (not an error) if the client actually sends it later.
    s->trailing_md_recvd_implicit_only = true;
  }
  if (s->trailing_md_recvd && s->recv_message_op) {
    // No further message will come on this stream, so finish off the
    // recv_message_op
    INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
    s->recv_message_op->payload->recv_message.recv_message->reset();
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        s->recv_message_op->payload->recv_message.recv_message_ready,
        absl::OkStatus());
    complete_if_batch_end_locked(
        s, new_err, s->recv_message_op,
        "op_state_machine scheduling recv-message-on-complete");
    s->recv_message_op = nullptr;
  }
  if (s->trailing_md_recvd && s->send_message_op && s->t->is_client) {
    // Nothing further will try to receive from this stream, so finish off
    // any outstanding send_message op
    ResetSendMessage(s->send_message_op);
    complete_if_batch_end_locked(
        s, new_err, s->send_message_op,
        "op_state_machine scheduling send-message-on-complete case 3");
    s->send_message_op = nullptr;
  }
  if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
      s->recv_message_op || s->recv_trailing_md_op) {
    // Didn't get the item we wanted so we still need to get
    // rescheduled
    INPROC_LOG(
        GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s,
        s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
        s->recv_message_op, s->recv_trailing_md_op);
    s->ops_needed = true;
  }
done:
  if (needs_close) {
    close_other_side_locked(s, "op_state_machine");
    close_stream_locked(s);
  }
}

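// Records `error` as this stream's self-cancellation (if it is the first
// cancellation seen), pushes empty trailing metadata to the other side as the
// cancellation signal, and closes the stream. Returns true iff this call is
// the one that installed the cancellation error.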
bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) {
  bool ret = false;  // was the cancel accepted
  INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s,
             grpc_core::StatusToString(error).c_str());
  if (s->cancel_self_error.ok()) {
    ret = true;
    s->cancel_self_error = error;
    // Catch current value of other before it gets closed off
    inproc_stream* other = s->other_side;
    maybe_process_ops_locked(s, s->cancel_self_error);
    // Send trailing md to the other side indicating cancellation, even if we
    // already have
    s->trailing_md_sent = true;

    grpc_metadata_batch cancel_md;

    grpc_metadata_batch* dest = (other == nullptr)
                                    ? &s->write_buffer_trailing_md
                                    : &other->to_read_trailing_md;
    bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
                                          : &other->to_read_trailing_md_filled;
    fill_in_metadata(s, &cancel_md, dest, destfilled);

    if (other != nullptr) {
      if (other->cancel_other_error.ok()) {
        other->cancel_other_error = s->cancel_self_error;
      }
      maybe_process_ops_locked(other, other->cancel_other_error);
    } else if (s->write_buffer_cancel_error.ok()) {
      s->write_buffer_cancel_error = s->cancel_self_error;
    }

    // if we are a server and already received trailing md but
    // couldn't complete that because we hadn't yet sent out trailing
    // md, now's the chance
    if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          s->recv_trailing_md_op->payload->recv_trailing_metadata
              .recv_trailing_metadata_ready,
          s->cancel_self_error);
      complete_if_batch_end_locked(
          s, s->cancel_self_error, s->recv_trailing_md_op,
          "cancel_stream scheduling trailing-md-on-complete");
      s->recv_trailing_md_op = nullptr;
    }
  }

  close_other_side_locked(s, "cancel_stream:other_side");
  close_stream_locked(s);

  return ret;
}
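
// Main entry point for stream op batches. All state is mutated under the
// transport's shared mutex; the mutex pointer is saved up front because `s`
// may be closed before we unlock.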
void inproc_transport::PerformStreamOp(grpc_stream* gs,
                                       grpc_transport_stream_op_batch* op) {
  INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", this, gs, op);
  inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
  gpr_mu* mu = &s->t->mu->mu;  // save aside in case s gets closed
  gpr_mu_lock(mu);

  if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
    if (op->send_initial_metadata) {
      log_metadata(op->payload->send_initial_metadata.send_initial_metadata,
                   s->t->is_client, true);
    }
    if (op->send_trailing_metadata) {
      log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata,
                   s->t->is_client, false);
    }
  }
  grpc_error_handle error;
  grpc_closure* on_complete = op->on_complete;
  // TODO(roth): This is a hack needed because we use data inside of the
  // closure itself to do the barrier calculation (i.e., to ensure that
  // we don't schedule the closure until all ops in the batch have been
  // completed). This can go away once we move to a new C++ closure API
  // that provides the ability to create a barrier closure.
  if (on_complete == nullptr) {
    on_complete = op->on_complete =
        grpc_core::NewClosure([](grpc_error_handle) {});
  }

  if (op->cancel_stream) {
    // Call cancel_stream_locked without ref'ing the cancel_error because
    // this function is responsible for making sure that field gets unref'ed
    cancel_stream_locked(s, op->payload->cancel_stream.cancel_error);
    // this op can complete without an error
  } else if (!s->cancel_self_error.ok()) {
    // already self-canceled so still give it an error
    error = s->cancel_self_error;
  } else {
    INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s,
               s->t->is_client ? "client" : "server",
               op->send_initial_metadata ? " send_initial_metadata" : "",
               op->send_message ? " send_message" : "",
               op->send_trailing_metadata ? " send_trailing_metadata" : "",
               op->recv_initial_metadata ? " recv_initial_metadata" : "",
               op->recv_message ? " recv_message" : "",
               op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
  }

  inproc_stream* other = s->other_side;
  if (error.ok() &&
      (op->send_initial_metadata || op->send_trailing_metadata)) {
    if (s->t->is_closed) {
      error = GRPC_ERROR_CREATE("Endpoint already shutdown");
    }
    if (error.ok() && op->send_initial_metadata) {
      grpc_metadata_batch* dest = (other == nullptr)
                                      ? &s->write_buffer_initial_md
                                      : &other->to_read_initial_md;
      bool* destfilled = (other == nullptr)
                             ? &s->write_buffer_initial_md_filled
                             : &other->to_read_initial_md_filled;
      if (*destfilled || s->initial_md_sent) {
        // The buffer is already in use; that's an error!
        INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s);
        error = GRPC_ERROR_CREATE("Extra initial metadata");
      } else {
        if (!s->other_side_closed) {
          fill_in_metadata(
              s, op->payload->send_initial_metadata.send_initial_metadata,
              dest, destfilled);
        }
        if (s->t->is_client) {
          grpc_core::Timestamp* dl =
              (other == nullptr) ? &s->write_buffer_deadline
                                 : &other->deadline;
          *dl = std::min(
              *dl, op->payload->send_initial_metadata.send_initial_metadata
                       ->get(grpc_core::GrpcTimeoutMetadata())
                       .value_or(grpc_core::Timestamp::InfFuture()));
          s->initial_md_sent = true;
        }
      }
      maybe_process_ops_locked(other, error);
    }
  }

  if (error.ok() && (op->send_message || op->send_trailing_metadata ||
                     op->recv_initial_metadata || op->recv_message ||
                     op->recv_trailing_metadata)) {
    // Mark ops that need to be processed by the state machine
    if (op->send_message) {
      s->send_message_op = op;
    }
    if (op->send_trailing_metadata) {
      s->send_trailing_md_op = op;
    }
    if (op->recv_initial_metadata) {
      s->recv_initial_md_op = op;
    }
    if (op->recv_message) {
      s->recv_message_op = op;
    }
    if (op->recv_trailing_metadata) {
      s->recv_trailing_md_op = op;
    }

    // We want to initiate the state machine if:
    // 1. We want to send a message and the other side wants to receive
    // 2. We want to send trailing metadata and there isn't an unmatched send
    //    or the other side wants trailing metadata
    // 3. We want initial metadata and the other side has sent it
    // 4. We want to receive a message and there is a message ready
    // 5. There is trailing metadata, even if nothing specifically wants
    //    that because that can shut down the receive message as well
    if ((op->send_message && other && other->recv_message_op != nullptr) ||
        (op->send_trailing_metadata &&
         (!s->send_message_op || (other && other->recv_trailing_md_op))) ||
        (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
        (op->recv_message && other && other->send_message_op != nullptr) ||
        (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
      op_state_machine_locked(s, error);
    } else {
      s->ops_needed = true;
    }
  } else {
    if (!error.ok()) {
      // Consume any send message that was sent here but that we are not
      // pushing to the other side
      if (op->send_message) {
        ResetSendMessage(op);
      }
      // Schedule op's closures that we didn't push to op state machine
      if (op->recv_initial_metadata) {
        if (op->payload->recv_initial_metadata.trailing_metadata_available !=
            nullptr) {
          // Set to true unconditionally, because we're failing the call, so
          // even if we haven't actually seen the send_trailing_metadata op
          // from the other side, we're going to return trailing metadata
          // anyway.
          *op->payload->recv_initial_metadata.trailing_metadata_available =
              true;
        }
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling initial-metadata-ready %s",
            s, grpc_core::StatusToString(error).c_str());
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            op->payload->recv_initial_metadata.recv_initial_metadata_ready,
            error);
      }
      if (op->recv_message) {
        INPROC_LOG(
            GPR_INFO,
            "perform_stream_op error %p scheduling recv message-ready %s", s,
            grpc_core::StatusToString(error).c_str());
        if (op->payload->recv_message.call_failed_before_recv_message !=
            nullptr) {
          *op->payload->recv_message.call_failed_before_recv_message = true;
        }
        grpc_core::ExecCtx::Run(DEBUG_LOCATION,
                                op->payload->recv_message.recv_message_ready,
                                error);
      }
      if (op->recv_trailing_metadata) {
        INPROC_LOG(GPR_INFO,
                   "perform_stream_op error %p scheduling "
                   "trailing-metadata-ready %s",
                   s, grpc_core::StatusToString(error).c_str());
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION,
            op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
            error);
      }
    }
    INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %s", s,
               grpc_core::StatusToString(error).c_str());
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, error);
  }
  gpr_mu_unlock(mu);
}

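// Marks the transport closed and cancels every stream still on its list;
// each cancellation unlinks the stream from the list, so the loop below
// terminates.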
void close_transport_locked(inproc_transport* t) {
  INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed);
  t->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::Status(),
                            "close transport");
  if (!t->is_closed) {
    t->is_closed = true;
    // Also end all streams on this transport
    while (t->stream_list != nullptr) {
      // cancel_stream_locked also adjusts stream list
      cancel_stream_locked(
          t->stream_list,
          grpc_error_set_int(GRPC_ERROR_CREATE("Transport closed"),
                             grpc_core::StatusIntProperty::kRpcStatus,
                             GRPC_STATUS_UNAVAILABLE));
    }
  }
}

void inproc_transport::PerformOp(grpc_transport_op* op) {
  INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", this, op);
  gpr_mu_lock(&mu->mu);
  if (op->start_connectivity_watch != nullptr) {
    state_tracker.AddWatcher(op->start_connectivity_watch_state,
                             std::move(op->start_connectivity_watch));
  }
  if (op->stop_connectivity_watch != nullptr) {
    state_tracker.RemoveWatcher(op->stop_connectivity_watch);
  }
  if (op->set_accept_stream) {
    accept_stream_cb = op->set_accept_stream_fn;
    registered_method_matcher_cb = op->set_registered_method_matcher_fn;
    accept_stream_data = op->set_accept_stream_user_data;
  }
  if (op->on_consumed) {
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
  }

  bool do_close = false;
  if (!op->goaway_error.ok()) {
    do_close = true;
  }
  if (!op->disconnect_with_error.ok()) {
    do_close = true;
  }

  if (do_close) {
    close_transport_locked(this);
  }
  gpr_mu_unlock(&mu->mu);
}

void inproc_transport::DestroyStream(grpc_stream* gs,
                                     grpc_closure* then_schedule_closure) {
  INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure);
  inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
  gpr_mu_lock(&mu->mu);
  close_stream_locked(s);
  gpr_mu_unlock(&mu->mu);
  s->~inproc_stream();
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
                          absl::OkStatus());
}

void inproc_transport::Orphan() {
  INPROC_LOG(GPR_INFO, "destroy_transport %p", this);
  gpr_mu_lock(&mu->mu);
  close_transport_locked(this);
  gpr_mu_unlock(&mu->mu);
  other_side->unref();
  unref();
}

//******************************************************************************
// INTEGRATION GLUE
//

size_t inproc_transport::SizeOfStream() const { return sizeof(inproc_stream); }

absl::string_view inproc_transport::GetTransportName() const {
  return "inproc";
}

void inproc_transport::SetPollset(grpc_stream* /*gs*/,
                                  grpc_pollset* /*pollset*/) {
  // Nothing to do here
}

void inproc_transport::SetPollsetSet(grpc_stream* /*gs*/,
                                     grpc_pollset_set* /*pollset_set*/) {
  // Nothing to do here
}

grpc_endpoint* inproc_transport::GetEndpoint() { return nullptr; }

//******************************************************************************
// Main inproc transport functions
//
void inproc_transports_create(grpc_core::Transport** server_transport,
                              grpc_core::Transport** client_transport) {
  INPROC_LOG(GPR_INFO, "inproc_transports_create");
  shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu();
  inproc_transport* st =
      new (gpr_malloc(sizeof(*st))) inproc_transport(mu, /*is_client=*/false);
  inproc_transport* ct =
      new (gpr_malloc(sizeof(*ct))) inproc_transport(mu, /*is_client=*/true);
  st->other_side = ct;
  ct->other_side = st;
  *server_transport = reinterpret_cast<grpc_core::Transport*>(st);
  *client_transport = reinterpret_cast<grpc_core::Transport*>(ct);
}
}  // namespace

grpc_channel* grpc_legacy_inproc_channel_create(grpc_server* server,
                                                const grpc_channel_args* args,
                                                void* /*reserved*/) {
  GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2,
                 (server, args));

  grpc_core::ExecCtx exec_ctx;

  grpc_core::Server* core_server = grpc_core::Server::FromC(server);
  // Remove max_connection_idle and max_connection_age channel arguments since
  // those do not apply to inproc transports.
  grpc_core::ChannelArgs server_args =
      core_server->channel_args()
          .Remove(GRPC_ARG_MAX_CONNECTION_IDLE_MS)
          .Remove(GRPC_ARG_MAX_CONNECTION_AGE_MS);

  // Add a default authority channel argument for the client
  grpc_core::ChannelArgs client_args =
      grpc_core::CoreConfiguration::Get()
          .channel_args_preconditioning()
          .PreconditionChannelArgs(args)
          .Set(GRPC_ARG_DEFAULT_AUTHORITY, "inproc.authority");
  grpc_core::Transport* server_transport;
  grpc_core::Transport* client_transport;
  inproc_transports_create(&server_transport, &client_transport);

  // TODO(ncteisen): design and support channelz GetSocket for inproc.
  grpc_error_handle error = core_server->SetupTransport(
      server_transport, nullptr, server_args, nullptr);
  grpc_channel* channel = nullptr;
  if (error.ok()) {
    auto new_channel = grpc_core::ChannelCreate(
        "inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport);
    if (!new_channel.ok()) {
      GPR_ASSERT(!channel);
      gpr_log(GPR_ERROR, "Failed to create client channel: %s",
              grpc_core::StatusToString(error).c_str());
      intptr_t integer;
      grpc_status_code status = GRPC_STATUS_INTERNAL;
      if (grpc_error_get_int(error, grpc_core::StatusIntProperty::kRpcStatus,
                             &integer)) {
        status = static_cast<grpc_status_code>(integer);
      }
      // client_transport was destroyed when grpc_core::ChannelCreate saw an
      // error.
      server_transport->Orphan();
      channel = grpc_lame_client_channel_create(
          nullptr, status, "Failed to create client channel");
    } else {
      channel = new_channel->release()->c_ptr();
    }
  } else {
    GPR_ASSERT(!channel);
    gpr_log(GPR_ERROR, "Failed to create server channel: %s",
            grpc_core::StatusToString(error).c_str());
    intptr_t integer;
    grpc_status_code status = GRPC_STATUS_INTERNAL;
    if (grpc_error_get_int(error, grpc_core::StatusIntProperty::kRpcStatus,
                           &integer)) {
      status = static_cast<grpc_status_code>(integer);
    }
    client_transport->Orphan();
    server_transport->Orphan();
    channel = grpc_lame_client_channel_create(
        nullptr, status, "Failed to create server channel");
  }
  return channel;
}
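
// Usage sketch (illustrative only, not part of this translation unit): wiring
// an in-process channel to an already-created-and-started grpc_server. This
// entry point is normally reached via the public grpc_inproc_channel_create()
// API when the legacy inproc transport is in use.
//
//   grpc_channel* channel =
//       grpc_legacy_inproc_channel_create(server, /*args=*/nullptr,
//                                         /*reserved=*/nullptr);
//   // ... issue calls on `channel` as on any other channel ...
//   grpc_channel_destroy(channel);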