xref: /aosp_15_r20/external/grpc-grpc/src/core/ext/transport/inproc/legacy_inproc_transport.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/transport/inproc/legacy_inproc_transport.h"
22 
23 #include <stddef.h>
24 #include <stdint.h>
25 
26 #include <algorithm>
27 #include <memory>
28 #include <new>
29 #include <string>
30 #include <utility>
31 
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/str_cat.h"
35 #include "absl/strings/string_view.h"
36 #include "absl/types/optional.h"
37 
38 #include <grpc/grpc.h>
39 #include <grpc/impl/channel_arg_names.h>
40 #include <grpc/impl/connectivity_state.h>
41 #include <grpc/status.h>
42 #include <grpc/support/alloc.h>
43 #include <grpc/support/log.h>
44 #include <grpc/support/sync.h>
45 
46 #include "src/core/lib/channel/channel_args.h"
47 #include "src/core/lib/channel/channel_args_preconditioning.h"
48 #include "src/core/lib/channel/channelz.h"
49 #include "src/core/lib/config/core_configuration.h"
50 #include "src/core/lib/gprpp/debug_location.h"
51 #include "src/core/lib/gprpp/ref_counted_ptr.h"
52 #include "src/core/lib/gprpp/status_helper.h"
53 #include "src/core/lib/gprpp/time.h"
54 #include "src/core/lib/iomgr/closure.h"
55 #include "src/core/lib/iomgr/endpoint.h"
56 #include "src/core/lib/iomgr/error.h"
57 #include "src/core/lib/iomgr/exec_ctx.h"
58 #include "src/core/lib/iomgr/iomgr_fwd.h"
59 #include "src/core/lib/resource_quota/arena.h"
60 #include "src/core/lib/slice/slice.h"
61 #include "src/core/lib/slice/slice_buffer.h"
62 #include "src/core/lib/surface/api_trace.h"
63 #include "src/core/lib/surface/channel.h"
64 #include "src/core/lib/surface/channel_create.h"
65 #include "src/core/lib/surface/channel_stack_type.h"
66 #include "src/core/lib/surface/server.h"
67 #include "src/core/lib/transport/connectivity_state.h"
68 #include "src/core/lib/transport/metadata_batch.h"
69 #include "src/core/lib/transport/transport.h"
70 
71 #define INPROC_LOG(...)                               \
72   do {                                                \
73     if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) { \
74       gpr_log(__VA_ARGS__);                           \
75     }                                                 \
76   } while (0)
77 
78 namespace {
79 struct inproc_stream;
80 bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error);
81 void maybe_process_ops_locked(inproc_stream* s, grpc_error_handle error);
82 void op_state_machine_locked(inproc_stream* s, grpc_error_handle error);
83 void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
84                   bool is_initial);
85 void fill_in_metadata(inproc_stream* s, const grpc_metadata_batch* metadata,
86                       grpc_metadata_batch* out_md, bool* markfilled);
87 
ResetSendMessage(grpc_transport_stream_op_batch * batch)88 void ResetSendMessage(grpc_transport_stream_op_batch* batch) {
89   std::exchange(batch->payload->send_message.send_message, nullptr)->Clear();
90 }
91 
92 struct shared_mu {
shared_mu__anonf7bd857f0111::shared_mu93   shared_mu() {
94     // Share one lock between both sides since both sides get affected
95     gpr_mu_init(&mu);
96     gpr_ref_init(&refs, 2);
97   }
98 
~shared_mu__anonf7bd857f0111::shared_mu99   ~shared_mu() { gpr_mu_destroy(&mu); }
100 
101   gpr_mu mu;
102   gpr_refcount refs;
103 };
104 
105 struct inproc_transport final : public grpc_core::Transport,
106                                 public grpc_core::FilterStackTransport {
inproc_transport__anonf7bd857f0111::inproc_transport107   inproc_transport(shared_mu* mu, bool is_client)
108       : mu(mu),
109         is_client(is_client),
110         state_tracker(is_client ? "inproc_client" : "inproc_server",
111                       GRPC_CHANNEL_READY) {
112     // Start each side of transport with 2 refs since they each have a ref
113     // to the other
114     gpr_ref_init(&refs, 2);
115   }
116 
~inproc_transport__anonf7bd857f0111::inproc_transport117   ~inproc_transport() override {
118     if (gpr_unref(&mu->refs)) {
119       mu->~shared_mu();
120       gpr_free(mu);
121     }
122   }
123 
filter_stack_transport__anonf7bd857f0111::inproc_transport124   grpc_core::FilterStackTransport* filter_stack_transport() override {
125     return this;
126   }
127 
client_transport__anonf7bd857f0111::inproc_transport128   grpc_core::ClientTransport* client_transport() override { return nullptr; }
server_transport__anonf7bd857f0111::inproc_transport129   grpc_core::ServerTransport* server_transport() override { return nullptr; }
130 
131   absl::string_view GetTransportName() const override;
132   void InitStream(grpc_stream* gs, grpc_stream_refcount* refcount,
133                   const void* server_data, grpc_core::Arena* arena) override;
134   void SetPollset(grpc_stream* stream, grpc_pollset* pollset) override;
135   void SetPollsetSet(grpc_stream* stream,
136                      grpc_pollset_set* pollset_set) override;
137   void PerformOp(grpc_transport_op* op) override;
138   grpc_endpoint* GetEndpoint() override;
139 
140   size_t SizeOfStream() const override;
HackyDisableStreamOpBatchCoalescingInConnectedChannel__anonf7bd857f0111::inproc_transport141   bool HackyDisableStreamOpBatchCoalescingInConnectedChannel() const override {
142     return true;
143   }
144 
145   void PerformStreamOp(grpc_stream* gs,
146                        grpc_transport_stream_op_batch* op) override;
147   void DestroyStream(grpc_stream* gs,
148                      grpc_closure* then_schedule_closure) override;
149 
150   void Orphan() override;
151 
ref__anonf7bd857f0111::inproc_transport152   void ref() {
153     INPROC_LOG(GPR_INFO, "ref_transport %p", this);
154     gpr_ref(&refs);
155   }
156 
unref__anonf7bd857f0111::inproc_transport157   void unref() {
158     INPROC_LOG(GPR_INFO, "unref_transport %p", this);
159     if (!gpr_unref(&refs)) {
160       return;
161     }
162     INPROC_LOG(GPR_INFO, "really_destroy_transport %p", this);
163     this->~inproc_transport();
164     gpr_free(this);
165   }
166 
167   shared_mu* mu;
168   gpr_refcount refs;
169   bool is_client;
170   grpc_core::ConnectivityStateTracker state_tracker;
171   void (*accept_stream_cb)(void* user_data, grpc_core::Transport* transport,
172                            const void* server_data);
173   void (*registered_method_matcher_cb)(
174       void* user_data, grpc_core::ServerMetadata* metadata) = nullptr;
175   void* accept_stream_data;
176   bool is_closed = false;
177   struct inproc_transport* other_side;
178   struct inproc_stream* stream_list = nullptr;
179 };
180 
181 struct inproc_stream {
inproc_stream__anonf7bd857f0111::inproc_stream182   inproc_stream(inproc_transport* t, grpc_stream_refcount* refcount,
183                 const void* server_data, grpc_core::Arena* arena)
184       : t(t), refs(refcount), arena(arena) {
185     // Ref this stream right now for ctor and list.
186     ref("inproc_init_stream:init");
187     ref("inproc_init_stream:list");
188 
189     stream_list_prev = nullptr;
190     gpr_mu_lock(&t->mu->mu);
191     stream_list_next = t->stream_list;
192     if (t->stream_list) {
193       t->stream_list->stream_list_prev = this;
194     }
195     t->stream_list = this;
196     gpr_mu_unlock(&t->mu->mu);
197 
198     if (!server_data) {
199       t->ref();
200       inproc_transport* st = t->other_side;
201       st->ref();
202       other_side = nullptr;  // will get filled in soon
203       // Pass the client-side stream address to the server-side for a ref
204       ref("inproc_init_stream:clt");  // ref it now on behalf of server
205                                       // side to avoid destruction
206       INPROC_LOG(GPR_INFO, "calling accept stream cb %p %p",
207                  st->accept_stream_cb, st->accept_stream_data);
208       (*st->accept_stream_cb)(st->accept_stream_data, t, this);
209     } else {
210       // This is the server-side and is being called through accept_stream_cb
211       inproc_stream* cs = const_cast<inproc_stream*>(
212           static_cast<const inproc_stream*>(server_data));
213       other_side = cs;
214       // Ref the server-side stream on behalf of the client now
215       ref("inproc_init_stream:srv");
216 
217       // Now we are about to affect the other side, so lock the transport
218       // to make sure that it doesn't get destroyed
219       gpr_mu_lock(&t->mu->mu);
220       cs->other_side = this;
221       // Now transfer from the other side's write_buffer if any to the to_read
222       // buffer
223       if (cs->write_buffer_initial_md_filled) {
224         fill_in_metadata(this, &cs->write_buffer_initial_md,
225                          &to_read_initial_md, &to_read_initial_md_filled);
226         deadline = std::min(deadline, cs->write_buffer_deadline);
227         cs->write_buffer_initial_md.Clear();
228         cs->write_buffer_initial_md_filled = false;
229       }
230       if (cs->write_buffer_trailing_md_filled) {
231         fill_in_metadata(this, &cs->write_buffer_trailing_md,
232                          &to_read_trailing_md, &to_read_trailing_md_filled);
233         cs->write_buffer_trailing_md.Clear();
234         cs->write_buffer_trailing_md_filled = false;
235       }
236       if (!cs->write_buffer_cancel_error.ok()) {
237         cancel_other_error = cs->write_buffer_cancel_error;
238         cs->write_buffer_cancel_error = absl::OkStatus();
239         maybe_process_ops_locked(this, cancel_other_error);
240       }
241 
242       gpr_mu_unlock(&t->mu->mu);
243     }
244   }
245 
~inproc_stream__anonf7bd857f0111::inproc_stream246   ~inproc_stream() { t->unref(); }
247 
248 #ifndef NDEBUG
249 #define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason)
250 #define STREAM_UNREF(refs, reason) grpc_stream_unref(refs, reason)
251 #else
252 #define STREAM_REF(refs, reason) grpc_stream_ref(refs)
253 #define STREAM_UNREF(refs, reason) grpc_stream_unref(refs)
254 #endif
ref__anonf7bd857f0111::inproc_stream255   void ref(const char* reason) {
256     INPROC_LOG(GPR_INFO, "ref_stream %p %s", this, reason);
257     STREAM_REF(refs, reason);
258   }
259 
unref__anonf7bd857f0111::inproc_stream260   void unref(const char* reason) {
261     INPROC_LOG(GPR_INFO, "unref_stream %p %s", this, reason);
262     STREAM_UNREF(refs, reason);
263   }
264 #undef STREAM_REF
265 #undef STREAM_UNREF
266 
267   inproc_transport* t;
268   grpc_stream_refcount* refs;
269   grpc_core::Arena* arena;
270 
271   grpc_metadata_batch to_read_initial_md;
272   bool to_read_initial_md_filled = false;
273   grpc_metadata_batch to_read_trailing_md;
274   bool to_read_trailing_md_filled = false;
275   bool ops_needed = false;
276   // Write buffer used only during gap at init time when client-side
277   // stream is set up but server side stream is not yet set up
278   grpc_metadata_batch write_buffer_initial_md;
279   bool write_buffer_initial_md_filled = false;
280   grpc_core::Timestamp write_buffer_deadline =
281       grpc_core::Timestamp::InfFuture();
282   grpc_metadata_batch write_buffer_trailing_md;
283   bool write_buffer_trailing_md_filled = false;
284   grpc_error_handle write_buffer_cancel_error;
285 
286   struct inproc_stream* other_side;
287   bool other_side_closed = false;               // won't talk anymore
288   bool write_buffer_other_side_closed = false;  // on hold
289 
290   grpc_transport_stream_op_batch* send_message_op = nullptr;
291   grpc_transport_stream_op_batch* send_trailing_md_op = nullptr;
292   grpc_transport_stream_op_batch* recv_initial_md_op = nullptr;
293   grpc_transport_stream_op_batch* recv_message_op = nullptr;
294   grpc_transport_stream_op_batch* recv_trailing_md_op = nullptr;
295 
296   bool initial_md_sent = false;
297   bool trailing_md_sent = false;
298   bool initial_md_recvd = false;
299   bool trailing_md_recvd = false;
300   // The following tracks if the server-side only pretends to have received
301   // trailing metadata since it no longer cares about the RPC. If that is the
302   // case, it is still ok for the client to send trailing metadata (in which
303   // case it will be ignored).
304   bool trailing_md_recvd_implicit_only = false;
305 
306   bool closed = false;
307 
308   grpc_error_handle cancel_self_error;
309   grpc_error_handle cancel_other_error;
310 
311   grpc_core::Timestamp deadline = grpc_core::Timestamp::InfFuture();
312 
313   bool listed = true;
314   struct inproc_stream* stream_list_prev;
315   struct inproc_stream* stream_list_next;
316 };
317 
log_metadata(const grpc_metadata_batch * md_batch,bool is_client,bool is_initial)318 void log_metadata(const grpc_metadata_batch* md_batch, bool is_client,
319                   bool is_initial) {
320   std::string prefix = absl::StrCat(
321       "INPROC:", is_initial ? "HDR:" : "TRL:", is_client ? "CLI:" : "SVR:");
322   md_batch->Log([&prefix](absl::string_view key, absl::string_view value) {
323     gpr_log(GPR_INFO, "%s", absl::StrCat(prefix, key, ": ", value).c_str());
324   });
325 }
326 
327 namespace {
328 
329 class CopySink {
330  public:
CopySink(grpc_metadata_batch * dst)331   explicit CopySink(grpc_metadata_batch* dst) : dst_(dst) {}
332 
Encode(const grpc_core::Slice & key,const grpc_core::Slice & value)333   void Encode(const grpc_core::Slice& key, const grpc_core::Slice& value) {
334     dst_->Append(key.as_string_view(), value.AsOwned(),
335                  [](absl::string_view, const grpc_core::Slice&) {});
336   }
337 
338   template <class T, class V>
Encode(T trait,V value)339   void Encode(T trait, V value) {
340     dst_->Set(trait, value);
341   }
342 
343   template <class T>
Encode(T trait,const grpc_core::Slice & value)344   void Encode(T trait, const grpc_core::Slice& value) {
345     dst_->Set(trait, value.AsOwned());
346   }
347 
348  private:
349   grpc_metadata_batch* dst_;
350 };
351 
352 }  // namespace
353 
fill_in_metadata(inproc_stream * s,const grpc_metadata_batch * metadata,grpc_metadata_batch * out_md,bool * markfilled)354 void fill_in_metadata(inproc_stream* s, const grpc_metadata_batch* metadata,
355                       grpc_metadata_batch* out_md, bool* markfilled) {
356   if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
357     log_metadata(metadata, s->t->is_client,
358                  metadata->get_pointer(grpc_core::WaitForReady()) != nullptr);
359   }
360 
361   if (markfilled != nullptr) {
362     *markfilled = true;
363   }
364 
365   // TODO(ctiller): copy the metadata batch, don't rely on a bespoke copy
366   // function. Can only do this once mdelems are out of the way though, too
367   // many edge cases otherwise.
368   out_md->Clear();
369   CopySink sink(out_md);
370   metadata->Encode(&sink);
371 }
372 
InitStream(grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena * arena)373 void inproc_transport::InitStream(grpc_stream* gs,
374                                   grpc_stream_refcount* refcount,
375                                   const void* server_data,
376                                   grpc_core::Arena* arena) {
377   INPROC_LOG(GPR_INFO, "init_stream %p %p %p", this, gs, server_data);
378   new (gs) inproc_stream(this, refcount, server_data, arena);
379 }
380 
close_stream_locked(inproc_stream * s)381 void close_stream_locked(inproc_stream* s) {
382   if (!s->closed) {
383     // Release the metadata that we would have written out
384     s->write_buffer_initial_md.Clear();
385     s->write_buffer_trailing_md.Clear();
386 
387     if (s->listed) {
388       inproc_stream* p = s->stream_list_prev;
389       inproc_stream* n = s->stream_list_next;
390       if (p != nullptr) {
391         p->stream_list_next = n;
392       } else {
393         s->t->stream_list = n;
394       }
395       if (n != nullptr) {
396         n->stream_list_prev = p;
397       }
398       s->listed = false;
399       s->unref("close_stream:list");
400     }
401     s->closed = true;
402     s->unref("close_stream:closing");
403   }
404 }
405 
406 // This function means that we are done talking/listening to the other side
close_other_side_locked(inproc_stream * s,const char * reason)407 void close_other_side_locked(inproc_stream* s, const char* reason) {
408   if (s->other_side != nullptr) {
409     // First release the metadata that came from the other side's arena
410     s->to_read_initial_md.Clear();
411     s->to_read_trailing_md.Clear();
412 
413     s->other_side->unref(reason);
414     s->other_side_closed = true;
415     s->other_side = nullptr;
416   } else if (!s->other_side_closed) {
417     s->write_buffer_other_side_closed = true;
418   }
419 }
420 
421 // Call the on_complete closure associated with this stream_op_batch if
422 // this stream_op_batch is only one of the pending operations for this
423 // stream. This is called when one of the pending operations for the stream
424 // is done and about to be NULLed out
complete_if_batch_end_locked(inproc_stream * s,grpc_error_handle error,grpc_transport_stream_op_batch * op,const char * msg)425 void complete_if_batch_end_locked(inproc_stream* s, grpc_error_handle error,
426                                   grpc_transport_stream_op_batch* op,
427                                   const char* msg) {
428   int is_sm = static_cast<int>(op == s->send_message_op);
429   int is_stm = static_cast<int>(op == s->send_trailing_md_op);
430   // TODO(vjpai): We should not consider the recv ops here, since they
431   // have their own callbacks.  We should invoke a batch's on_complete
432   // as soon as all of the batch's send ops are complete, even if there
433   // are still recv ops pending.
434   int is_rim = static_cast<int>(op == s->recv_initial_md_op);
435   int is_rm = static_cast<int>(op == s->recv_message_op);
436   int is_rtm = static_cast<int>(op == s->recv_trailing_md_op);
437 
438   if ((is_sm + is_stm + is_rim + is_rm + is_rtm) == 1) {
439     INPROC_LOG(GPR_INFO, "%s %p %p %p %s", msg, s, op, op->on_complete,
440                grpc_core::StatusToString(error).c_str());
441     grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_complete, error);
442   }
443 }
444 
maybe_process_ops_locked(inproc_stream * s,grpc_error_handle error)445 void maybe_process_ops_locked(inproc_stream* s, grpc_error_handle error) {
446   if (s && (!error.ok() || s->ops_needed)) {
447     s->ops_needed = false;
448     op_state_machine_locked(s, error);
449   }
450 }
451 
fail_helper_locked(inproc_stream * s,grpc_error_handle error)452 void fail_helper_locked(inproc_stream* s, grpc_error_handle error) {
453   INPROC_LOG(GPR_INFO, "op_state_machine %p fail_helper", s);
454   // If we're failing this side, we need to make sure that
455   // we also send or have already sent trailing metadata
456   if (!s->trailing_md_sent) {
457     // Send trailing md to the other side indicating cancellation
458     s->trailing_md_sent = true;
459 
460     grpc_metadata_batch fake_md;
461     inproc_stream* other = s->other_side;
462     grpc_metadata_batch* dest = (other == nullptr)
463                                     ? &s->write_buffer_trailing_md
464                                     : &other->to_read_trailing_md;
465     bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
466                                           : &other->to_read_trailing_md_filled;
467     fill_in_metadata(s, &fake_md, dest, destfilled);
468 
469     if (other != nullptr) {
470       if (other->cancel_other_error.ok()) {
471         other->cancel_other_error = error;
472       }
473       maybe_process_ops_locked(other, error);
474     } else if (s->write_buffer_cancel_error.ok()) {
475       s->write_buffer_cancel_error = error;
476     }
477   }
478   if (s->recv_initial_md_op) {
479     grpc_error_handle err;
480     if (!s->t->is_client) {
481       // If this is a server, provide initial metadata with a path and
482       // authority since it expects that as well as no error yet
483       grpc_metadata_batch fake_md;
484       fake_md.Set(grpc_core::HttpPathMetadata(),
485                   grpc_core::Slice::FromStaticString("/"));
486       fake_md.Set(grpc_core::HttpAuthorityMetadata(),
487                   grpc_core::Slice::FromStaticString("inproc-fail"));
488 
489       fill_in_metadata(s, &fake_md,
490                        s->recv_initial_md_op->payload->recv_initial_metadata
491                            .recv_initial_metadata,
492                        nullptr);
493       err = absl::OkStatus();
494     } else {
495       err = error;
496     }
497     if (s->recv_initial_md_op->payload->recv_initial_metadata
498             .trailing_metadata_available != nullptr) {
499       // Set to true unconditionally, because we're failing the call, so even
500       // if we haven't actually seen the send_trailing_metadata op from the
501       // other side, we're going to return trailing metadata anyway.
502       *s->recv_initial_md_op->payload->recv_initial_metadata
503            .trailing_metadata_available = true;
504     }
505     INPROC_LOG(GPR_INFO,
506                "fail_helper %p scheduling initial-metadata-ready %s %s", s,
507                grpc_core::StatusToString(error).c_str(),
508                grpc_core::StatusToString(err).c_str());
509     grpc_core::ExecCtx::Run(
510         DEBUG_LOCATION,
511         s->recv_initial_md_op->payload->recv_initial_metadata
512             .recv_initial_metadata_ready,
513         err);
514     // Last use of err so no need to REF and then UNREF it
515 
516     complete_if_batch_end_locked(
517         s, error, s->recv_initial_md_op,
518         "fail_helper scheduling recv-initial-metadata-on-complete");
519     s->recv_initial_md_op = nullptr;
520   }
521   if (s->recv_message_op) {
522     INPROC_LOG(GPR_INFO, "fail_helper %p scheduling message-ready %s", s,
523                grpc_core::StatusToString(error).c_str());
524     if (s->recv_message_op->payload->recv_message
525             .call_failed_before_recv_message != nullptr) {
526       *s->recv_message_op->payload->recv_message
527            .call_failed_before_recv_message = true;
528     }
529     grpc_core::ExecCtx::Run(
530         DEBUG_LOCATION,
531         s->recv_message_op->payload->recv_message.recv_message_ready, error);
532     complete_if_batch_end_locked(
533         s, error, s->recv_message_op,
534         "fail_helper scheduling recv-message-on-complete");
535     s->recv_message_op = nullptr;
536   }
537   if (s->send_message_op) {
538     ResetSendMessage(s->send_message_op);
539     complete_if_batch_end_locked(
540         s, error, s->send_message_op,
541         "fail_helper scheduling send-message-on-complete");
542     s->send_message_op = nullptr;
543   }
544   if (s->send_trailing_md_op) {
545     complete_if_batch_end_locked(
546         s, error, s->send_trailing_md_op,
547         "fail_helper scheduling send-trailng-md-on-complete");
548     s->send_trailing_md_op = nullptr;
549   }
550   if (s->recv_trailing_md_op) {
551     INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-metadata-ready %s",
552                s, grpc_core::StatusToString(error).c_str());
553     grpc_core::ExecCtx::Run(
554         DEBUG_LOCATION,
555         s->recv_trailing_md_op->payload->recv_trailing_metadata
556             .recv_trailing_metadata_ready,
557         error);
558     INPROC_LOG(GPR_INFO, "fail_helper %p scheduling trailing-md-on-complete %s",
559                s, grpc_core::StatusToString(error).c_str());
560     complete_if_batch_end_locked(
561         s, error, s->recv_trailing_md_op,
562         "fail_helper scheduling recv-trailing-metadata-on-complete");
563     s->recv_trailing_md_op = nullptr;
564   }
565   close_other_side_locked(s, "fail_helper:other_side");
566   close_stream_locked(s);
567 }
568 
569 // TODO(vjpai): It should not be necessary to drain the incoming byte
570 // stream and create a new one; instead, we should simply pass the byte
571 // stream from the sender directly to the receiver as-is.
572 //
573 // Note that fixing this will also avoid the assumption in this code
574 // that the incoming byte stream's next() call will always return
575 // synchronously.  That assumption is true today but may not always be
576 // true in the future.
message_transfer_locked(inproc_stream * sender,inproc_stream * receiver)577 void message_transfer_locked(inproc_stream* sender, inproc_stream* receiver) {
578   *receiver->recv_message_op->payload->recv_message.recv_message =
579       std::move(*sender->send_message_op->payload->send_message.send_message);
580   *receiver->recv_message_op->payload->recv_message.flags =
581       sender->send_message_op->payload->send_message.flags;
582 
583   INPROC_LOG(GPR_INFO, "message_transfer_locked %p scheduling message-ready",
584              receiver);
585   grpc_core::ExecCtx::Run(
586       DEBUG_LOCATION,
587       receiver->recv_message_op->payload->recv_message.recv_message_ready,
588       absl::OkStatus());
589   complete_if_batch_end_locked(
590       sender, absl::OkStatus(), sender->send_message_op,
591       "message_transfer scheduling sender on_complete");
592   complete_if_batch_end_locked(
593       receiver, absl::OkStatus(), receiver->recv_message_op,
594       "message_transfer scheduling receiver on_complete");
595 
596   receiver->recv_message_op = nullptr;
597   sender->send_message_op = nullptr;
598 }
599 
op_state_machine_locked(inproc_stream * s,grpc_error_handle error)600 void op_state_machine_locked(inproc_stream* s, grpc_error_handle error) {
601   // This function gets called when we have contents in the unprocessed reads
602   // Get what we want based on our ops wanted
603   // Schedule our appropriate closures
604   // and then return to ops_needed state if still needed
605 
606   grpc_error_handle new_err;
607 
608   bool needs_close = false;
609 
610   INPROC_LOG(GPR_INFO, "op_state_machine %p", s);
611   // cancellation takes precedence
612   inproc_stream* other = s->other_side;
613 
614   if (!s->cancel_self_error.ok()) {
615     fail_helper_locked(s, s->cancel_self_error);
616     goto done;
617   } else if (!s->cancel_other_error.ok()) {
618     fail_helper_locked(s, s->cancel_other_error);
619     goto done;
620   } else if (!error.ok()) {
621     fail_helper_locked(s, error);
622     goto done;
623   }
624 
625   if (s->send_message_op && other) {
626     if (other->recv_message_op) {
627       message_transfer_locked(s, other);
628       maybe_process_ops_locked(other, absl::OkStatus());
629     } else if (!s->t->is_client && s->trailing_md_sent) {
630       // A server send will never be matched if the server already sent status
631       ResetSendMessage(s->send_message_op);
632       complete_if_batch_end_locked(
633           s, absl::OkStatus(), s->send_message_op,
634           "op_state_machine scheduling send-message-on-complete case 1");
635       s->send_message_op = nullptr;
636     }
637   }
638   // Pause a send trailing metadata if there is still an outstanding
639   // send message unless we know that the send message will never get
640   // matched to a receive. This happens on the client if the server has
641   // already sent status or on the server if the client has requested
642   // status
643   if (s->send_trailing_md_op &&
644       (!s->send_message_op ||
645        (s->t->is_client &&
646         (s->trailing_md_recvd || s->to_read_trailing_md_filled)) ||
647        (!s->t->is_client && other &&
648         (other->trailing_md_recvd || other->to_read_trailing_md_filled ||
649          other->recv_trailing_md_op)))) {
650     grpc_metadata_batch* dest = (other == nullptr)
651                                     ? &s->write_buffer_trailing_md
652                                     : &other->to_read_trailing_md;
653     bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
654                                           : &other->to_read_trailing_md_filled;
655     if (*destfilled || s->trailing_md_sent) {
656       // The buffer is already in use; that's an error!
657       INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s);
658       new_err = GRPC_ERROR_CREATE("Extra trailing metadata");
659       fail_helper_locked(s, new_err);
660       goto done;
661     } else {
662       if (!other || !other->closed) {
663         fill_in_metadata(s,
664                          s->send_trailing_md_op->payload->send_trailing_metadata
665                              .send_trailing_metadata,
666                          dest, destfilled);
667       }
668       s->trailing_md_sent = true;
669       if (s->send_trailing_md_op->payload->send_trailing_metadata.sent) {
670         *s->send_trailing_md_op->payload->send_trailing_metadata.sent = true;
671       }
672       if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
673         INPROC_LOG(GPR_INFO,
674                    "op_state_machine %p scheduling trailing-metadata-ready", s);
675         grpc_core::ExecCtx::Run(
676             DEBUG_LOCATION,
677             s->recv_trailing_md_op->payload->recv_trailing_metadata
678                 .recv_trailing_metadata_ready,
679             absl::OkStatus());
680         INPROC_LOG(GPR_INFO,
681                    "op_state_machine %p scheduling trailing-md-on-complete", s);
682         grpc_core::ExecCtx::Run(DEBUG_LOCATION,
683                                 s->recv_trailing_md_op->on_complete,
684                                 absl::OkStatus());
685         s->recv_trailing_md_op = nullptr;
686         needs_close = true;
687       }
688     }
689     maybe_process_ops_locked(other, absl::OkStatus());
690     complete_if_batch_end_locked(
691         s, absl::OkStatus(), s->send_trailing_md_op,
692         "op_state_machine scheduling send-trailing-metadata-on-complete");
693     s->send_trailing_md_op = nullptr;
694   }
695   if (s->recv_initial_md_op) {
696     if (s->initial_md_recvd) {
697       new_err = GRPC_ERROR_CREATE("Already recvd initial md");
698       INPROC_LOG(
699           GPR_INFO,
700           "op_state_machine %p scheduling on_complete errors for already "
701           "recvd initial md %s",
702           s, grpc_core::StatusToString(new_err).c_str());
703       fail_helper_locked(s, new_err);
704       goto done;
705     }
706 
707     if (s->to_read_initial_md_filled) {
708       s->initial_md_recvd = true;
709       fill_in_metadata(s, &s->to_read_initial_md,
710                        s->recv_initial_md_op->payload->recv_initial_metadata
711                            .recv_initial_metadata,
712                        nullptr);
713       if (s->deadline != grpc_core::Timestamp::InfFuture()) {
714         s->recv_initial_md_op->payload->recv_initial_metadata
715             .recv_initial_metadata->Set(grpc_core::GrpcTimeoutMetadata(),
716                                         s->deadline);
717       }
718       if (s->recv_initial_md_op->payload->recv_initial_metadata
719               .trailing_metadata_available != nullptr) {
720         *s->recv_initial_md_op->payload->recv_initial_metadata
721              .trailing_metadata_available =
722             (other != nullptr && other->send_trailing_md_op != nullptr);
723       }
724       s->to_read_initial_md.Clear();
725       s->to_read_initial_md_filled = false;
726       if (s->t->registered_method_matcher_cb != nullptr) {
727         s->t->registered_method_matcher_cb(
728             s->t->accept_stream_data,
729             s->recv_initial_md_op->payload->recv_initial_metadata
730                 .recv_initial_metadata);
731       }
732       grpc_core::ExecCtx::Run(
733           DEBUG_LOCATION,
734           std::exchange(s->recv_initial_md_op->payload->recv_initial_metadata
735                             .recv_initial_metadata_ready,
736                         nullptr),
737           absl::OkStatus());
738       complete_if_batch_end_locked(
739           s, absl::OkStatus(), s->recv_initial_md_op,
740           "op_state_machine scheduling recv-initial-metadata-on-complete");
741       s->recv_initial_md_op = nullptr;
742     }
743   }
744   if (s->recv_message_op) {
745     if (other && other->send_message_op) {
746       message_transfer_locked(other, s);
747       maybe_process_ops_locked(other, absl::OkStatus());
748     }
749   }
750   if (s->to_read_trailing_md_filled) {
751     if (s->trailing_md_recvd) {
752       if (s->trailing_md_recvd_implicit_only) {
753         INPROC_LOG(GPR_INFO,
754                    "op_state_machine %p already implicitly received trailing "
755                    "metadata, so ignoring new trailing metadata from client",
756                    s);
757         s->to_read_trailing_md.Clear();
758         s->to_read_trailing_md_filled = false;
759         s->trailing_md_recvd_implicit_only = false;
760       } else {
761         new_err = GRPC_ERROR_CREATE("Already recvd trailing md");
762         INPROC_LOG(
763             GPR_INFO,
764             "op_state_machine %p scheduling on_complete errors for already "
765             "recvd trailing md %s",
766             s, grpc_core::StatusToString(new_err).c_str());
767         fail_helper_locked(s, new_err);
768         goto done;
769       }
770     }
771     if (s->recv_message_op != nullptr) {
772       // This message needs to be wrapped up because it will never be
773       // satisfied
774       s->recv_message_op->payload->recv_message.recv_message->reset();
775       INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
776       grpc_core::ExecCtx::Run(
777           DEBUG_LOCATION,
778           s->recv_message_op->payload->recv_message.recv_message_ready,
779           absl::OkStatus());
780       complete_if_batch_end_locked(
781           s, new_err, s->recv_message_op,
782           "op_state_machine scheduling recv-message-on-complete");
783       s->recv_message_op = nullptr;
784     }
785     if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
786       // Nothing further will try to receive from this stream, so finish off
787       // any outstanding send_message op
788       ResetSendMessage(s->send_message_op);
789       s->send_message_op->payload->send_message.stream_write_closed = true;
790       complete_if_batch_end_locked(
791           s, new_err, s->send_message_op,
792           "op_state_machine scheduling send-message-on-complete case 2");
793       s->send_message_op = nullptr;
794     }
795     if (s->recv_trailing_md_op != nullptr) {
796       // We wanted trailing metadata and we got it
797       s->trailing_md_recvd = true;
798       fill_in_metadata(s, &s->to_read_trailing_md,
799                        s->recv_trailing_md_op->payload->recv_trailing_metadata
800                            .recv_trailing_metadata,
801                        nullptr);
802       s->to_read_trailing_md.Clear();
803       s->to_read_trailing_md_filled = false;
804       s->recv_trailing_md_op->payload->recv_trailing_metadata
805           .recv_trailing_metadata->Set(grpc_core::GrpcStatusFromWire(), true);
806 
807       // We should schedule the recv_trailing_md_op completion if
808       // 1. this stream is the client-side
809       // 2. this stream is the server-side AND has already sent its trailing md
810       //    (If the server hasn't already sent its trailing md, it doesn't
811       //    have
812       //     a final status, so don't mark this op complete)
813       if (s->t->is_client || s->trailing_md_sent) {
814         grpc_core::ExecCtx::Run(
815             DEBUG_LOCATION,
816             s->recv_trailing_md_op->payload->recv_trailing_metadata
817                 .recv_trailing_metadata_ready,
818             absl::OkStatus());
819         grpc_core::ExecCtx::Run(DEBUG_LOCATION,
820                                 s->recv_trailing_md_op->on_complete,
821                                 absl::OkStatus());
822         s->recv_trailing_md_op = nullptr;
823         needs_close = s->trailing_md_sent;
824       }
825     } else if (!s->trailing_md_recvd) {
826       INPROC_LOG(
827           GPR_INFO,
828           "op_state_machine %p has trailing md but not yet waiting for it", s);
829     }
830   }
831   if (!s->t->is_client && s->trailing_md_sent &&
832       (s->recv_trailing_md_op != nullptr)) {
833     // In this case, we don't care to receive the write-close from the client
834     // because we have already sent status and the RPC is over as far as we
835     // are concerned.
836     INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling trailing-md-ready %s",
837                s, grpc_core::StatusToString(new_err).c_str());
838     grpc_core::ExecCtx::Run(
839         DEBUG_LOCATION,
840         s->recv_trailing_md_op->payload->recv_trailing_metadata
841             .recv_trailing_metadata_ready,
842         new_err);
843     complete_if_batch_end_locked(
844         s, new_err, s->recv_trailing_md_op,
845         "op_state_machine scheduling recv-trailing-md-on-complete");
846     s->trailing_md_recvd = true;
847     s->recv_trailing_md_op = nullptr;
848     // Since we are only pretending to have received the trailing MD, it would
849     // be ok (not an error) if the client actually sends it later.
850     s->trailing_md_recvd_implicit_only = true;
851   }
852   if (s->trailing_md_recvd && s->recv_message_op) {
853     // No further message will come on this stream, so finish off the
854     // recv_message_op
855     INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
856     s->recv_message_op->payload->recv_message.recv_message->reset();
857     grpc_core::ExecCtx::Run(
858         DEBUG_LOCATION,
859         s->recv_message_op->payload->recv_message.recv_message_ready,
860         absl::OkStatus());
861     complete_if_batch_end_locked(
862         s, new_err, s->recv_message_op,
863         "op_state_machine scheduling recv-message-on-complete");
864     s->recv_message_op = nullptr;
865   }
866   if (s->trailing_md_recvd && s->send_message_op && s->t->is_client) {
867     // Nothing further will try to receive from this stream, so finish off
868     // any outstanding send_message op
869     ResetSendMessage(s->send_message_op);
870     complete_if_batch_end_locked(
871         s, new_err, s->send_message_op,
872         "op_state_machine scheduling send-message-on-complete case 3");
873     s->send_message_op = nullptr;
874   }
875   if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
876       s->recv_message_op || s->recv_trailing_md_op) {
877     // Didn't get the item we wanted so we still need to get
878     // rescheduled
879     INPROC_LOG(
880         GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s,
881         s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
882         s->recv_message_op, s->recv_trailing_md_op);
883     s->ops_needed = true;
884   }
885 done:
886   if (needs_close) {
887     close_other_side_locked(s, "op_state_machine");
888     close_stream_locked(s);
889   }
890 }
891 
cancel_stream_locked(inproc_stream * s,grpc_error_handle error)892 bool cancel_stream_locked(inproc_stream* s, grpc_error_handle error) {
893   bool ret = false;  // was the cancel accepted
894   INPROC_LOG(GPR_INFO, "cancel_stream %p with %s", s,
895              grpc_core::StatusToString(error).c_str());
896   if (s->cancel_self_error.ok()) {
897     ret = true;
898     s->cancel_self_error = error;
899     // Catch current value of other before it gets closed off
900     inproc_stream* other = s->other_side;
901     maybe_process_ops_locked(s, s->cancel_self_error);
902     // Send trailing md to the other side indicating cancellation, even if we
903     // already have
904     s->trailing_md_sent = true;
905 
906     grpc_metadata_batch cancel_md;
907 
908     grpc_metadata_batch* dest = (other == nullptr)
909                                     ? &s->write_buffer_trailing_md
910                                     : &other->to_read_trailing_md;
911     bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
912                                           : &other->to_read_trailing_md_filled;
913     fill_in_metadata(s, &cancel_md, dest, destfilled);
914 
915     if (other != nullptr) {
916       if (other->cancel_other_error.ok()) {
917         other->cancel_other_error = s->cancel_self_error;
918       }
919       maybe_process_ops_locked(other, other->cancel_other_error);
920     } else if (s->write_buffer_cancel_error.ok()) {
921       s->write_buffer_cancel_error = s->cancel_self_error;
922     }
923 
924     // if we are a server and already received trailing md but
925     // couldn't complete that because we hadn't yet sent out trailing
926     // md, now's the chance
927     if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
928       grpc_core::ExecCtx::Run(
929           DEBUG_LOCATION,
930           s->recv_trailing_md_op->payload->recv_trailing_metadata
931               .recv_trailing_metadata_ready,
932           s->cancel_self_error);
933       complete_if_batch_end_locked(
934           s, s->cancel_self_error, s->recv_trailing_md_op,
935           "cancel_stream scheduling trailing-md-on-complete");
936       s->recv_trailing_md_op = nullptr;
937     }
938   }
939 
940   close_other_side_locked(s, "cancel_stream:other_side");
941   close_stream_locked(s);
942 
943   return ret;
944 }
945 
PerformStreamOp(grpc_stream * gs,grpc_transport_stream_op_batch * op)946 void inproc_transport::PerformStreamOp(grpc_stream* gs,
947                                        grpc_transport_stream_op_batch* op) {
948   INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", this, gs, op);
949   inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
950   gpr_mu* mu = &s->t->mu->mu;  // save aside in case s gets closed
951   gpr_mu_lock(mu);
952 
953   if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
954     if (op->send_initial_metadata) {
955       log_metadata(op->payload->send_initial_metadata.send_initial_metadata,
956                    s->t->is_client, true);
957     }
958     if (op->send_trailing_metadata) {
959       log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata,
960                    s->t->is_client, false);
961     }
962   }
963   grpc_error_handle error;
964   grpc_closure* on_complete = op->on_complete;
965   // TODO(roth): This is a hack needed because we use data inside of the
966   // closure itself to do the barrier calculation (i.e., to ensure that
967   // we don't schedule the closure until all ops in the batch have been
968   // completed).  This can go away once we move to a new C++ closure API
969   // that provides the ability to create a barrier closure.
970   if (on_complete == nullptr) {
971     on_complete = op->on_complete =
972         grpc_core::NewClosure([](grpc_error_handle) {});
973   }
974 
975   if (op->cancel_stream) {
976     // Call cancel_stream_locked without ref'ing the cancel_error because
977     // this function is responsible to make sure that that field gets unref'ed
978     cancel_stream_locked(s, op->payload->cancel_stream.cancel_error);
979     // this op can complete without an error
980   } else if (!s->cancel_self_error.ok()) {
981     // already self-canceled so still give it an error
982     error = s->cancel_self_error;
983   } else {
984     INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s,
985                s->t->is_client ? "client" : "server",
986                op->send_initial_metadata ? " send_initial_metadata" : "",
987                op->send_message ? " send_message" : "",
988                op->send_trailing_metadata ? " send_trailing_metadata" : "",
989                op->recv_initial_metadata ? " recv_initial_metadata" : "",
990                op->recv_message ? " recv_message" : "",
991                op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
992   }
993 
994   inproc_stream* other = s->other_side;
995   if (error.ok() && (op->send_initial_metadata || op->send_trailing_metadata)) {
996     if (s->t->is_closed) {
997       error = GRPC_ERROR_CREATE("Endpoint already shutdown");
998     }
999     if (error.ok() && op->send_initial_metadata) {
1000       grpc_metadata_batch* dest = (other == nullptr)
1001                                       ? &s->write_buffer_initial_md
1002                                       : &other->to_read_initial_md;
1003       bool* destfilled = (other == nullptr) ? &s->write_buffer_initial_md_filled
1004                                             : &other->to_read_initial_md_filled;
1005       if (*destfilled || s->initial_md_sent) {
1006         // The buffer is already in use; that's an error!
1007         INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s);
1008         error = GRPC_ERROR_CREATE("Extra initial metadata");
1009       } else {
1010         if (!s->other_side_closed) {
1011           fill_in_metadata(
1012               s, op->payload->send_initial_metadata.send_initial_metadata, dest,
1013               destfilled);
1014         }
1015         if (s->t->is_client) {
1016           grpc_core::Timestamp* dl =
1017               (other == nullptr) ? &s->write_buffer_deadline : &other->deadline;
1018           *dl = std::min(
1019               *dl, op->payload->send_initial_metadata.send_initial_metadata
1020                        ->get(grpc_core::GrpcTimeoutMetadata())
1021                        .value_or(grpc_core::Timestamp::InfFuture()));
1022           s->initial_md_sent = true;
1023         }
1024       }
1025       maybe_process_ops_locked(other, error);
1026     }
1027   }
1028 
1029   if (error.ok() && (op->send_message || op->send_trailing_metadata ||
1030                      op->recv_initial_metadata || op->recv_message ||
1031                      op->recv_trailing_metadata)) {
1032     // Mark ops that need to be processed by the state machine
1033     if (op->send_message) {
1034       s->send_message_op = op;
1035     }
1036     if (op->send_trailing_metadata) {
1037       s->send_trailing_md_op = op;
1038     }
1039     if (op->recv_initial_metadata) {
1040       s->recv_initial_md_op = op;
1041     }
1042     if (op->recv_message) {
1043       s->recv_message_op = op;
1044     }
1045     if (op->recv_trailing_metadata) {
1046       s->recv_trailing_md_op = op;
1047     }
1048 
1049     // We want to initiate the state machine if:
1050     // 1. We want to send a message and the other side wants to receive
1051     // 2. We want to send trailing metadata and there isn't an unmatched send
1052     //    or the other side wants trailing metadata
1053     // 3. We want initial metadata and the other side has sent it
1054     // 4. We want to receive a message and there is a message ready
1055     // 5. There is trailing metadata, even if nothing specifically wants
1056     //    that because that can shut down the receive message as well
1057     if ((op->send_message && other && other->recv_message_op != nullptr) ||
1058         (op->send_trailing_metadata &&
1059          (!s->send_message_op || (other && other->recv_trailing_md_op))) ||
1060         (op->recv_initial_metadata && s->to_read_initial_md_filled) ||
1061         (op->recv_message && other && other->send_message_op != nullptr) ||
1062         (s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
1063       op_state_machine_locked(s, error);
1064     } else {
1065       s->ops_needed = true;
1066     }
1067   } else {
1068     if (!error.ok()) {
1069       // Consume any send message that was sent here but that we are not
1070       // pushing to the other side
1071       if (op->send_message) {
1072         ResetSendMessage(op);
1073       }
1074       // Schedule op's closures that we didn't push to op state machine
1075       if (op->recv_initial_metadata) {
1076         if (op->payload->recv_initial_metadata.trailing_metadata_available !=
1077             nullptr) {
1078           // Set to true unconditionally, because we're failing the call, so
1079           // even if we haven't actually seen the send_trailing_metadata op
1080           // from the other side, we're going to return trailing metadata
1081           // anyway.
1082           *op->payload->recv_initial_metadata.trailing_metadata_available =
1083               true;
1084         }
1085         INPROC_LOG(
1086             GPR_INFO,
1087             "perform_stream_op error %p scheduling initial-metadata-ready %s",
1088             s, grpc_core::StatusToString(error).c_str());
1089         grpc_core::ExecCtx::Run(
1090             DEBUG_LOCATION,
1091             op->payload->recv_initial_metadata.recv_initial_metadata_ready,
1092             error);
1093       }
1094       if (op->recv_message) {
1095         INPROC_LOG(
1096             GPR_INFO,
1097             "perform_stream_op error %p scheduling recv message-ready %s", s,
1098             grpc_core::StatusToString(error).c_str());
1099         if (op->payload->recv_message.call_failed_before_recv_message !=
1100             nullptr) {
1101           *op->payload->recv_message.call_failed_before_recv_message = true;
1102         }
1103         grpc_core::ExecCtx::Run(DEBUG_LOCATION,
1104                                 op->payload->recv_message.recv_message_ready,
1105                                 error);
1106       }
1107       if (op->recv_trailing_metadata) {
1108         INPROC_LOG(GPR_INFO,
1109                    "perform_stream_op error %p scheduling "
1110                    "trailing-metadata-ready %s",
1111                    s, grpc_core::StatusToString(error).c_str());
1112         grpc_core::ExecCtx::Run(
1113             DEBUG_LOCATION,
1114             op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
1115             error);
1116       }
1117     }
1118     INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %s", s,
1119                grpc_core::StatusToString(error).c_str());
1120     grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, error);
1121   }
1122   gpr_mu_unlock(mu);
1123 }
1124 
close_transport_locked(inproc_transport * t)1125 void close_transport_locked(inproc_transport* t) {
1126   INPROC_LOG(GPR_INFO, "close_transport %p %d", t, t->is_closed);
1127   t->state_tracker.SetState(GRPC_CHANNEL_SHUTDOWN, absl::Status(),
1128                             "close transport");
1129   if (!t->is_closed) {
1130     t->is_closed = true;
1131     // Also end all streams on this transport
1132     while (t->stream_list != nullptr) {
1133       // cancel_stream_locked also adjusts stream list
1134       cancel_stream_locked(
1135           t->stream_list,
1136           grpc_error_set_int(GRPC_ERROR_CREATE("Transport closed"),
1137                              grpc_core::StatusIntProperty::kRpcStatus,
1138                              GRPC_STATUS_UNAVAILABLE));
1139     }
1140   }
1141 }
1142 
PerformOp(grpc_transport_op * op)1143 void inproc_transport::PerformOp(grpc_transport_op* op) {
1144   INPROC_LOG(GPR_INFO, "perform_transport_op %p %p", this, op);
1145   gpr_mu_lock(&mu->mu);
1146   if (op->start_connectivity_watch != nullptr) {
1147     state_tracker.AddWatcher(op->start_connectivity_watch_state,
1148                              std::move(op->start_connectivity_watch));
1149   }
1150   if (op->stop_connectivity_watch != nullptr) {
1151     state_tracker.RemoveWatcher(op->stop_connectivity_watch);
1152   }
1153   if (op->set_accept_stream) {
1154     accept_stream_cb = op->set_accept_stream_fn;
1155     registered_method_matcher_cb = op->set_registered_method_matcher_fn;
1156     accept_stream_data = op->set_accept_stream_user_data;
1157   }
1158   if (op->on_consumed) {
1159     grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
1160   }
1161 
1162   bool do_close = false;
1163   if (!op->goaway_error.ok()) {
1164     do_close = true;
1165   }
1166   if (!op->disconnect_with_error.ok()) {
1167     do_close = true;
1168   }
1169 
1170   if (do_close) {
1171     close_transport_locked(this);
1172   }
1173   gpr_mu_unlock(&mu->mu);
1174 }
1175 
DestroyStream(grpc_stream * gs,grpc_closure * then_schedule_closure)1176 void inproc_transport::DestroyStream(grpc_stream* gs,
1177                                      grpc_closure* then_schedule_closure) {
1178   INPROC_LOG(GPR_INFO, "destroy_stream %p %p", gs, then_schedule_closure);
1179   inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
1180   gpr_mu_lock(&mu->mu);
1181   close_stream_locked(s);
1182   gpr_mu_unlock(&mu->mu);
1183   s->~inproc_stream();
1184   grpc_core::ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure,
1185                           absl::OkStatus());
1186 }
1187 
Orphan()1188 void inproc_transport::Orphan() {
1189   INPROC_LOG(GPR_INFO, "destroy_transport %p", this);
1190   gpr_mu_lock(&mu->mu);
1191   close_transport_locked(this);
1192   gpr_mu_unlock(&mu->mu);
1193   other_side->unref();
1194   unref();
1195 }
1196 
1197 //******************************************************************************
1198 // INTEGRATION GLUE
1199 //
1200 
SizeOfStream() const1201 size_t inproc_transport::SizeOfStream() const { return sizeof(inproc_stream); }
1202 
GetTransportName() const1203 absl::string_view inproc_transport::GetTransportName() const {
1204   return "inproc";
1205 }
1206 
SetPollset(grpc_stream *,grpc_pollset *)1207 void inproc_transport::SetPollset(grpc_stream* /*gs*/,
1208                                   grpc_pollset* /*pollset*/) {
1209   // Nothing to do here
1210 }
1211 
SetPollsetSet(grpc_stream *,grpc_pollset_set *)1212 void inproc_transport::SetPollsetSet(grpc_stream* /*gs*/,
1213                                      grpc_pollset_set* /*pollset_set*/) {
1214   // Nothing to do here
1215 }
1216 
GetEndpoint()1217 grpc_endpoint* inproc_transport::GetEndpoint() { return nullptr; }
1218 
1219 //******************************************************************************
1220 // Main inproc transport functions
1221 //
inproc_transports_create(grpc_core::Transport ** server_transport,grpc_core::Transport ** client_transport)1222 void inproc_transports_create(grpc_core::Transport** server_transport,
1223                               grpc_core::Transport** client_transport) {
1224   INPROC_LOG(GPR_INFO, "inproc_transports_create");
1225   shared_mu* mu = new (gpr_malloc(sizeof(*mu))) shared_mu();
1226   inproc_transport* st =
1227       new (gpr_malloc(sizeof(*st))) inproc_transport(mu, /*is_client=*/false);
1228   inproc_transport* ct =
1229       new (gpr_malloc(sizeof(*ct))) inproc_transport(mu, /*is_client=*/true);
1230   st->other_side = ct;
1231   ct->other_side = st;
1232   *server_transport = reinterpret_cast<grpc_core::Transport*>(st);
1233   *client_transport = reinterpret_cast<grpc_core::Transport*>(ct);
1234 }
1235 }  // namespace
1236 
grpc_legacy_inproc_channel_create(grpc_server * server,const grpc_channel_args * args,void *)1237 grpc_channel* grpc_legacy_inproc_channel_create(grpc_server* server,
1238                                                 const grpc_channel_args* args,
1239                                                 void* /*reserved*/) {
1240   GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2,
1241                  (server, args));
1242 
1243   grpc_core::ExecCtx exec_ctx;
1244 
1245   grpc_core::Server* core_server = grpc_core::Server::FromC(server);
1246   // Remove max_connection_idle and max_connection_age channel arguments since
1247   // those do not apply to inproc transports.
1248   grpc_core::ChannelArgs server_args =
1249       core_server->channel_args()
1250           .Remove(GRPC_ARG_MAX_CONNECTION_IDLE_MS)
1251           .Remove(GRPC_ARG_MAX_CONNECTION_AGE_MS);
1252 
1253   // Add a default authority channel argument for the client
1254   grpc_core::ChannelArgs client_args =
1255       grpc_core::CoreConfiguration::Get()
1256           .channel_args_preconditioning()
1257           .PreconditionChannelArgs(args)
1258           .Set(GRPC_ARG_DEFAULT_AUTHORITY, "inproc.authority");
1259   grpc_core::Transport* server_transport;
1260   grpc_core::Transport* client_transport;
1261   inproc_transports_create(&server_transport, &client_transport);
1262 
1263   // TODO(ncteisen): design and support channelz GetSocket for inproc.
1264   grpc_error_handle error = core_server->SetupTransport(
1265       server_transport, nullptr, server_args, nullptr);
1266   grpc_channel* channel = nullptr;
1267   if (error.ok()) {
1268     auto new_channel = grpc_core::ChannelCreate(
1269         "inproc", client_args, GRPC_CLIENT_DIRECT_CHANNEL, client_transport);
1270     if (!new_channel.ok()) {
1271       GPR_ASSERT(!channel);
1272       gpr_log(GPR_ERROR, "Failed to create client channel: %s",
1273               grpc_core::StatusToString(error).c_str());
1274       intptr_t integer;
1275       grpc_status_code status = GRPC_STATUS_INTERNAL;
1276       if (grpc_error_get_int(error, grpc_core::StatusIntProperty::kRpcStatus,
1277                              &integer)) {
1278         status = static_cast<grpc_status_code>(integer);
1279       }
1280       // client_transport was destroyed when grpc_channel_create_internal saw an
1281       // error.
1282       server_transport->Orphan();
1283       channel = grpc_lame_client_channel_create(
1284           nullptr, status, "Failed to create client channel");
1285     } else {
1286       channel = new_channel->release()->c_ptr();
1287     }
1288   } else {
1289     GPR_ASSERT(!channel);
1290     gpr_log(GPR_ERROR, "Failed to create server channel: %s",
1291             grpc_core::StatusToString(error).c_str());
1292     intptr_t integer;
1293     grpc_status_code status = GRPC_STATUS_INTERNAL;
1294     if (grpc_error_get_int(error, grpc_core::StatusIntProperty::kRpcStatus,
1295                            &integer)) {
1296       status = static_cast<grpc_status_code>(integer);
1297     }
1298     client_transport->Orphan();
1299     server_transport->Orphan();
1300     channel = grpc_lame_client_channel_create(
1301         nullptr, status, "Failed to create server channel");
1302   }
1303   return channel;
1304 }
1305