xref: /aosp_15_r20/external/grpc-grpc/src/core/ext/transport/chttp2/transport/chttp2_transport.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
20 
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <string.h>
24 
25 #include <algorithm>
26 #include <atomic>
27 #include <cstddef>
28 #include <limits>
29 #include <memory>
30 #include <new>
31 #include <string>
32 #include <type_traits>
33 #include <utility>
34 #include <vector>
35 
36 #include "absl/base/attributes.h"
37 #include "absl/container/flat_hash_map.h"
38 #include "absl/hash/hash.h"
39 #include "absl/meta/type_traits.h"
40 #include "absl/random/random.h"
41 #include "absl/status/status.h"
42 #include "absl/strings/cord.h"
43 #include "absl/strings/str_cat.h"
44 #include "absl/strings/str_format.h"
45 #include "absl/strings/string_view.h"
46 #include "absl/types/optional.h"
47 #include "absl/types/variant.h"
48 
49 #include <grpc/event_engine/event_engine.h>
50 #include <grpc/grpc.h>
51 #include <grpc/impl/channel_arg_names.h>
52 #include <grpc/impl/connectivity_state.h>
53 #include <grpc/slice_buffer.h>
54 #include <grpc/status.h>
55 #include <grpc/support/alloc.h>
56 #include <grpc/support/log.h>
57 #include <grpc/support/time.h>
58 
59 #include "src/core/ext/transport/chttp2/transport/context_list_entry.h"
60 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
61 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
62 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
63 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
64 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
65 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
66 #include "src/core/ext/transport/chttp2/transport/http_trace.h"
67 #include "src/core/ext/transport/chttp2/transport/internal.h"
68 #include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
69 #include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h"
70 #include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h"
71 #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
72 #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
73 #include "src/core/ext/transport/chttp2/transport/varint.h"
74 #include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
75 #include "src/core/lib/channel/call_tracer.h"
76 #include "src/core/lib/channel/channel_args.h"
77 #include "src/core/lib/channel/context.h"
78 #include "src/core/lib/channel/tcp_tracer.h"
79 #include "src/core/lib/debug/stats.h"
80 #include "src/core/lib/debug/stats_data.h"
81 #include "src/core/lib/experiments/experiments.h"
82 #include "src/core/lib/gpr/string.h"
83 #include "src/core/lib/gpr/useful.h"
84 #include "src/core/lib/gprpp/bitset.h"
85 #include "src/core/lib/gprpp/crash.h"
86 #include "src/core/lib/gprpp/debug_location.h"
87 #include "src/core/lib/gprpp/ref_counted.h"
88 #include "src/core/lib/gprpp/status_helper.h"
89 #include "src/core/lib/gprpp/time.h"
90 #include "src/core/lib/http/parser.h"
91 #include "src/core/lib/iomgr/combiner.h"
92 #include "src/core/lib/iomgr/error.h"
93 #include "src/core/lib/iomgr/exec_ctx.h"
94 #include "src/core/lib/iomgr/iomgr_fwd.h"
95 #include "src/core/lib/iomgr/port.h"
96 #include "src/core/lib/promise/poll.h"
97 #include "src/core/lib/resource_quota/arena.h"
98 #include "src/core/lib/resource_quota/memory_quota.h"
99 #include "src/core/lib/resource_quota/resource_quota.h"
100 #include "src/core/lib/resource_quota/trace.h"
101 #include "src/core/lib/slice/slice.h"
102 #include "src/core/lib/slice/slice_buffer.h"
103 #include "src/core/lib/slice/slice_internal.h"
104 #include "src/core/lib/transport/bdp_estimator.h"
105 #include "src/core/lib/transport/connectivity_state.h"
106 #include "src/core/lib/transport/error_utils.h"
107 #include "src/core/lib/transport/http2_errors.h"
108 #include "src/core/lib/transport/metadata_batch.h"
109 #include "src/core/lib/transport/metadata_info.h"
110 #include "src/core/lib/transport/status_conversion.h"
111 #include "src/core/lib/transport/transport.h"
112 
113 #ifdef GRPC_POSIX_SOCKET_TCP
114 #include "src/core/lib/iomgr/ev_posix.h"
115 #endif
116 
117 #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
118 #define MAX_WINDOW 0x7fffffffu
119 #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
120 
121 #define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2
122 
123 #define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000
124 
125 #define GRPC_ARG_HTTP2_PING_ON_RST_STREAM_PERCENT \
126   "grpc.http2.ping_on_rst_stream_percent"
127 
128 static grpc_core::Duration g_default_client_keepalive_time =
129     grpc_core::Duration::Infinity();
130 static grpc_core::Duration g_default_client_keepalive_timeout =
131     grpc_core::Duration::Seconds(20);
132 static grpc_core::Duration g_default_server_keepalive_time =
133     grpc_core::Duration::Hours(2);
134 static grpc_core::Duration g_default_server_keepalive_timeout =
135     grpc_core::Duration::Seconds(20);
136 static bool g_default_client_keepalive_permit_without_calls = false;
137 static bool g_default_server_keepalive_permit_without_calls = false;
138 
139 // EXPERIMENTAL: control tarpitting in chttp2
140 #define GRPC_ARG_HTTP_ALLOW_TARPIT "grpc.http.tarpit"
141 #define GRPC_ARG_HTTP_TARPIT_MIN_DURATION_MS "grpc.http.tarpit_min_duration_ms"
142 #define GRPC_ARG_HTTP_TARPIT_MAX_DURATION_MS "grpc.http.tarpit_max_duration_ms"
143 
144 #define MAX_CLIENT_STREAM_ID 0x7fffffffu
145 grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
146 grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
147                                                          "chttp2_refcount");
148 
149 // forward declarations of various callbacks that we'll build closures around
150 static void write_action_begin_locked(
151     grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
152 static void write_action(grpc_chttp2_transport* t);
153 static void write_action_end(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
154                              grpc_error_handle error);
155 static void write_action_end_locked(
156     grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
157 
158 static void read_action(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
159                         grpc_error_handle error);
160 static void read_action_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
161                                grpc_error_handle error);
162 static void continue_read_action_locked(
163     grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
164 
165 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
166                            grpc_error_handle error, bool tarpit);
167 
168 // Start new streams that have been created if we can
169 static void maybe_start_some_streams(grpc_chttp2_transport* t);
170 
171 static void connectivity_state_set(grpc_chttp2_transport* t,
172                                    grpc_connectivity_state state,
173                                    const absl::Status& status,
174                                    const char* reason);
175 
176 static void benign_reclaimer_locked(
177     grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
178 static void destructive_reclaimer_locked(
179     grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
180 
181 static void post_benign_reclaimer(grpc_chttp2_transport* t);
182 static void post_destructive_reclaimer(grpc_chttp2_transport* t);
183 
184 static void close_transport_locked(grpc_chttp2_transport* t,
185                                    grpc_error_handle error);
186 static void end_all_the_calls(grpc_chttp2_transport* t,
187                               grpc_error_handle error);
188 
189 static void start_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
190                            grpc_error_handle error);
191 static void finish_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
192                             grpc_error_handle error);
193 static void start_bdp_ping_locked(
194     grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
195 static void finish_bdp_ping_locked(
196     grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
197 static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t);
198 static void next_bdp_ping_timer_expired_locked(
199     grpc_core::RefCountedPtr<grpc_chttp2_transport> tp,
200     GRPC_UNUSED grpc_error_handle error);
201 
202 static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error);
203 static void send_ping_locked(grpc_chttp2_transport* t,
204                              grpc_closure* on_initiate, grpc_closure* on_ack);
205 static void retry_initiate_ping_locked(
206     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
207     GRPC_UNUSED grpc_error_handle error);
208 
209 // keepalive-relevant functions
210 static void init_keepalive_ping(
211     grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
212 static void init_keepalive_ping_locked(
213     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
214     GRPC_UNUSED grpc_error_handle error);
215 static void finish_keepalive_ping(
216     grpc_core::RefCountedPtr<grpc_chttp2_transport> t, grpc_error_handle error);
217 static void finish_keepalive_ping_locked(
218     grpc_core::RefCountedPtr<grpc_chttp2_transport> t, grpc_error_handle error);
219 static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t);
220 
221 static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error,
222                         bool immediate_disconnect_hint);
223 
224 // Timeout for getting an ack back on settings changes
225 #define GRPC_ARG_SETTINGS_TIMEOUT "grpc.http2.settings_timeout"
226 
227 namespace {
228 
229 using TaskHandle = ::grpc_event_engine::experimental::EventEngine::TaskHandle;
230 
CallTracerIfSampled(grpc_chttp2_stream * s)231 grpc_core::CallTracerInterface* CallTracerIfSampled(grpc_chttp2_stream* s) {
232   if (s->context == nullptr || !grpc_core::IsTraceRecordCallopsEnabled()) {
233     return nullptr;
234   }
235   auto* call_tracer = static_cast<grpc_core::CallTracerInterface*>(
236       static_cast<grpc_call_context_element*>(
237           s->context)[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
238           .value);
239   if (call_tracer == nullptr || !call_tracer->IsSampled()) {
240     return nullptr;
241   }
242   return call_tracer;
243 }
244 
TcpTracerIfSampled(grpc_chttp2_stream * s)245 std::shared_ptr<grpc_core::TcpTracerInterface> TcpTracerIfSampled(
246     grpc_chttp2_stream* s) {
247   if (s->context == nullptr || !grpc_core::IsTraceRecordCallopsEnabled()) {
248     return nullptr;
249   }
250   auto* call_attempt_tracer = static_cast<grpc_core::CallTracerInterface*>(
251       static_cast<grpc_call_context_element*>(
252           s->context)[GRPC_CONTEXT_CALL_TRACER]
253           .value);
254   if (call_attempt_tracer == nullptr || !call_attempt_tracer->IsSampled()) {
255     return nullptr;
256   }
257   return call_attempt_tracer->StartNewTcpTrace();
258 }
259 
260 grpc_core::WriteTimestampsCallback g_write_timestamps_callback = nullptr;
261 grpc_core::CopyContextFn g_get_copied_context_fn = nullptr;
262 }  // namespace
263 
264 namespace grpc_core {
265 
266 namespace {
267 // Initialize a grpc_closure \a c to call \a Fn with \a t and \a error. Holds
268 // the passed in reference to \a t until it's moved into Fn.
269 template <void (*Fn)(RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle)>
InitTransportClosure(RefCountedPtr<grpc_chttp2_transport> t,grpc_closure * c)270 grpc_closure* InitTransportClosure(RefCountedPtr<grpc_chttp2_transport> t,
271                                    grpc_closure* c) {
272   GRPC_CLOSURE_INIT(
273       c,
274       [](void* tp, grpc_error_handle error) {
275         Fn(RefCountedPtr<grpc_chttp2_transport>(
276                static_cast<grpc_chttp2_transport*>(tp)),
277            std::move(error));
278       },
279       t.release(), nullptr);
280   return c;
281 }
282 }  // namespace
283 
284 namespace {
285 TestOnlyGlobalHttp2TransportInitCallback test_only_init_callback = nullptr;
286 TestOnlyGlobalHttp2TransportDestructCallback test_only_destruct_callback =
287     nullptr;
288 bool test_only_disable_transient_failure_state_notification = false;
289 }  // namespace
290 
TestOnlySetGlobalHttp2TransportInitCallback(TestOnlyGlobalHttp2TransportInitCallback callback)291 void TestOnlySetGlobalHttp2TransportInitCallback(
292     TestOnlyGlobalHttp2TransportInitCallback callback) {
293   test_only_init_callback = callback;
294 }
295 
TestOnlySetGlobalHttp2TransportDestructCallback(TestOnlyGlobalHttp2TransportDestructCallback callback)296 void TestOnlySetGlobalHttp2TransportDestructCallback(
297     TestOnlyGlobalHttp2TransportDestructCallback callback) {
298   test_only_destruct_callback = callback;
299 }
300 
TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(bool disable)301 void TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(
302     bool disable) {
303   test_only_disable_transient_failure_state_notification = disable;
304 }
305 
GrpcHttp2SetWriteTimestampsCallback(WriteTimestampsCallback fn)306 void GrpcHttp2SetWriteTimestampsCallback(WriteTimestampsCallback fn) {
307   g_write_timestamps_callback = fn;
308 }
309 
GrpcHttp2SetCopyContextFn(CopyContextFn fn)310 void GrpcHttp2SetCopyContextFn(CopyContextFn fn) {
311   g_get_copied_context_fn = fn;
312 }
313 
GrpcHttp2GetWriteTimestampsCallback()314 WriteTimestampsCallback GrpcHttp2GetWriteTimestampsCallback() {
315   return g_write_timestamps_callback;
316 }
317 
GrpcHttp2GetCopyContextFn()318 CopyContextFn GrpcHttp2GetCopyContextFn() { return g_get_copied_context_fn; }
319 
320 // For each entry in the passed ContextList, it executes the function set using
321 // GrpcHttp2SetWriteTimestampsCallback method with each context in the list
322 // and \a ts. It also deletes/frees up the passed ContextList after this
323 // operation.
ForEachContextListEntryExecute(void * arg,Timestamps * ts,grpc_error_handle error)324 void ForEachContextListEntryExecute(void* arg, Timestamps* ts,
325                                     grpc_error_handle error) {
326   ContextList* context_list = reinterpret_cast<ContextList*>(arg);
327   if (!context_list) {
328     return;
329   }
330   for (auto it = context_list->begin(); it != context_list->end(); it++) {
331     ContextListEntry& entry = (*it);
332     if (ts) {
333       ts->byte_offset = static_cast<uint32_t>(entry.ByteOffsetInStream());
334     }
335     g_write_timestamps_callback(entry.TraceContext(), ts, error);
336   }
337   delete context_list;
338 }
339 
HttpAnnotation(Type type,gpr_timespec time)340 HttpAnnotation::HttpAnnotation(Type type, gpr_timespec time)
341     : CallTracerAnnotationInterface::Annotation(
342           CallTracerAnnotationInterface::AnnotationType::kHttpTransport),
343       type_(type),
344       time_(time) {}
345 
ToString() const346 std::string HttpAnnotation::ToString() const {
347   std::string s = "HttpAnnotation type: ";
348   switch (type_) {
349     case Type::kStart:
350       absl::StrAppend(&s, "Start");
351       break;
352     case Type::kHeadWritten:
353       absl::StrAppend(&s, "HeadWritten");
354       break;
355     case Type::kEnd:
356       absl::StrAppend(&s, "End");
357       break;
358     default:
359       absl::StrAppend(&s, "Unknown");
360   }
361   absl::StrAppend(&s, " time: ", gpr_format_timespec(time_));
362   if (transport_stats_.has_value()) {
363     absl::StrAppend(&s, " transport:[", transport_stats_->ToString(), "]");
364   }
365   if (stream_stats_.has_value()) {
366     absl::StrAppend(&s, " stream:[", stream_stats_->ToString(), "]");
367   }
368   return s;
369 }
370 
371 }  // namespace grpc_core
372 
373 //
374 // CONSTRUCTION/DESTRUCTION/REFCOUNTING
375 //
376 
~grpc_chttp2_transport()377 grpc_chttp2_transport::~grpc_chttp2_transport() {
378   size_t i;
379 
380   cancel_pings(this, GRPC_ERROR_CREATE("Transport destroyed"));
381 
382   event_engine.reset();
383 
384   if (channelz_socket != nullptr) {
385     channelz_socket.reset();
386   }
387 
388   grpc_endpoint_destroy(ep);
389 
390   grpc_slice_buffer_destroy(&qbuf);
391 
392   grpc_error_handle error = GRPC_ERROR_CREATE("Transport destroyed");
393   // ContextList::Execute follows semantics of a callback function and does not
394   // take a ref on error
395   if (cl != nullptr) {
396     grpc_core::ForEachContextListEntryExecute(cl, nullptr, error);
397   }
398   cl = nullptr;
399 
400   grpc_slice_buffer_destroy(&read_buffer);
401   grpc_chttp2_goaway_parser_destroy(&goaway_parser);
402 
403   for (i = 0; i < STREAM_LIST_COUNT; i++) {
404     GPR_ASSERT(lists[i].head == nullptr);
405     GPR_ASSERT(lists[i].tail == nullptr);
406   }
407 
408   GPR_ASSERT(stream_map.empty());
409   GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
410 
411   while (write_cb_pool) {
412     grpc_chttp2_write_cb* next = write_cb_pool->next;
413     gpr_free(write_cb_pool);
414     write_cb_pool = next;
415   }
416 
417   gpr_free(ping_acks);
418   if (grpc_core::test_only_destruct_callback != nullptr) {
419     grpc_core::test_only_destruct_callback();
420   }
421 }
422 
read_channel_args(grpc_chttp2_transport * t,const grpc_core::ChannelArgs & channel_args,bool is_client)423 static void read_channel_args(grpc_chttp2_transport* t,
424                               const grpc_core::ChannelArgs& channel_args,
425                               bool is_client) {
426   const int initial_sequence_number =
427       channel_args.GetInt(GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER).value_or(-1);
428   if (initial_sequence_number > 0) {
429     if ((t->next_stream_id & 1) != (initial_sequence_number & 1)) {
430       gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
431               GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
432               is_client ? "client" : "server");
433     } else {
434       t->next_stream_id = static_cast<uint32_t>(initial_sequence_number);
435     }
436   }
437 
438   const int max_hpack_table_size =
439       channel_args.GetInt(GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER).value_or(-1);
440   if (max_hpack_table_size >= 0) {
441     t->hpack_compressor.SetMaxUsableSize(max_hpack_table_size);
442   }
443 
444   t->write_buffer_size =
445       std::max(0, channel_args.GetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)
446                       .value_or(grpc_core::chttp2::kDefaultWindow));
447   t->keepalive_time =
448       std::max(grpc_core::Duration::Milliseconds(1),
449                channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIME_MS)
450                    .value_or(t->is_client ? g_default_client_keepalive_time
451                                           : g_default_server_keepalive_time));
452   t->keepalive_timeout = std::max(
453       grpc_core::Duration::Zero(),
454       channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIMEOUT_MS)
455           .value_or(t->keepalive_time == grpc_core::Duration::Infinity()
456                         ? grpc_core::Duration::Infinity()
457                         : (t->is_client ? g_default_client_keepalive_timeout
458                                         : g_default_server_keepalive_timeout)));
459   t->ping_timeout = std::max(
460       grpc_core::Duration::Zero(),
461       channel_args.GetDurationFromIntMillis(GRPC_ARG_PING_TIMEOUT_MS)
462           .value_or(t->keepalive_time == grpc_core::Duration::Infinity()
463                         ? grpc_core::Duration::Infinity()
464                         : grpc_core::Duration::Minutes(1)));
465   if (t->is_client) {
466     t->keepalive_permit_without_calls =
467         channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
468             .value_or(grpc_core::IsKeepaliveFixEnabled()
469                           ? g_default_client_keepalive_permit_without_calls
470                           : false);
471   } else {
472     t->keepalive_permit_without_calls =
473         channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
474             .value_or(grpc_core::IsKeepaliveServerFixEnabled()
475                           ? g_default_server_keepalive_permit_without_calls
476                           : false);
477   }
478 
479   t->settings_timeout =
480       channel_args.GetDurationFromIntMillis(GRPC_ARG_SETTINGS_TIMEOUT)
481           .value_or(std::max(t->keepalive_timeout * 2,
482                              grpc_core::Duration::Minutes(1)));
483 
484   // Only send the prefered rx frame size http2 setting if we are instructed
485   // to auto size the buffers allocated at tcp level and we also can adjust
486   // sending frame size.
487   t->enable_preferred_rx_crypto_frame_advertisement =
488       channel_args
489           .GetBool(GRPC_ARG_EXPERIMENTAL_HTTP2_PREFERRED_CRYPTO_FRAME_SIZE)
490           .value_or(false);
491 
492   const auto max_requests_per_read =
493       channel_args.GetInt("grpc.http2.max_requests_per_read");
494   if (max_requests_per_read.has_value()) {
495     t->max_requests_per_read =
496         grpc_core::Clamp(*max_requests_per_read, 1, 10000);
497   } else {
498     t->max_requests_per_read = 32;
499   }
500 
501   if (channel_args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
502           .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
503     t->channelz_socket =
504         grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(
505             std::string(grpc_endpoint_get_local_address(t->ep)),
506             std::string(t->peer_string.as_string_view()),
507             absl::StrCat(t->GetTransportName(), " ",
508                          t->peer_string.as_string_view()),
509             channel_args
510                 .GetObjectRef<grpc_core::channelz::SocketNode::Security>());
511   }
512 
513   t->ack_pings = channel_args.GetBool("grpc.http2.ack_pings").value_or(true);
514 
515   t->allow_tarpit =
516       channel_args.GetBool(GRPC_ARG_HTTP_ALLOW_TARPIT).value_or(true);
517   t->min_tarpit_duration_ms =
518       channel_args
519           .GetDurationFromIntMillis(GRPC_ARG_HTTP_TARPIT_MIN_DURATION_MS)
520           .value_or(grpc_core::Duration::Milliseconds(100))
521           .millis();
522   t->max_tarpit_duration_ms =
523       channel_args
524           .GetDurationFromIntMillis(GRPC_ARG_HTTP_TARPIT_MAX_DURATION_MS)
525           .value_or(grpc_core::Duration::Seconds(1))
526           .millis();
527   t->max_header_list_size_soft_limit =
528       grpc_core::GetSoftLimitFromChannelArgs(channel_args);
529 
530   int value;
531   if (!is_client) {
532     value = channel_args.GetInt(GRPC_ARG_MAX_CONCURRENT_STREAMS).value_or(-1);
533     if (value >= 0) {
534       t->settings.mutable_local().SetMaxConcurrentStreams(value);
535       t->max_concurrent_streams_policy.SetTarget(value);
536     }
537   } else if (channel_args.Contains(GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
538     gpr_log(GPR_DEBUG, "%s is not available on clients",
539             GRPC_ARG_MAX_CONCURRENT_STREAMS);
540   }
541   value =
542       channel_args.GetInt(GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER).value_or(-1);
543   if (value >= 0) {
544     t->settings.mutable_local().SetHeaderTableSize(value);
545   }
546   t->settings.mutable_local().SetMaxHeaderListSize(
547       grpc_core::GetHardLimitFromChannelArgs(channel_args));
548   value = channel_args.GetInt(GRPC_ARG_HTTP2_MAX_FRAME_SIZE).value_or(-1);
549   if (value >= 0) {
550     t->settings.mutable_local().SetMaxFrameSize(value);
551   }
552   value =
553       channel_args.GetInt(GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES).value_or(-1);
554   if (value >= 0) {
555     t->settings.mutable_local().SetInitialWindowSize(value);
556   }
557   value = channel_args.GetInt(GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY).value_or(-1);
558   if (value >= 0) {
559     t->settings.mutable_local().SetAllowTrueBinaryMetadata(value != 0);
560   }
561 
562   if (t->enable_preferred_rx_crypto_frame_advertisement) {
563     t->settings.mutable_local().SetPreferredReceiveCryptoMessageSize(INT_MAX);
564   }
565 
566   t->ping_on_rst_stream_percent = grpc_core::Clamp(
567       channel_args.GetInt(GRPC_ARG_HTTP2_PING_ON_RST_STREAM_PERCENT)
568           .value_or(1),
569       0, 100);
570 
571   t->max_concurrent_streams_overload_protection =
572       channel_args.GetBool(GRPC_ARG_MAX_CONCURRENT_STREAMS_OVERLOAD_PROTECTION)
573           .value_or(true);
574 }
575 
init_keepalive_pings_if_enabled_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,GRPC_UNUSED grpc_error_handle error)576 static void init_keepalive_pings_if_enabled_locked(
577     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
578     GRPC_UNUSED grpc_error_handle error) {
579   GPR_DEBUG_ASSERT(error.ok());
580   if (t->keepalive_time != grpc_core::Duration::Infinity()) {
581     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
582     t->keepalive_ping_timer_handle =
583         t->event_engine->RunAfter(t->keepalive_time, [t = t->Ref()]() mutable {
584           grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
585           grpc_core::ExecCtx exec_ctx;
586           init_keepalive_ping(std::move(t));
587         });
588   } else {
589     // Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
590     // inflight keepalive timers
591     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
592   }
593 }
594 
grpc_chttp2_transport(const grpc_core::ChannelArgs & channel_args,grpc_endpoint * ep,bool is_client)595 grpc_chttp2_transport::grpc_chttp2_transport(
596     const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep,
597     bool is_client)
598     : grpc_core::RefCounted<grpc_chttp2_transport,
599                             grpc_core::NonPolymorphicRefCount>(
600           GRPC_TRACE_FLAG_ENABLED(grpc_trace_chttp2_refcount)
601               ? "chttp2_refcount"
602               : nullptr),
603       ep(ep),
604       peer_string(
605           grpc_core::Slice::FromCopiedString(grpc_endpoint_get_peer(ep))),
606       memory_owner(channel_args.GetObject<grpc_core::ResourceQuota>()
607                        ->memory_quota()
608                        ->CreateMemoryOwner()),
609       self_reservation(
610           memory_owner.MakeReservation(sizeof(grpc_chttp2_transport))),
611       event_engine(
612           channel_args
613               .GetObjectRef<grpc_event_engine::experimental::EventEngine>()),
614       combiner(grpc_combiner_create(event_engine)),
615       state_tracker(is_client ? "client_transport" : "server_transport",
616                     GRPC_CHANNEL_READY),
617       next_stream_id(is_client ? 1 : 2),
618       ping_abuse_policy(channel_args),
619       ping_rate_policy(channel_args, is_client),
620       flow_control(
621           peer_string.as_string_view(),
622           channel_args.GetBool(GRPC_ARG_HTTP2_BDP_PROBE).value_or(true),
623           &memory_owner),
624       deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0),
625       is_client(is_client) {
626   cl = new grpc_core::ContextList();
627   GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
628              GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
629 
630   grpc_slice_buffer_init(&read_buffer);
631   if (is_client) {
632     grpc_slice_buffer_add(
633         outbuf.c_slice_buffer(),
634         grpc_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
635   }
636   grpc_slice_buffer_init(&qbuf);
637   grpc_chttp2_goaway_parser_init(&goaway_parser);
638 
639   // configure http2 the way we like it
640   if (is_client) {
641     settings.mutable_local().SetEnablePush(false);
642     settings.mutable_local().SetMaxConcurrentStreams(0);
643   }
644   settings.mutable_local().SetMaxHeaderListSize(DEFAULT_MAX_HEADER_LIST_SIZE);
645   settings.mutable_local().SetAllowTrueBinaryMetadata(true);
646 
647   read_channel_args(this, channel_args, is_client);
648 
649   // Initially allow *UP TO* MAX_CONCURRENT_STREAMS incoming before we start
650   // blanket cancelling them.
651   num_incoming_streams_before_settings_ack =
652       settings.local().max_concurrent_streams();
653 
654   grpc_core::ExecCtx exec_ctx;
655   combiner->Run(
656       grpc_core::InitTransportClosure<init_keepalive_pings_if_enabled_locked>(
657           Ref(), &init_keepalive_ping_locked),
658       absl::OkStatus());
659 
660   if (flow_control.bdp_probe()) {
661     bdp_ping_blocked = true;
662     grpc_chttp2_act_on_flowctl_action(flow_control.PeriodicUpdate(), this,
663                                       nullptr);
664   }
665 
666   grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
667   post_benign_reclaimer(this);
668   if (grpc_core::test_only_init_callback != nullptr) {
669     grpc_core::test_only_init_callback();
670   }
671 
672 #ifdef GRPC_POSIX_SOCKET_TCP
673   closure_barrier_may_cover_write =
674       grpc_event_engine_run_in_background() &&
675               grpc_core::IsScheduleCancellationOverWriteEnabled()
676           ? 0
677           : CLOSURE_BARRIER_MAY_COVER_WRITE;
678 #endif
679 }
680 
destroy_transport_locked(void * tp,grpc_error_handle)681 static void destroy_transport_locked(void* tp, grpc_error_handle /*error*/) {
682   grpc_core::RefCountedPtr<grpc_chttp2_transport> t(
683       static_cast<grpc_chttp2_transport*>(tp));
684   t->destroying = 1;
685   close_transport_locked(
686       t.get(),
687       grpc_error_set_int(GRPC_ERROR_CREATE("Transport destroyed"),
688                          grpc_core::StatusIntProperty::kOccurredDuringWrite,
689                          t->write_state));
690   t->memory_owner.Reset();
691 }
692 
Orphan()693 void grpc_chttp2_transport::Orphan() {
694   combiner->Run(GRPC_CLOSURE_CREATE(destroy_transport_locked, this, nullptr),
695                 absl::OkStatus());
696 }
697 
close_transport_locked(grpc_chttp2_transport * t,grpc_error_handle error)698 static void close_transport_locked(grpc_chttp2_transport* t,
699                                    grpc_error_handle error) {
700   end_all_the_calls(t, error);
701   cancel_pings(t, error);
702   if (t->closed_with_error.ok()) {
703     if (!grpc_error_has_clear_grpc_status(error)) {
704       error =
705           grpc_error_set_int(error, grpc_core::StatusIntProperty::kRpcStatus,
706                              GRPC_STATUS_UNAVAILABLE);
707     }
708     if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
709       if (t->close_transport_on_writes_finished.ok()) {
710         t->close_transport_on_writes_finished =
711             GRPC_ERROR_CREATE("Delayed close due to in-progress write");
712       }
713       t->close_transport_on_writes_finished =
714           grpc_error_add_child(t->close_transport_on_writes_finished, error);
715       return;
716     }
717     GPR_ASSERT(!error.ok());
718     t->closed_with_error = error;
719     connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(),
720                            "close_transport");
721     if (t->keepalive_ping_timeout_handle != TaskHandle::kInvalid) {
722       t->event_engine->Cancel(std::exchange(t->keepalive_ping_timeout_handle,
723                                             TaskHandle::kInvalid));
724     }
725     if (t->settings_ack_watchdog != TaskHandle::kInvalid) {
726       t->event_engine->Cancel(
727           std::exchange(t->settings_ack_watchdog, TaskHandle::kInvalid));
728     }
729     if (t->delayed_ping_timer_handle != TaskHandle::kInvalid &&
730         t->event_engine->Cancel(t->delayed_ping_timer_handle)) {
731       t->delayed_ping_timer_handle = TaskHandle::kInvalid;
732     }
733     if (t->next_bdp_ping_timer_handle != TaskHandle::kInvalid &&
734         t->event_engine->Cancel(t->next_bdp_ping_timer_handle)) {
735       t->next_bdp_ping_timer_handle = TaskHandle::kInvalid;
736     }
737     switch (t->keepalive_state) {
738       case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
739         if (t->keepalive_ping_timer_handle != TaskHandle::kInvalid &&
740             t->event_engine->Cancel(t->keepalive_ping_timer_handle)) {
741           t->keepalive_ping_timer_handle = TaskHandle::kInvalid;
742         }
743         break;
744       case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
745         if (t->keepalive_ping_timer_handle != TaskHandle::kInvalid &&
746             t->event_engine->Cancel(t->keepalive_ping_timer_handle)) {
747           t->keepalive_ping_timer_handle = TaskHandle::kInvalid;
748         }
749         break;
750       case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
751       case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
752         // keepalive timers are not set in these two states
753         break;
754     }
755 
756     // flush writable stream list to avoid dangling references
757     grpc_chttp2_stream* s;
758     while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
759       GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
760     }
761     GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
762     grpc_endpoint_shutdown(t->ep, error);
763   }
764   if (t->notify_on_receive_settings != nullptr) {
765     grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
766                             error);
767     t->notify_on_receive_settings = nullptr;
768   }
769   if (t->notify_on_close != nullptr) {
770     grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_close, error);
771     t->notify_on_close = nullptr;
772   }
773 }
774 
775 #ifndef NDEBUG
grpc_chttp2_stream_ref(grpc_chttp2_stream * s,const char * reason)776 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
777   grpc_stream_ref(s->refcount, reason);
778 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s,const char * reason)779 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
780   grpc_stream_unref(s->refcount, reason);
781 }
782 #else
grpc_chttp2_stream_ref(grpc_chttp2_stream * s)783 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
784   grpc_stream_ref(s->refcount);
785 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s)786 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
787   grpc_stream_unref(s->refcount);
788 }
789 #endif
790 
grpc_chttp2_stream(grpc_chttp2_transport * t,grpc_stream_refcount * refcount,const void * server_data)791 grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
792                                        grpc_stream_refcount* refcount,
793                                        const void* server_data)
794     : t(t->Ref()),
795       refcount([refcount]() {
796 // We reserve one 'active stream' that's dropped when the stream is
797 //   read-closed. The others are for Chttp2IncomingByteStreams that are
798 //   actively reading
799 // We do this here to avoid cache misses.
800 #ifndef NDEBUG
801         grpc_stream_ref(refcount, "chttp2");
802 #else
803         grpc_stream_ref(refcount);
804 #endif
805         return refcount;
806       }()),
807       flow_control(&t->flow_control) {
808   t->streams_allocated.fetch_add(1, std::memory_order_relaxed);
809   if (server_data) {
810     id = static_cast<uint32_t>(reinterpret_cast<uintptr_t>(server_data));
811     if (grpc_http_trace.enabled()) {
812       gpr_log(GPR_DEBUG, "HTTP:%p/%p creating accept stream %d [from %p]", t,
813               this, id, server_data);
814     }
815     *t->accepting_stream = this;
816     t->stream_map.emplace(id, this);
817     post_destructive_reclaimer(t);
818   }
819 
820   grpc_slice_buffer_init(&frame_storage);
821   grpc_slice_buffer_init(&flow_controlled_buffer);
822 }
823 
~grpc_chttp2_stream()824 grpc_chttp2_stream::~grpc_chttp2_stream() {
825   t->streams_allocated.fetch_sub(1, std::memory_order_relaxed);
826   grpc_chttp2_list_remove_stalled_by_stream(t.get(), this);
827   grpc_chttp2_list_remove_stalled_by_transport(t.get(), this);
828 
829   if (t->channelz_socket != nullptr) {
830     if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
831       t->channelz_socket->RecordStreamSucceeded();
832     } else {
833       t->channelz_socket->RecordStreamFailed();
834     }
835   }
836 
837   GPR_ASSERT((write_closed && read_closed) || id == 0);
838   if (id != 0) {
839     GPR_ASSERT(t->stream_map.count(id) == 0);
840   }
841 
842   grpc_slice_buffer_destroy(&frame_storage);
843 
844   for (int i = 0; i < STREAM_LIST_COUNT; i++) {
845     if (GPR_UNLIKELY(included.is_set(i))) {
846       grpc_core::Crash(absl::StrFormat("%s stream %d still included in list %d",
847                                        t->is_client ? "client" : "server", id,
848                                        i));
849     }
850   }
851 
852   GPR_ASSERT(send_initial_metadata_finished == nullptr);
853   GPR_ASSERT(send_trailing_metadata_finished == nullptr);
854   GPR_ASSERT(recv_initial_metadata_ready == nullptr);
855   GPR_ASSERT(recv_message_ready == nullptr);
856   GPR_ASSERT(recv_trailing_metadata_finished == nullptr);
857   grpc_slice_buffer_destroy(&flow_controlled_buffer);
858   grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, absl::OkStatus());
859 }
860 
InitStream(grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena *)861 void grpc_chttp2_transport::InitStream(grpc_stream* gs,
862                                        grpc_stream_refcount* refcount,
863                                        const void* server_data,
864                                        grpc_core::Arena*) {
865   new (gs) grpc_chttp2_stream(this, refcount, server_data);
866 }
867 
destroy_stream_locked(void * sp,grpc_error_handle)868 static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) {
869   grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
870   s->~grpc_chttp2_stream();
871 }
872 
DestroyStream(grpc_stream * gs,grpc_closure * then_schedule_closure)873 void grpc_chttp2_transport::DestroyStream(grpc_stream* gs,
874                                           grpc_closure* then_schedule_closure) {
875   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
876 
877   s->destroy_stream_arg = then_schedule_closure;
878   combiner->Run(
879       GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, nullptr),
880       absl::OkStatus());
881 }
882 
grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport * t,uint32_t id)883 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
884                                                       uint32_t id) {
885   if (t->accept_stream_cb == nullptr) {
886     return nullptr;
887   }
888   grpc_chttp2_stream* accepting = nullptr;
889   GPR_ASSERT(t->accepting_stream == nullptr);
890   t->accepting_stream = &accepting;
891   t->accept_stream_cb(t->accept_stream_cb_user_data, t,
892                       reinterpret_cast<void*>(id));
893   t->accepting_stream = nullptr;
894   return accepting;
895 }
896 
897 //
898 // OUTPUT PROCESSING
899 //
900 
write_state_name(grpc_chttp2_write_state st)901 static const char* write_state_name(grpc_chttp2_write_state st) {
902   switch (st) {
903     case GRPC_CHTTP2_WRITE_STATE_IDLE:
904       return "IDLE";
905     case GRPC_CHTTP2_WRITE_STATE_WRITING:
906       return "WRITING";
907     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
908       return "WRITING+MORE";
909   }
910   GPR_UNREACHABLE_CODE(return "UNKNOWN");
911 }
912 
set_write_state(grpc_chttp2_transport * t,grpc_chttp2_write_state st,const char * reason)913 static void set_write_state(grpc_chttp2_transport* t,
914                             grpc_chttp2_write_state st, const char* reason) {
915   GRPC_CHTTP2_IF_TRACING(
916       gpr_log(GPR_INFO, "W:%p %s [%s] state %s -> %s [%s]", t,
917               t->is_client ? "CLIENT" : "SERVER",
918               std::string(t->peer_string.as_string_view()).c_str(),
919               write_state_name(t->write_state), write_state_name(st), reason));
920   t->write_state = st;
921   // If the state is being reset back to idle, it means a write was just
922   // finished. Make sure all the run_after_write closures are scheduled.
923   //
924   // This is also our chance to close the transport if the transport was marked
925   // to be closed after all writes finish (for example, if we received a go-away
926   // from peer while we had some pending writes)
927   if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
928     grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
929     if (!t->close_transport_on_writes_finished.ok()) {
930       grpc_error_handle err = t->close_transport_on_writes_finished;
931       t->close_transport_on_writes_finished = absl::OkStatus();
932       close_transport_locked(t, err);
933     }
934   }
935 }
936 
grpc_chttp2_initiate_write(grpc_chttp2_transport * t,grpc_chttp2_initiate_write_reason reason)937 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
938                                 grpc_chttp2_initiate_write_reason reason) {
939   switch (t->write_state) {
940     case GRPC_CHTTP2_WRITE_STATE_IDLE:
941       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
942                       grpc_chttp2_initiate_write_reason_string(reason));
943       // Note that the 'write_action_begin_locked' closure is being scheduled
944       // on the 'finally_scheduler' of t->combiner. This means that
945       // 'write_action_begin_locked' is called only *after* all the other
946       // closures (some of which are potentially initiating more writes on the
947       // transport) are executed on the t->combiner.
948       //
949       // The reason for scheduling on finally_scheduler is to make sure we batch
950       // as many writes as possible. 'write_action_begin_locked' is the function
951       // that gathers all the relevant bytes (which are at various places in the
952       // grpc_chttp2_transport structure) and append them to 'outbuf' field in
953       // grpc_chttp2_transport thereby batching what would have been potentially
954       // multiple write operations.
955       //
956       // Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
957       // It does not call the endpoint to write the bytes. That is done by the
958       // 'write_action' (which is scheduled by 'write_action_begin_locked')
959       t->combiner->FinallyRun(
960           grpc_core::InitTransportClosure<write_action_begin_locked>(
961               t->Ref(), &t->write_action_begin_locked),
962           absl::OkStatus());
963       break;
964     case GRPC_CHTTP2_WRITE_STATE_WRITING:
965       set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
966                       grpc_chttp2_initiate_write_reason_string(reason));
967       break;
968     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
969       break;
970   }
971 }
972 
grpc_chttp2_mark_stream_writable(grpc_chttp2_transport * t,grpc_chttp2_stream * s)973 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
974                                       grpc_chttp2_stream* s) {
975   if (t->closed_with_error.ok() && grpc_chttp2_list_add_writable_stream(t, s)) {
976     GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
977   }
978 }
979 
begin_writing_desc(bool partial)980 static const char* begin_writing_desc(bool partial) {
981   if (partial) {
982     return "begin partial write in background";
983   } else {
984     return "begin write in current thread";
985   }
986 }
987 
write_action_begin_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle)988 static void write_action_begin_locked(
989     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
990     grpc_error_handle /*error_ignored*/) {
991   GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
992   grpc_chttp2_begin_write_result r;
993   if (!t->closed_with_error.ok()) {
994     r.writing = false;
995   } else {
996     r = grpc_chttp2_begin_write(t.get());
997   }
998   if (r.writing) {
999     set_write_state(t.get(),
1000                     r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
1001                               : GRPC_CHTTP2_WRITE_STATE_WRITING,
1002                     begin_writing_desc(r.partial));
1003     write_action(t.get());
1004     if (t->reading_paused_on_pending_induced_frames) {
1005       GPR_ASSERT(t->num_pending_induced_frames == 0);
1006       // We had paused reading, because we had many induced frames (SETTINGS
1007       // ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
1008       // been able to flush qbuf, we can resume reading.
1009       GRPC_CHTTP2_IF_TRACING(gpr_log(
1010           GPR_INFO,
1011           "transport %p : Resuming reading after being paused due to too "
1012           "many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames",
1013           t.get()));
1014       t->reading_paused_on_pending_induced_frames = false;
1015       continue_read_action_locked(std::move(t));
1016     }
1017   } else {
1018     set_write_state(t.get(), GRPC_CHTTP2_WRITE_STATE_IDLE,
1019                     "begin writing nothing");
1020   }
1021 }
1022 
write_action(grpc_chttp2_transport * t)1023 static void write_action(grpc_chttp2_transport* t) {
1024   void* cl = t->cl;
1025   if (!t->cl->empty()) {
1026     // Transfer the ownership of the context list to the endpoint and create and
1027     // associate a new context list with the transport.
1028     // The old context list is stored in the cl local variable which is passed
1029     // to the endpoint. Its upto the endpoint to manage its lifetime.
1030     t->cl = new grpc_core::ContextList();
1031   } else {
1032     // t->cl is Empty. There is nothing to trace in this endpoint_write. set cl
1033     // to nullptr.
1034     cl = nullptr;
1035   }
1036   // Choose max_frame_size as the prefered rx crypto frame size indicated by the
1037   // peer.
1038   int max_frame_size =
1039       t->settings.peer().preferred_receive_crypto_message_size();
1040   // Note: max frame size is 0 if the remote peer does not support adjusting the
1041   // sending frame size.
1042   if (max_frame_size == 0) {
1043     max_frame_size = INT_MAX;
1044   }
1045   if (GRPC_TRACE_FLAG_ENABLED(grpc_ping_trace)) {
1046     gpr_log(GPR_INFO, "%s[%p]: Write %" PRIdPTR " bytes",
1047             t->is_client ? "CLIENT" : "SERVER", t, t->outbuf.Length());
1048   }
1049   t->write_size_policy.BeginWrite(t->outbuf.Length());
1050   grpc_endpoint_write(t->ep, t->outbuf.c_slice_buffer(),
1051                       grpc_core::InitTransportClosure<write_action_end>(
1052                           t->Ref(), &t->write_action_end_locked),
1053                       cl, max_frame_size);
1054 }
1055 
write_action_end(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)1056 static void write_action_end(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
1057                              grpc_error_handle error) {
1058   auto* tp = t.get();
1059   if (GRPC_TRACE_FLAG_ENABLED(grpc_ping_trace)) {
1060     gpr_log(GPR_INFO, "%s[%p]: Finish write",
1061             t->is_client ? "CLIENT" : "SERVER", t.get());
1062   }
1063   tp->combiner->Run(grpc_core::InitTransportClosure<write_action_end_locked>(
1064                         std::move(t), &tp->write_action_end_locked),
1065                     error);
1066 }
1067 
1068 // Callback from the grpc_endpoint after bytes have been written by calling
1069 // sendmsg
write_action_end_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)1070 static void write_action_end_locked(
1071     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
1072     grpc_error_handle error) {
1073   t->write_size_policy.EndWrite(error.ok());
1074 
1075   bool closed = false;
1076   if (!error.ok()) {
1077     close_transport_locked(t.get(), error);
1078     closed = true;
1079   }
1080 
1081   if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED) {
1082     t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SENT;
1083     closed = true;
1084     if (t->stream_map.empty()) {
1085       close_transport_locked(t.get(), GRPC_ERROR_CREATE("goaway sent"));
1086     }
1087   }
1088 
1089   switch (t->write_state) {
1090     case GRPC_CHTTP2_WRITE_STATE_IDLE:
1091       GPR_UNREACHABLE_CODE(break);
1092     case GRPC_CHTTP2_WRITE_STATE_WRITING:
1093       set_write_state(t.get(), GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
1094       break;
1095     case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
1096       set_write_state(t.get(), GRPC_CHTTP2_WRITE_STATE_WRITING,
1097                       "continue writing");
1098       // If the transport is closed, we will retry writing on the endpoint
1099       // and next write may contain part of the currently serialized frames.
1100       // So, we should only call the run_after_write callbacks when the next
1101       // write finishes, or the callbacks will be invoked when the stream is
1102       // closed.
1103       if (!closed) {
1104         grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
1105       }
1106       t->combiner->FinallyRun(
1107           grpc_core::InitTransportClosure<write_action_begin_locked>(
1108               t, &t->write_action_begin_locked),
1109           absl::OkStatus());
1110       break;
1111   }
1112 
1113   grpc_chttp2_end_write(t.get(), error);
1114 }
1115 
1116 // Cancel out streams that haven't yet started if we have received a GOAWAY
cancel_unstarted_streams(grpc_chttp2_transport * t,grpc_error_handle error,bool tarpit)1117 static void cancel_unstarted_streams(grpc_chttp2_transport* t,
1118                                      grpc_error_handle error, bool tarpit) {
1119   grpc_chttp2_stream* s;
1120   while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1121     s->trailing_metadata_buffer.Set(
1122         grpc_core::GrpcStreamNetworkState(),
1123         grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1124     grpc_chttp2_cancel_stream(t, s, error, tarpit);
1125   }
1126 }
1127 
grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport * t,uint32_t goaway_error,uint32_t last_stream_id,absl::string_view goaway_text)1128 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
1129                                      uint32_t goaway_error,
1130                                      uint32_t last_stream_id,
1131                                      absl::string_view goaway_text) {
1132   t->goaway_error = grpc_error_set_str(
1133       grpc_error_set_int(
1134           grpc_error_set_int(
1135               grpc_core::StatusCreate(
1136                   absl::StatusCode::kUnavailable,
1137                   absl::StrFormat(
1138                       "GOAWAY received; Error code: %u; Debug Text: %s",
1139                       goaway_error, goaway_text),
1140                   DEBUG_LOCATION, {}),
1141               grpc_core::StatusIntProperty::kHttp2Error,
1142               static_cast<intptr_t>(goaway_error)),
1143           grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE),
1144       grpc_core::StatusStrProperty::kRawBytes, goaway_text);
1145 
1146   GRPC_CHTTP2_IF_TRACING(
1147       gpr_log(GPR_INFO, "transport %p got goaway with last stream id %d", t,
1148               last_stream_id));
1149   // We want to log this irrespective of whether http tracing is enabled if we
1150   // received a GOAWAY with a non NO_ERROR code.
1151   if (goaway_error != GRPC_HTTP2_NO_ERROR) {
1152     gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s",
1153             std::string(t->peer_string.as_string_view()).c_str(), goaway_error,
1154             grpc_core::StatusToString(t->goaway_error).c_str());
1155   }
1156   if (t->is_client) {
1157     cancel_unstarted_streams(t, t->goaway_error, false);
1158     // Cancel all unseen streams
1159     std::vector<grpc_chttp2_stream*> to_cancel;
1160     for (auto id_stream : t->stream_map) {
1161       if (id_stream.first > last_stream_id) {
1162         to_cancel.push_back(id_stream.second);
1163       }
1164     }
1165     for (auto s : to_cancel) {
1166       s->trailing_metadata_buffer.Set(
1167           grpc_core::GrpcStreamNetworkState(),
1168           grpc_core::GrpcStreamNetworkState::kNotSeenByServer);
1169       grpc_chttp2_cancel_stream(s->t.get(), s, s->t->goaway_error, false);
1170     }
1171   }
1172   absl::Status status = grpc_error_to_absl_status(t->goaway_error);
1173   // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1174   // data equal to "too_many_pings", it should log the occurrence at a log level
1175   // that is enabled by default and double the configured KEEPALIVE_TIME used
1176   // for new connections on that channel.
1177   if (GPR_UNLIKELY(t->is_client &&
1178                    goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
1179                    goaway_text == "too_many_pings")) {
1180     gpr_log(GPR_ERROR,
1181             "%s: Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
1182             "data equal to \"too_many_pings\". Current keepalive time (before "
1183             "throttling): %s",
1184             std::string(t->peer_string.as_string_view()).c_str(),
1185             t->keepalive_time.ToString().c_str());
1186     constexpr int max_keepalive_time_millis =
1187         INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
1188     int64_t throttled_keepalive_time =
1189         t->keepalive_time.millis() > max_keepalive_time_millis
1190             ? INT_MAX
1191             : t->keepalive_time.millis() * KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
1192     status.SetPayload(grpc_core::kKeepaliveThrottlingKey,
1193                       absl::Cord(std::to_string(throttled_keepalive_time)));
1194   }
1195   // lie: use transient failure from the transport to indicate goaway has been
1196   // received.
1197   if (!grpc_core::test_only_disable_transient_failure_state_notification) {
1198     connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1199                            "got_goaway");
1200   }
1201 }
1202 
maybe_start_some_streams(grpc_chttp2_transport * t)1203 static void maybe_start_some_streams(grpc_chttp2_transport* t) {
1204   grpc_chttp2_stream* s;
1205   // maybe cancel out streams that haven't yet started if we have received a
1206   // GOAWAY
1207   if (!t->goaway_error.ok()) {
1208     cancel_unstarted_streams(t, t->goaway_error, false);
1209     return;
1210   }
1211   // start streams where we have free grpc_chttp2_stream ids and free
1212   // * concurrency
1213   while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
1214          t->stream_map.size() < t->settings.peer().max_concurrent_streams() &&
1215          grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1216     // safe since we can't (legally) be parsing this stream yet
1217     GRPC_CHTTP2_IF_TRACING(gpr_log(
1218         GPR_INFO,
1219         "HTTP:%s: Transport %p allocating new grpc_chttp2_stream %p to id %d",
1220         t->is_client ? "CLI" : "SVR", t, s, t->next_stream_id));
1221 
1222     GPR_ASSERT(s->id == 0);
1223     s->id = t->next_stream_id;
1224     t->next_stream_id += 2;
1225 
1226     if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1227       connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
1228                              absl::Status(absl::StatusCode::kUnavailable,
1229                                           "Transport Stream IDs exhausted"),
1230                              "no_more_stream_ids");
1231     }
1232 
1233     t->stream_map.emplace(s->id, s);
1234     post_destructive_reclaimer(t);
1235     grpc_chttp2_mark_stream_writable(t, s);
1236     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
1237   }
1238   // cancel out streams that will never be started
1239   if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1240     while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1241       s->trailing_metadata_buffer.Set(
1242           grpc_core::GrpcStreamNetworkState(),
1243           grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1244       grpc_chttp2_cancel_stream(
1245           t, s,
1246           grpc_error_set_int(GRPC_ERROR_CREATE("Stream IDs exhausted"),
1247                              grpc_core::StatusIntProperty::kRpcStatus,
1248                              GRPC_STATUS_UNAVAILABLE),
1249           false);
1250     }
1251   }
1252 }
1253 
add_closure_barrier(grpc_closure * closure)1254 static grpc_closure* add_closure_barrier(grpc_closure* closure) {
1255   closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
1256   return closure;
1257 }
1258 
null_then_sched_closure(grpc_closure ** closure)1259 static void null_then_sched_closure(grpc_closure** closure) {
1260   grpc_closure* c = *closure;
1261   *closure = nullptr;
1262   // null_then_schedule_closure might be run during a start_batch which might
1263   // subsequently examine the batch for more operations contained within.
1264   // However, the closure run might make it back to the call object, push a
1265   // completion, have the application see it, and make a new operation on the
1266   // call which recycles the batch BEFORE the call to start_batch completes,
1267   // forcing a race.
1268   grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, absl::OkStatus());
1269 }
1270 
grpc_chttp2_complete_closure_step(grpc_chttp2_transport * t,grpc_closure ** pclosure,grpc_error_handle error,const char * desc,grpc_core::DebugLocation whence)1271 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
1272                                        grpc_closure** pclosure,
1273                                        grpc_error_handle error,
1274                                        const char* desc,
1275                                        grpc_core::DebugLocation whence) {
1276   grpc_closure* closure = *pclosure;
1277   *pclosure = nullptr;
1278   if (closure == nullptr) {
1279     return;
1280   }
1281   closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
1282   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1283     gpr_log(
1284         GPR_INFO,
1285         "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
1286         "write_state=%s whence=%s:%d",
1287         t, closure,
1288         static_cast<int>(closure->next_data.scratch /
1289                          CLOSURE_BARRIER_FIRST_REF_BIT),
1290         static_cast<int>(closure->next_data.scratch %
1291                          CLOSURE_BARRIER_FIRST_REF_BIT),
1292         desc, grpc_core::StatusToString(error).c_str(),
1293         write_state_name(t->write_state), whence.file(), whence.line());
1294   }
1295 
1296   if (!error.ok()) {
1297     grpc_error_handle cl_err =
1298         grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
1299     if (cl_err.ok()) {
1300       cl_err = GRPC_ERROR_CREATE(absl::StrCat(
1301           "Error in HTTP transport completing operation: ", desc,
1302           " write_state=", write_state_name(t->write_state), " refs=",
1303           closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT, " flags=",
1304           closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT));
1305       cl_err = grpc_error_set_str(cl_err,
1306                                   grpc_core::StatusStrProperty::kTargetAddress,
1307                                   std::string(t->peer_string.as_string_view()));
1308     }
1309     cl_err = grpc_error_add_child(cl_err, error);
1310     closure->error_data.error = grpc_core::internal::StatusAllocHeapPtr(cl_err);
1311   }
1312   if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
1313     if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
1314         !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
1315       // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
1316       // closures earlier than when it is safe to do so.
1317       grpc_error_handle run_error =
1318           grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
1319       closure->error_data.error = 0;
1320       grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, run_error);
1321     } else {
1322       grpc_closure_list_append(&t->run_after_write, closure);
1323     }
1324   }
1325 }
1326 
contains_non_ok_status(grpc_metadata_batch * batch)1327 static bool contains_non_ok_status(grpc_metadata_batch* batch) {
1328   return batch->get(grpc_core::GrpcStatusMetadata()).value_or(GRPC_STATUS_OK) !=
1329          GRPC_STATUS_OK;
1330 }
1331 
log_metadata(const grpc_metadata_batch * md_batch,uint32_t id,bool is_client,bool is_initial)1332 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
1333                          bool is_client, bool is_initial) {
1334   gpr_log(GPR_INFO, "--metadata--");
1335   const std::string prefix = absl::StrCat(
1336       "HTTP:", id, is_initial ? ":HDR" : ":TRL", is_client ? ":CLI:" : ":SVR:");
1337   md_batch->Log([&prefix](absl::string_view key, absl::string_view value) {
1338     gpr_log(GPR_INFO, "%s", absl::StrCat(prefix, key, ": ", value).c_str());
1339   });
1340 }
1341 
perform_stream_op_locked(void * stream_op,grpc_error_handle)1342 static void perform_stream_op_locked(void* stream_op,
1343                                      grpc_error_handle /*error_ignored*/) {
1344   grpc_transport_stream_op_batch* op =
1345       static_cast<grpc_transport_stream_op_batch*>(stream_op);
1346   grpc_chttp2_stream* s =
1347       static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
1348   grpc_transport_stream_op_batch_payload* op_payload = op->payload;
1349   grpc_chttp2_transport* t = s->t.get();
1350 
1351   s->context = op->payload->context;
1352   s->traced = op->is_traced;
1353   s->call_tracer = CallTracerIfSampled(s);
1354   s->tcp_tracer = TcpTracerIfSampled(s);
1355   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1356     gpr_log(GPR_INFO,
1357             "perform_stream_op_locked[s=%p; op=%p]: %s; on_complete = %p", s,
1358             op, grpc_transport_stream_op_batch_string(op, false).c_str(),
1359             op->on_complete);
1360     if (op->send_initial_metadata) {
1361       log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
1362                    s->id, t->is_client, true);
1363     }
1364     if (op->send_trailing_metadata) {
1365       log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
1366                    s->id, t->is_client, false);
1367     }
1368   }
1369 
1370   grpc_closure* on_complete = op->on_complete;
1371   // on_complete will be null if and only if there are no send ops in the batch.
1372   if (on_complete != nullptr) {
1373     // This batch has send ops. Use final_data as a barrier until enqueue time;
1374     // the initial counter is dropped at the end of this function.
1375     on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
1376     on_complete->error_data.error = 0;
1377   }
1378 
1379   if (op->cancel_stream) {
1380     grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error,
1381                               op_payload->cancel_stream.tarpit);
1382   }
1383 
1384   if (op->send_initial_metadata) {
1385     if (s->call_tracer) {
1386       s->call_tracer->RecordAnnotation(
1387           grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart,
1388                                     gpr_now(GPR_CLOCK_REALTIME))
1389               .Add(s->t->flow_control.stats())
1390               .Add(s->flow_control.stats()));
1391     }
1392     if (t->is_client && t->channelz_socket != nullptr) {
1393       t->channelz_socket->RecordStreamStartedFromLocal();
1394     }
1395     GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
1396     on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1397 
1398     s->send_initial_metadata_finished = add_closure_barrier(on_complete);
1399     s->send_initial_metadata =
1400         op_payload->send_initial_metadata.send_initial_metadata;
1401     if (t->is_client) {
1402       s->deadline = std::min(
1403           s->deadline,
1404           s->send_initial_metadata->get(grpc_core::GrpcTimeoutMetadata())
1405               .value_or(grpc_core::Timestamp::InfFuture()));
1406     }
1407     if (contains_non_ok_status(s->send_initial_metadata)) {
1408       s->seen_error = true;
1409     }
1410     if (!s->write_closed) {
1411       if (t->is_client) {
1412         if (t->closed_with_error.ok()) {
1413           GPR_ASSERT(s->id == 0);
1414           grpc_chttp2_list_add_waiting_for_concurrency(t, s);
1415           maybe_start_some_streams(t);
1416         } else {
1417           s->trailing_metadata_buffer.Set(
1418               grpc_core::GrpcStreamNetworkState(),
1419               grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1420           grpc_chttp2_cancel_stream(
1421               t, s,
1422               grpc_error_set_int(
1423                   GRPC_ERROR_CREATE_REFERENCING("Transport closed",
1424                                                 &t->closed_with_error, 1),
1425                   grpc_core::StatusIntProperty::kRpcStatus,
1426                   GRPC_STATUS_UNAVAILABLE),
1427               false);
1428         }
1429       } else {
1430         GPR_ASSERT(s->id != 0);
1431         grpc_chttp2_mark_stream_writable(t, s);
1432         if (!(op->send_message &&
1433               (op->payload->send_message.flags & GRPC_WRITE_BUFFER_HINT))) {
1434           grpc_chttp2_initiate_write(
1435               t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
1436         }
1437       }
1438     } else {
1439       s->send_initial_metadata = nullptr;
1440       grpc_chttp2_complete_closure_step(
1441           t, &s->send_initial_metadata_finished,
1442           GRPC_ERROR_CREATE_REFERENCING(
1443               "Attempt to send initial metadata after stream was closed",
1444               &s->write_closed_error, 1),
1445           "send_initial_metadata_finished");
1446     }
1447   }
1448 
1449   if (op->send_message) {
1450     t->num_messages_in_next_write++;
1451     grpc_core::global_stats().IncrementHttp2SendMessageSize(
1452         op->payload->send_message.send_message->Length());
1453     on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1454     s->send_message_finished = add_closure_barrier(op->on_complete);
1455     const uint32_t flags = op_payload->send_message.flags;
1456     if (s->write_closed) {
1457       op->payload->send_message.stream_write_closed = true;
1458       // We should NOT return an error here, so as to avoid a cancel OP being
1459       // started. The surface layer will notice that the stream has been closed
1460       // for writes and fail the send message op.
1461       grpc_chttp2_complete_closure_step(t, &s->send_message_finished,
1462                                         absl::OkStatus(),
1463                                         "fetching_send_message_finished");
1464     } else {
1465       uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
1466           &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
1467       frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
1468       size_t len = op_payload->send_message.send_message->Length();
1469       frame_hdr[1] = static_cast<uint8_t>(len >> 24);
1470       frame_hdr[2] = static_cast<uint8_t>(len >> 16);
1471       frame_hdr[3] = static_cast<uint8_t>(len >> 8);
1472       frame_hdr[4] = static_cast<uint8_t>(len);
1473 
1474       if (grpc_core::IsHttp2StatsFixEnabled()) {
1475         s->stats.outgoing.framing_bytes += GRPC_HEADER_SIZE_IN_BYTES;
1476         s->stats.outgoing.data_bytes +=
1477             op_payload->send_message.send_message->Length();
1478       }
1479       s->next_message_end_offset =
1480           s->flow_controlled_bytes_written +
1481           static_cast<int64_t>(s->flow_controlled_buffer.length) +
1482           static_cast<int64_t>(len);
1483       if (flags & GRPC_WRITE_BUFFER_HINT) {
1484         s->next_message_end_offset -= t->write_buffer_size;
1485         s->write_buffering = true;
1486       } else {
1487         s->write_buffering = false;
1488       }
1489 
1490       grpc_slice* const slices =
1491           op_payload->send_message.send_message->c_slice_buffer()->slices;
1492       grpc_slice* const end =
1493           slices + op_payload->send_message.send_message->Count();
1494       for (grpc_slice* slice = slices; slice != end; slice++) {
1495         grpc_slice_buffer_add(&s->flow_controlled_buffer,
1496                               grpc_core::CSliceRef(*slice));
1497       }
1498 
1499       int64_t notify_offset = s->next_message_end_offset;
1500       if (notify_offset <= s->flow_controlled_bytes_written) {
1501         grpc_chttp2_complete_closure_step(t, &s->send_message_finished,
1502                                           absl::OkStatus(),
1503                                           "fetching_send_message_finished");
1504       } else {
1505         grpc_chttp2_write_cb* cb = t->write_cb_pool;
1506         if (cb == nullptr) {
1507           cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
1508         } else {
1509           t->write_cb_pool = cb->next;
1510         }
1511         cb->call_at_byte = notify_offset;
1512         cb->closure = s->send_message_finished;
1513         s->send_message_finished = nullptr;
1514         grpc_chttp2_write_cb** list = flags & GRPC_WRITE_THROUGH
1515                                           ? &s->on_write_finished_cbs
1516                                           : &s->on_flow_controlled_cbs;
1517         cb->next = *list;
1518         *list = cb;
1519       }
1520 
1521       if (s->id != 0 &&
1522           (!s->write_buffering ||
1523            s->flow_controlled_buffer.length > t->write_buffer_size)) {
1524         grpc_chttp2_mark_stream_writable(t, s);
1525         grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
1526       }
1527     }
1528   }
1529 
1530   if (op->send_trailing_metadata) {
1531     GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
1532     on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1533     s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
1534     s->send_trailing_metadata =
1535         op_payload->send_trailing_metadata.send_trailing_metadata;
1536     s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent;
1537     s->write_buffering = false;
1538     if (contains_non_ok_status(s->send_trailing_metadata)) {
1539       s->seen_error = true;
1540     }
1541     if (s->write_closed) {
1542       s->send_trailing_metadata = nullptr;
1543       s->sent_trailing_metadata_op = nullptr;
1544       grpc_chttp2_complete_closure_step(
1545           t, &s->send_trailing_metadata_finished,
1546           op->payload->send_trailing_metadata.send_trailing_metadata->empty()
1547               ? absl::OkStatus()
1548               : GRPC_ERROR_CREATE("Attempt to send trailing metadata after "
1549                                   "stream was closed"),
1550           "send_trailing_metadata_finished");
1551     } else if (s->id != 0) {
1552       // TODO(ctiller): check if there's flow control for any outstanding
1553       //   bytes before going writable
1554       grpc_chttp2_mark_stream_writable(t, s);
1555       grpc_chttp2_initiate_write(
1556           t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
1557     }
1558   }
1559 
1560   if (op->recv_initial_metadata) {
1561     GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
1562     s->recv_initial_metadata_ready =
1563         op_payload->recv_initial_metadata.recv_initial_metadata_ready;
1564     s->recv_initial_metadata =
1565         op_payload->recv_initial_metadata.recv_initial_metadata;
1566     s->trailing_metadata_available =
1567         op_payload->recv_initial_metadata.trailing_metadata_available;
1568     if (s->parsed_trailers_only && s->trailing_metadata_available != nullptr) {
1569       *s->trailing_metadata_available = true;
1570     }
1571     grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
1572   }
1573 
1574   if (op->recv_message) {
1575     GPR_ASSERT(s->recv_message_ready == nullptr);
1576     s->recv_message_ready = op_payload->recv_message.recv_message_ready;
1577     s->recv_message = op_payload->recv_message.recv_message;
1578     s->recv_message->emplace();
1579     s->recv_message_flags = op_payload->recv_message.flags;
1580     s->call_failed_before_recv_message =
1581         op_payload->recv_message.call_failed_before_recv_message;
1582     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1583   }
1584 
1585   if (op->recv_trailing_metadata) {
1586     GPR_ASSERT(s->collecting_stats == nullptr);
1587     s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
1588     GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
1589     s->recv_trailing_metadata_finished =
1590         op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1591     s->recv_trailing_metadata =
1592         op_payload->recv_trailing_metadata.recv_trailing_metadata;
1593     s->final_metadata_requested = true;
1594     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1595   }
1596 
1597   if (on_complete != nullptr) {
1598     grpc_chttp2_complete_closure_step(t, &on_complete, absl::OkStatus(),
1599                                       "op->on_complete");
1600   }
1601 
1602   GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
1603 }
1604 
PerformStreamOp(grpc_stream * gs,grpc_transport_stream_op_batch * op)1605 void grpc_chttp2_transport::PerformStreamOp(
1606     grpc_stream* gs, grpc_transport_stream_op_batch* op) {
1607   grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
1608 
1609   if (!is_client) {
1610     if (op->send_initial_metadata) {
1611       GPR_ASSERT(!op->payload->send_initial_metadata.send_initial_metadata
1612                       ->get(grpc_core::GrpcTimeoutMetadata())
1613                       .has_value());
1614     }
1615     if (op->send_trailing_metadata) {
1616       GPR_ASSERT(!op->payload->send_trailing_metadata.send_trailing_metadata
1617                       ->get(grpc_core::GrpcTimeoutMetadata())
1618                       .has_value());
1619     }
1620   }
1621 
1622   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1623     gpr_log(GPR_INFO, "perform_stream_op[s=%p; op=%p]: %s", s, op,
1624             grpc_transport_stream_op_batch_string(op, false).c_str());
1625   }
1626 
1627   GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
1628   op->handler_private.extra_arg = gs;
1629   combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1630                                   perform_stream_op_locked, op, nullptr),
1631                 absl::OkStatus());
1632 }
1633 
cancel_pings(grpc_chttp2_transport * t,grpc_error_handle error)1634 static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error) {
1635   GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "%p CANCEL PINGS: %s", t,
1636                                  grpc_core::StatusToString(error).c_str()));
1637   // callback remaining pings: they're not allowed to call into the transport,
1638   //   and maybe they hold resources that need to be freed
1639   t->ping_callbacks.CancelAll(t->event_engine.get());
1640 }
1641 
1642 namespace {
1643 class PingClosureWrapper {
1644  public:
PingClosureWrapper(grpc_closure * closure)1645   explicit PingClosureWrapper(grpc_closure* closure) : closure_(closure) {}
1646   PingClosureWrapper(const PingClosureWrapper&) = delete;
1647   PingClosureWrapper& operator=(const PingClosureWrapper&) = delete;
PingClosureWrapper(PingClosureWrapper && other)1648   PingClosureWrapper(PingClosureWrapper&& other) noexcept
1649       : closure_(other.Take()) {}
operator =(PingClosureWrapper && other)1650   PingClosureWrapper& operator=(PingClosureWrapper&& other) noexcept {
1651     std::swap(closure_, other.closure_);
1652     return *this;
1653   }
~PingClosureWrapper()1654   ~PingClosureWrapper() {
1655     if (closure_ != nullptr) {
1656       grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure_, absl::CancelledError());
1657     }
1658   }
1659 
operator ()()1660   void operator()() {
1661     grpc_core::ExecCtx::Run(DEBUG_LOCATION, Take(), absl::OkStatus());
1662   }
1663 
1664  private:
Take()1665   grpc_closure* Take() { return std::exchange(closure_, nullptr); }
1666 
1667   grpc_closure* closure_ = nullptr;
1668 };
1669 }  // namespace
1670 
send_ping_locked(grpc_chttp2_transport * t,grpc_closure * on_initiate,grpc_closure * on_ack)1671 static void send_ping_locked(grpc_chttp2_transport* t,
1672                              grpc_closure* on_initiate, grpc_closure* on_ack) {
1673   if (!t->closed_with_error.ok()) {
1674     grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_initiate, t->closed_with_error);
1675     grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack, t->closed_with_error);
1676     return;
1677   }
1678   t->ping_callbacks.OnPing(PingClosureWrapper(on_initiate),
1679                            PingClosureWrapper(on_ack));
1680 }
1681 
1682 // Specialized form of send_ping_locked for keepalive ping. If there is already
1683 // a ping in progress, the keepalive ping would piggyback onto that ping,
1684 // instead of waiting for that ping to complete and then starting a new ping.
send_keepalive_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)1685 static void send_keepalive_ping_locked(
1686     grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1687   if (!t->closed_with_error.ok()) {
1688     t->combiner->Run(
1689         grpc_core::InitTransportClosure<finish_keepalive_ping_locked>(
1690             t->Ref(), &t->finish_keepalive_ping_locked),
1691         t->closed_with_error);
1692     return;
1693   }
1694   t->ping_callbacks.OnPingAck(
1695       PingClosureWrapper(grpc_core::InitTransportClosure<finish_keepalive_ping>(
1696           t->Ref(), &t->finish_keepalive_ping_locked)));
1697 }
1698 
grpc_chttp2_retry_initiate_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)1699 void grpc_chttp2_retry_initiate_ping(
1700     grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1701   auto tp = t.get();
1702   tp->combiner->Run(grpc_core::InitTransportClosure<retry_initiate_ping_locked>(
1703                         std::move(t), &tp->retry_initiate_ping_locked),
1704                     absl::OkStatus());
1705 }
1706 
retry_initiate_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,GRPC_UNUSED grpc_error_handle error)1707 static void retry_initiate_ping_locked(
1708     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
1709     GRPC_UNUSED grpc_error_handle error) {
1710   GPR_DEBUG_ASSERT(error.ok());
1711   GPR_ASSERT(t->delayed_ping_timer_handle != TaskHandle::kInvalid);
1712   t->delayed_ping_timer_handle = TaskHandle::kInvalid;
1713   grpc_chttp2_initiate_write(t.get(),
1714                              GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
1715 }
1716 
grpc_chttp2_ack_ping(grpc_chttp2_transport * t,uint64_t id)1717 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
1718   if (!t->ping_callbacks.AckPing(id, t->event_engine.get())) {
1719     gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64,
1720             std::string(t->peer_string.as_string_view()).c_str(), id);
1721     return;
1722   }
1723   if (t->ping_callbacks.ping_requested()) {
1724     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
1725   }
1726 }
1727 
grpc_chttp2_keepalive_timeout(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)1728 void grpc_chttp2_keepalive_timeout(
1729     grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1730   t->combiner->Run(
1731       grpc_core::NewClosure([t](grpc_error_handle) {
1732         gpr_log(GPR_INFO, "%s: Keepalive timeout. Closing transport.",
1733                 std::string(t->peer_string.as_string_view()).c_str());
1734         send_goaway(
1735             t.get(),
1736             grpc_error_set_int(GRPC_ERROR_CREATE("keepalive_timeout"),
1737                                grpc_core::StatusIntProperty::kHttp2Error,
1738                                GRPC_HTTP2_ENHANCE_YOUR_CALM),
1739             /*immediate_disconnect_hint=*/true);
1740         close_transport_locked(
1741             t.get(),
1742             grpc_error_set_int(GRPC_ERROR_CREATE("keepalive timeout"),
1743                                grpc_core::StatusIntProperty::kRpcStatus,
1744                                GRPC_STATUS_UNAVAILABLE));
1745       }),
1746       absl::OkStatus());
1747 }
1748 
grpc_chttp2_ping_timeout(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)1749 void grpc_chttp2_ping_timeout(
1750     grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1751   t->combiner->Run(
1752       grpc_core::NewClosure([t](grpc_error_handle) {
1753         gpr_log(GPR_INFO, "%s: Ping timeout. Closing transport.",
1754                 std::string(t->peer_string.as_string_view()).c_str());
1755         send_goaway(
1756             t.get(),
1757             grpc_error_set_int(GRPC_ERROR_CREATE("ping_timeout"),
1758                                grpc_core::StatusIntProperty::kHttp2Error,
1759                                GRPC_HTTP2_ENHANCE_YOUR_CALM),
1760             /*immediate_disconnect_hint=*/true);
1761         close_transport_locked(
1762             t.get(),
1763             grpc_error_set_int(GRPC_ERROR_CREATE("ping timeout"),
1764                                grpc_core::StatusIntProperty::kRpcStatus,
1765                                GRPC_STATUS_UNAVAILABLE));
1766       }),
1767       absl::OkStatus());
1768 }
1769 
grpc_chttp2_settings_timeout(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)1770 void grpc_chttp2_settings_timeout(
1771     grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1772   t->combiner->Run(
1773       grpc_core::NewClosure([t](grpc_error_handle) {
1774         gpr_log(GPR_INFO, "%s: Settings timeout. Closing transport.",
1775                 std::string(t->peer_string.as_string_view()).c_str());
1776         send_goaway(
1777             t.get(),
1778             grpc_error_set_int(GRPC_ERROR_CREATE("settings_timeout"),
1779                                grpc_core::StatusIntProperty::kHttp2Error,
1780                                GRPC_HTTP2_SETTINGS_TIMEOUT),
1781             /*immediate_disconnect_hint=*/true);
1782         close_transport_locked(
1783             t.get(),
1784             grpc_error_set_int(GRPC_ERROR_CREATE("settings timeout"),
1785                                grpc_core::StatusIntProperty::kRpcStatus,
1786                                GRPC_STATUS_UNAVAILABLE));
1787       }),
1788       absl::OkStatus());
1789 }
1790 
1791 namespace {
1792 
1793 // Fire and forget (deletes itself on completion). Does a graceful shutdown by
1794 // sending a GOAWAY frame with the last stream id set to 2^31-1, sending a ping
1795 // and waiting for an ack (effective waiting for an RTT) and then sending a
1796 // final GOAWAY frame with an updated last stream identifier. This helps ensure
1797 // that a connection can be cleanly shut down without losing requests.
1798 // In the event, that the client does not respond to the ping for some reason,
1799 // we add a 20 second deadline, after which we send the second goaway.
1800 class GracefulGoaway : public grpc_core::RefCounted<GracefulGoaway> {
1801  public:
Start(grpc_chttp2_transport * t)1802   static void Start(grpc_chttp2_transport* t) { new GracefulGoaway(t); }
1803 
1804  private:
1805   using TaskHandle = ::grpc_event_engine::experimental::EventEngine::TaskHandle;
1806 
GracefulGoaway(grpc_chttp2_transport * t)1807   explicit GracefulGoaway(grpc_chttp2_transport* t) : t_(t->Ref()) {
1808     t->sent_goaway_state = GRPC_CHTTP2_GRACEFUL_GOAWAY;
1809     grpc_chttp2_goaway_append((1u << 31) - 1, 0, grpc_empty_slice(), &t->qbuf);
1810     t->keepalive_timeout =
1811         std::min(t->keepalive_timeout, grpc_core::Duration::Seconds(20));
1812     t->ping_timeout =
1813         std::min(t->ping_timeout, grpc_core::Duration::Seconds(20));
1814     send_ping_locked(
1815         t, nullptr, GRPC_CLOSURE_INIT(&on_ping_ack_, OnPingAck, this, nullptr));
1816     grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1817   }
1818 
MaybeSendFinalGoawayLocked()1819   void MaybeSendFinalGoawayLocked() {
1820     if (t_->sent_goaway_state != GRPC_CHTTP2_GRACEFUL_GOAWAY) {
1821       // We already sent the final GOAWAY.
1822       return;
1823     }
1824     if (t_->destroying || !t_->closed_with_error.ok()) {
1825       GRPC_CHTTP2_IF_TRACING(
1826           gpr_log(GPR_INFO,
1827                   "transport:%p %s peer:%s Transport already shutting down. "
1828                   "Graceful GOAWAY abandoned.",
1829                   t_.get(), t_->is_client ? "CLIENT" : "SERVER",
1830                   std::string(t_->peer_string.as_string_view()).c_str()));
1831       return;
1832     }
1833     // Ping completed. Send final goaway.
1834     GRPC_CHTTP2_IF_TRACING(
1835         gpr_log(GPR_INFO,
1836                 "transport:%p %s peer:%s Graceful shutdown: Ping received. "
1837                 "Sending final GOAWAY with stream_id:%d",
1838                 t_.get(), t_->is_client ? "CLIENT" : "SERVER",
1839                 std::string(t_->peer_string.as_string_view()).c_str(),
1840                 t_->last_new_stream_id));
1841     t_->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED;
1842     grpc_chttp2_goaway_append(t_->last_new_stream_id, 0, grpc_empty_slice(),
1843                               &t_->qbuf);
1844     grpc_chttp2_initiate_write(t_.get(),
1845                                GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1846   }
1847 
OnPingAck(void * arg,grpc_error_handle)1848   static void OnPingAck(void* arg, grpc_error_handle /* error */) {
1849     auto* self = static_cast<GracefulGoaway*>(arg);
1850     self->t_->combiner->Run(
1851         GRPC_CLOSURE_INIT(&self->on_ping_ack_, OnPingAckLocked, self, nullptr),
1852         absl::OkStatus());
1853   }
1854 
OnPingAckLocked(void * arg,grpc_error_handle)1855   static void OnPingAckLocked(void* arg, grpc_error_handle /* error */) {
1856     auto* self = static_cast<GracefulGoaway*>(arg);
1857     self->MaybeSendFinalGoawayLocked();
1858     self->Unref();
1859   }
1860 
1861   const grpc_core::RefCountedPtr<grpc_chttp2_transport> t_;
1862   grpc_closure on_ping_ack_;
1863 };
1864 
1865 }  // namespace
1866 
send_goaway(grpc_chttp2_transport * t,grpc_error_handle error,bool immediate_disconnect_hint)1867 static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error,
1868                         bool immediate_disconnect_hint) {
1869   grpc_http2_error_code http_error;
1870   std::string message;
1871   grpc_error_get_status(error, grpc_core::Timestamp::InfFuture(), nullptr,
1872                         &message, &http_error, nullptr);
1873   if (!t->is_client && http_error == GRPC_HTTP2_NO_ERROR &&
1874       !immediate_disconnect_hint) {
1875     // Do a graceful shutdown.
1876     if (t->sent_goaway_state == GRPC_CHTTP2_NO_GOAWAY_SEND) {
1877       GracefulGoaway::Start(t);
1878     } else {
1879       // Graceful GOAWAY is already in progress.
1880     }
1881   } else if (t->sent_goaway_state == GRPC_CHTTP2_NO_GOAWAY_SEND ||
1882              t->sent_goaway_state == GRPC_CHTTP2_GRACEFUL_GOAWAY) {
1883     // We want to log this irrespective of whether http tracing is enabled
1884     gpr_log(GPR_DEBUG, "%s %s: Sending goaway last_new_stream_id=%d err=%s",
1885             std::string(t->peer_string.as_string_view()).c_str(),
1886             t->is_client ? "CLIENT" : "SERVER", t->last_new_stream_id,
1887             grpc_core::StatusToString(error).c_str());
1888     t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED;
1889     grpc_chttp2_goaway_append(
1890         t->last_new_stream_id, static_cast<uint32_t>(http_error),
1891         grpc_slice_from_cpp_string(std::move(message)), &t->qbuf);
1892   } else {
1893     // Final GOAWAY has already been sent.
1894   }
1895   grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1896 }
1897 
grpc_chttp2_exceeded_ping_strikes(grpc_chttp2_transport * t)1898 void grpc_chttp2_exceeded_ping_strikes(grpc_chttp2_transport* t) {
1899   send_goaway(t,
1900               grpc_error_set_int(GRPC_ERROR_CREATE("too_many_pings"),
1901                                  grpc_core::StatusIntProperty::kHttp2Error,
1902                                  GRPC_HTTP2_ENHANCE_YOUR_CALM),
1903               /*immediate_disconnect_hint=*/true);
1904   // The transport will be closed after the write is done
1905   close_transport_locked(
1906       t, grpc_error_set_int(GRPC_ERROR_CREATE("Too many pings"),
1907                             grpc_core::StatusIntProperty::kRpcStatus,
1908                             GRPC_STATUS_UNAVAILABLE));
1909 }
1910 
grpc_chttp2_reset_ping_clock(grpc_chttp2_transport * t)1911 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t) {
1912   if (!t->is_client) {
1913     t->ping_abuse_policy.ResetPingStrikes();
1914   }
1915   t->ping_rate_policy.ResetPingsBeforeDataRequired();
1916 }
1917 
perform_transport_op_locked(void * stream_op,grpc_error_handle)1918 static void perform_transport_op_locked(void* stream_op,
1919                                         grpc_error_handle /*error_ignored*/) {
1920   grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
1921   grpc_core::RefCountedPtr<grpc_chttp2_transport> t(
1922       static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg));
1923 
1924   if (!op->goaway_error.ok()) {
1925     send_goaway(t.get(), op->goaway_error, /*immediate_disconnect_hint=*/false);
1926   }
1927 
1928   if (op->set_accept_stream) {
1929     t->accept_stream_cb = op->set_accept_stream_fn;
1930     t->accept_stream_cb_user_data = op->set_accept_stream_user_data;
1931     t->registered_method_matcher_cb = op->set_registered_method_matcher_fn;
1932   }
1933 
1934   if (op->bind_pollset) {
1935     grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset);
1936   }
1937 
1938   if (op->bind_pollset_set) {
1939     grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set);
1940   }
1941 
1942   if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1943     send_ping_locked(t.get(), op->send_ping.on_initiate, op->send_ping.on_ack);
1944     grpc_chttp2_initiate_write(t.get(),
1945                                GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
1946   }
1947 
1948   if (op->start_connectivity_watch != nullptr) {
1949     t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
1950                                 std::move(op->start_connectivity_watch));
1951   }
1952   if (op->stop_connectivity_watch != nullptr) {
1953     t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
1954   }
1955 
1956   if (!op->disconnect_with_error.ok()) {
1957     send_goaway(t.get(), op->disconnect_with_error,
1958                 /*immediate_disconnect_hint=*/true);
1959     close_transport_locked(t.get(), op->disconnect_with_error);
1960   }
1961 
1962   grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
1963 }
1964 
PerformOp(grpc_transport_op * op)1965 void grpc_chttp2_transport::PerformOp(grpc_transport_op* op) {
1966   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1967     gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", this,
1968             grpc_transport_op_string(op).c_str());
1969   }
1970   op->handler_private.extra_arg = this;
1971   Ref().release()->combiner->Run(
1972       GRPC_CLOSURE_INIT(&op->handler_private.closure,
1973                         perform_transport_op_locked, op, nullptr),
1974       absl::OkStatus());
1975 }
1976 
1977 //
1978 // INPUT PROCESSING - GENERAL
1979 //
1980 
grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1981 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
1982                                                       grpc_chttp2_stream* s) {
1983   if (s->recv_initial_metadata_ready != nullptr &&
1984       s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
1985     if (s->seen_error) {
1986       grpc_slice_buffer_reset_and_unref(&s->frame_storage);
1987     }
1988     *s->recv_initial_metadata = std::move(s->initial_metadata_buffer);
1989     s->recv_initial_metadata->Set(grpc_core::PeerString(),
1990                                   t->peer_string.Ref());
1991     // If we didn't receive initial metadata from the wire and instead faked a
1992     // status (due to stream cancellations for example), let upper layers know
1993     // that trailing metadata is immediately available.
1994     if (s->trailing_metadata_available != nullptr &&
1995         s->published_metadata[0] != GRPC_METADATA_PUBLISHED_FROM_WIRE &&
1996         s->published_metadata[1] == GRPC_METADATA_SYNTHESIZED_FROM_FAKE) {
1997       *s->trailing_metadata_available = true;
1998       s->trailing_metadata_available = nullptr;
1999     }
2000     if (t->registered_method_matcher_cb != nullptr) {
2001       t->registered_method_matcher_cb(t->accept_stream_cb_user_data,
2002                                       s->recv_initial_metadata);
2003     }
2004     null_then_sched_closure(&s->recv_initial_metadata_ready);
2005   }
2006 }
2007 
grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport * t,grpc_chttp2_stream * s)2008 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
2009                                              grpc_chttp2_stream* s) {
2010   if (s->recv_message_ready == nullptr) return;
2011 
2012   grpc_core::chttp2::StreamFlowControl::IncomingUpdateContext upd(
2013       &s->flow_control);
2014   grpc_error_handle error;
2015 
2016   // Lambda is immediately invoked as a big scoped section that can be
2017   // exited out of at any point by returning.
2018   [&]() {
2019     if (grpc_http_trace.enabled()) {
2020       gpr_log(GPR_DEBUG,
2021               "maybe_complete_recv_message %p final_metadata_requested=%d "
2022               "seen_error=%d",
2023               s, s->final_metadata_requested, s->seen_error);
2024     }
2025     if (s->final_metadata_requested && s->seen_error) {
2026       grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2027       s->recv_message->reset();
2028     } else {
2029       if (s->frame_storage.length != 0) {
2030         while (true) {
2031           GPR_ASSERT(s->frame_storage.length > 0);
2032           int64_t min_progress_size;
2033           auto r = grpc_deframe_unprocessed_incoming_frames(
2034               s, &min_progress_size, &**s->recv_message, s->recv_message_flags);
2035           if (grpc_http_trace.enabled()) {
2036             gpr_log(GPR_DEBUG, "Deframe data frame: %s",
2037                     grpc_core::PollToString(r, [](absl::Status r) {
2038                       return r.ToString();
2039                     }).c_str());
2040           }
2041           if (r.pending()) {
2042             if (s->read_closed) {
2043               grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2044               s->recv_message->reset();
2045               break;
2046             } else {
2047               upd.SetMinProgressSize(min_progress_size);
2048               return;  // Out of lambda to enclosing function
2049             }
2050           } else {
2051             error = std::move(r.value());
2052             if (!error.ok()) {
2053               s->seen_error = true;
2054               grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2055               break;
2056             } else {
2057               if (t->channelz_socket != nullptr) {
2058                 t->channelz_socket->RecordMessageReceived();
2059               }
2060               break;
2061             }
2062           }
2063         }
2064       } else if (s->read_closed) {
2065         s->recv_message->reset();
2066       } else {
2067         upd.SetMinProgressSize(GRPC_HEADER_SIZE_IN_BYTES);
2068         return;  // Out of lambda to enclosing function
2069       }
2070     }
2071     // save the length of the buffer before handing control back to application
2072     // threads. Needed to support correct flow control bookkeeping
2073     if (error.ok() && s->recv_message->has_value()) {
2074       null_then_sched_closure(&s->recv_message_ready);
2075     } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
2076       if (s->call_failed_before_recv_message != nullptr) {
2077         *s->call_failed_before_recv_message =
2078             (s->published_metadata[1] != GRPC_METADATA_PUBLISHED_AT_CLOSE);
2079       }
2080       null_then_sched_closure(&s->recv_message_ready);
2081     }
2082   }();
2083 
2084   upd.SetPendingSize(s->frame_storage.length);
2085   grpc_chttp2_act_on_flowctl_action(upd.MakeAction(), t, s);
2086 }
2087 
grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)2088 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
2089                                                        grpc_chttp2_stream* s) {
2090   grpc_chttp2_maybe_complete_recv_message(t, s);
2091   if (grpc_http_trace.enabled()) {
2092     gpr_log(GPR_DEBUG,
2093             "maybe_complete_recv_trailing_metadata cli=%d s=%p closure=%p "
2094             "read_closed=%d "
2095             "write_closed=%d %" PRIdPTR,
2096             t->is_client, s, s->recv_trailing_metadata_finished, s->read_closed,
2097             s->write_closed, s->frame_storage.length);
2098   }
2099   if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
2100       s->write_closed) {
2101     if (s->seen_error || !t->is_client) {
2102       grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2103     }
2104     if (s->read_closed && s->frame_storage.length == 0 &&
2105         s->recv_trailing_metadata_finished != nullptr) {
2106       grpc_transport_move_stats(&s->stats, s->collecting_stats);
2107       s->collecting_stats = nullptr;
2108       *s->recv_trailing_metadata = std::move(s->trailing_metadata_buffer);
2109       null_then_sched_closure(&s->recv_trailing_metadata_finished);
2110     }
2111   }
2112 }
2113 
remove_stream(grpc_chttp2_transport * t,uint32_t id,grpc_error_handle error)2114 static grpc_chttp2_transport::RemovedStreamHandle remove_stream(
2115     grpc_chttp2_transport* t, uint32_t id, grpc_error_handle error) {
2116   grpc_chttp2_stream* s = t->stream_map.extract(id).mapped();
2117   GPR_DEBUG_ASSERT(s);
2118   if (t->incoming_stream == s) {
2119     t->incoming_stream = nullptr;
2120     grpc_chttp2_parsing_become_skip_parser(t);
2121   }
2122 
2123   if (t->stream_map.empty()) {
2124     post_benign_reclaimer(t);
2125     if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SENT) {
2126       close_transport_locked(
2127           t, GRPC_ERROR_CREATE_REFERENCING(
2128                  "Last stream closed after sending GOAWAY", &error, 1));
2129     }
2130   }
2131   if (grpc_chttp2_list_remove_writable_stream(t, s)) {
2132     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
2133   }
2134   grpc_chttp2_list_remove_stalled_by_stream(t, s);
2135   grpc_chttp2_list_remove_stalled_by_transport(t, s);
2136 
2137   maybe_start_some_streams(t);
2138 
2139   if (t->is_client) return grpc_chttp2_transport::RemovedStreamHandle();
2140   return grpc_chttp2_transport::RemovedStreamHandle(t->Ref());
2141 }
2142 
2143 namespace grpc_core {
2144 namespace {
2145 
TarpitDuration(grpc_chttp2_transport * t)2146 Duration TarpitDuration(grpc_chttp2_transport* t) {
2147   return Duration::Milliseconds(absl::LogUniform<int>(
2148       absl::BitGen(), t->min_tarpit_duration_ms, t->max_tarpit_duration_ms));
2149 }
2150 
2151 template <typename F>
MaybeTarpit(grpc_chttp2_transport * t,bool tarpit,F fn)2152 void MaybeTarpit(grpc_chttp2_transport* t, bool tarpit, F fn) {
2153   if (!tarpit || !t->allow_tarpit || t->is_client) {
2154     fn(t);
2155     return;
2156   }
2157   const auto duration = TarpitDuration(t);
2158   t->event_engine->RunAfter(
2159       duration, [t = t->Ref(), fn = std::move(fn)]() mutable {
2160         ApplicationCallbackExecCtx app_exec_ctx;
2161         ExecCtx exec_ctx;
2162         t->combiner->Run(
2163             NewClosure([t, fn = std::move(fn)](grpc_error_handle) mutable {
2164               // TODO(ctiller): this can result in not sending RST_STREAMS if a
2165               // request gets tarpit behind a transport close.
2166               if (!t->closed_with_error.ok()) return;
2167               fn(t.get());
2168             }),
2169             absl::OkStatus());
2170       });
2171 }
2172 
2173 }  // namespace
2174 }  // namespace grpc_core
2175 
grpc_chttp2_cancel_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle due_to_error,bool tarpit)2176 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2177                                grpc_error_handle due_to_error, bool tarpit) {
2178   if (!t->is_client && !s->sent_trailing_metadata &&
2179       grpc_error_has_clear_grpc_status(due_to_error) &&
2180       !(s->read_closed && s->write_closed)) {
2181     close_from_api(t, s, due_to_error, tarpit);
2182     return;
2183   }
2184 
2185   if (!due_to_error.ok() && !s->seen_error) {
2186     s->seen_error = true;
2187   }
2188   if (!s->read_closed || !s->write_closed) {
2189     if (s->id != 0) {
2190       grpc_http2_error_code http_error;
2191       grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
2192                             &http_error, nullptr);
2193       grpc_core::MaybeTarpit(
2194           t, tarpit,
2195           [id = s->id, http_error,
2196            remove_stream_handle = grpc_chttp2_mark_stream_closed(
2197                t, s, 1, 1, due_to_error)](grpc_chttp2_transport* t) {
2198             grpc_chttp2_add_rst_stream_to_next_write(
2199                 t, id, static_cast<uint32_t>(http_error), nullptr);
2200             grpc_chttp2_initiate_write(t,
2201                                        GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
2202           });
2203       return;
2204     }
2205   }
2206   grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
2207 }
2208 
grpc_chttp2_fake_status(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2209 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2210                              grpc_error_handle error) {
2211   grpc_status_code status;
2212   std::string message;
2213   grpc_error_get_status(error, s->deadline, &status, &message, nullptr,
2214                         nullptr);
2215   if (status != GRPC_STATUS_OK) {
2216     s->seen_error = true;
2217   }
2218   // stream_global->recv_trailing_metadata_finished gives us a
2219   //   last chance replacement: we've received trailing metadata,
2220   //   but something more important has become available to signal
2221   //   to the upper layers - drop what we've got, and then publish
2222   //   what we want - which is safe because we haven't told anyone
2223   //   about the metadata yet
2224   if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
2225       s->recv_trailing_metadata_finished != nullptr ||
2226       !s->final_metadata_requested) {
2227     s->trailing_metadata_buffer.Set(grpc_core::GrpcStatusMetadata(), status);
2228     if (!message.empty()) {
2229       s->trailing_metadata_buffer.Set(
2230           grpc_core::GrpcMessageMetadata(),
2231           grpc_core::Slice::FromCopiedBuffer(message));
2232     }
2233     s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
2234     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2235   }
2236 }
2237 
add_error(grpc_error_handle error,grpc_error_handle * refs,size_t * nrefs)2238 static void add_error(grpc_error_handle error, grpc_error_handle* refs,
2239                       size_t* nrefs) {
2240   if (error.ok()) return;
2241   for (size_t i = 0; i < *nrefs; i++) {
2242     if (error == refs[i]) {
2243       return;
2244     }
2245   }
2246   refs[*nrefs] = error;
2247   ++*nrefs;
2248 }
2249 
removal_error(grpc_error_handle extra_error,grpc_chttp2_stream * s,const char * main_error_msg)2250 static grpc_error_handle removal_error(grpc_error_handle extra_error,
2251                                        grpc_chttp2_stream* s,
2252                                        const char* main_error_msg) {
2253   grpc_error_handle refs[3];
2254   size_t nrefs = 0;
2255   add_error(s->read_closed_error, refs, &nrefs);
2256   add_error(s->write_closed_error, refs, &nrefs);
2257   add_error(extra_error, refs, &nrefs);
2258   grpc_error_handle error;
2259   if (nrefs > 0) {
2260     error = GRPC_ERROR_CREATE_REFERENCING(main_error_msg, refs, nrefs);
2261   }
2262   return error;
2263 }
2264 
flush_write_list(grpc_chttp2_transport * t,grpc_chttp2_write_cb ** list,grpc_error_handle error)2265 static void flush_write_list(grpc_chttp2_transport* t,
2266                              grpc_chttp2_write_cb** list,
2267                              grpc_error_handle error) {
2268   while (*list) {
2269     grpc_chttp2_write_cb* cb = *list;
2270     *list = cb->next;
2271     grpc_chttp2_complete_closure_step(t, &cb->closure, error,
2272                                       "on_write_finished_cb");
2273     cb->next = t->write_cb_pool;
2274     t->write_cb_pool = cb;
2275   }
2276 }
2277 
grpc_chttp2_fail_pending_writes(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2278 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
2279                                      grpc_chttp2_stream* s,
2280                                      grpc_error_handle error) {
2281   error =
2282       removal_error(error, s, "Pending writes failed due to stream closure");
2283   s->send_initial_metadata = nullptr;
2284   grpc_chttp2_complete_closure_step(t, &s->send_initial_metadata_finished,
2285                                     error, "send_initial_metadata_finished");
2286 
2287   s->send_trailing_metadata = nullptr;
2288   s->sent_trailing_metadata_op = nullptr;
2289   grpc_chttp2_complete_closure_step(t, &s->send_trailing_metadata_finished,
2290                                     error, "send_trailing_metadata_finished");
2291 
2292   grpc_chttp2_complete_closure_step(t, &s->send_message_finished, error,
2293                                     "fetching_send_message_finished");
2294   flush_write_list(t, &s->on_write_finished_cbs, error);
2295   flush_write_list(t, &s->on_flow_controlled_cbs, error);
2296 }
2297 
grpc_chttp2_mark_stream_closed(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int close_reads,int close_writes,grpc_error_handle error)2298 grpc_chttp2_transport::RemovedStreamHandle grpc_chttp2_mark_stream_closed(
2299     grpc_chttp2_transport* t, grpc_chttp2_stream* s, int close_reads,
2300     int close_writes, grpc_error_handle error) {
2301   grpc_chttp2_transport::RemovedStreamHandle rsh;
2302   if (grpc_http_trace.enabled()) {
2303     gpr_log(
2304         GPR_DEBUG, "MARK_STREAM_CLOSED: t=%p s=%p(id=%d) %s [%s]", t, s, s->id,
2305         (close_reads && close_writes)
2306             ? "read+write"
2307             : (close_reads ? "read" : (close_writes ? "write" : "nothing??")),
2308         grpc_core::StatusToString(error).c_str());
2309   }
2310   if (s->read_closed && s->write_closed) {
2311     // already closed, but we should still fake the status if needed.
2312     grpc_error_handle overall_error = removal_error(error, s, "Stream removed");
2313     if (!overall_error.ok()) {
2314       grpc_chttp2_fake_status(t, s, overall_error);
2315     }
2316     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2317     return rsh;
2318   }
2319   bool closed_read = false;
2320   bool became_closed = false;
2321   if (close_reads && !s->read_closed) {
2322     s->read_closed_error = error;
2323     s->read_closed = true;
2324     closed_read = true;
2325   }
2326   if (close_writes && !s->write_closed) {
2327     s->write_closed_error = error;
2328     s->write_closed = true;
2329     grpc_chttp2_fail_pending_writes(t, s, error);
2330   }
2331   if (s->read_closed && s->write_closed) {
2332     became_closed = true;
2333     grpc_error_handle overall_error = removal_error(error, s, "Stream removed");
2334     if (s->id != 0) {
2335       rsh = remove_stream(t, s->id, overall_error);
2336     } else {
2337       // Purge streams waiting on concurrency still waiting for id assignment
2338       grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
2339     }
2340     if (!overall_error.ok()) {
2341       grpc_chttp2_fake_status(t, s, overall_error);
2342     }
2343   }
2344   if (closed_read) {
2345     for (int i = 0; i < 2; i++) {
2346       if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
2347         s->published_metadata[i] = GRPC_METADATA_PUBLISHED_AT_CLOSE;
2348       }
2349     }
2350     grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
2351     grpc_chttp2_maybe_complete_recv_message(t, s);
2352   }
2353   if (became_closed) {
2354     s->stats.latency =
2355         gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), s->creation_time);
2356     grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2357     GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
2358   }
2359   return rsh;
2360 }
2361 
close_from_api(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error,bool tarpit)2362 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2363                            grpc_error_handle error, bool tarpit) {
2364   grpc_status_code grpc_status;
2365   std::string message;
2366   grpc_error_get_status(error, s->deadline, &grpc_status, &message, nullptr,
2367                         nullptr);
2368 
2369   GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
2370 
2371   auto remove_stream_handle = grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
2372   grpc_core::MaybeTarpit(
2373       t, tarpit,
2374       [error = std::move(error),
2375        sent_initial_metadata = s->sent_initial_metadata, id = s->id,
2376        grpc_status, message = std::move(message),
2377        remove_stream_handle =
2378            std::move(remove_stream_handle)](grpc_chttp2_transport* t) mutable {
2379         grpc_slice hdr;
2380         grpc_slice status_hdr;
2381         grpc_slice http_status_hdr;
2382         grpc_slice content_type_hdr;
2383         grpc_slice message_pfx;
2384         uint8_t* p;
2385         uint32_t len = 0;
2386 
2387         // Hand roll a header block.
2388         //   This is unnecessarily ugly - at some point we should find a more
2389         //   elegant solution.
2390         //   It's complicated by the fact that our send machinery would be dead
2391         //   by the time we got around to sending this, so instead we ignore
2392         //   HPACK compression and just write the uncompressed bytes onto the
2393         //   wire.
2394         if (!sent_initial_metadata) {
2395           http_status_hdr = GRPC_SLICE_MALLOC(13);
2396           p = GRPC_SLICE_START_PTR(http_status_hdr);
2397           *p++ = 0x00;
2398           *p++ = 7;
2399           *p++ = ':';
2400           *p++ = 's';
2401           *p++ = 't';
2402           *p++ = 'a';
2403           *p++ = 't';
2404           *p++ = 'u';
2405           *p++ = 's';
2406           *p++ = 3;
2407           *p++ = '2';
2408           *p++ = '0';
2409           *p++ = '0';
2410           GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
2411           len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
2412 
2413           content_type_hdr = GRPC_SLICE_MALLOC(31);
2414           p = GRPC_SLICE_START_PTR(content_type_hdr);
2415           *p++ = 0x00;
2416           *p++ = 12;
2417           *p++ = 'c';
2418           *p++ = 'o';
2419           *p++ = 'n';
2420           *p++ = 't';
2421           *p++ = 'e';
2422           *p++ = 'n';
2423           *p++ = 't';
2424           *p++ = '-';
2425           *p++ = 't';
2426           *p++ = 'y';
2427           *p++ = 'p';
2428           *p++ = 'e';
2429           *p++ = 16;
2430           *p++ = 'a';
2431           *p++ = 'p';
2432           *p++ = 'p';
2433           *p++ = 'l';
2434           *p++ = 'i';
2435           *p++ = 'c';
2436           *p++ = 'a';
2437           *p++ = 't';
2438           *p++ = 'i';
2439           *p++ = 'o';
2440           *p++ = 'n';
2441           *p++ = '/';
2442           *p++ = 'g';
2443           *p++ = 'r';
2444           *p++ = 'p';
2445           *p++ = 'c';
2446           GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
2447           len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
2448         }
2449 
2450         status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
2451         p = GRPC_SLICE_START_PTR(status_hdr);
2452         *p++ = 0x00;  // literal header, not indexed
2453         *p++ = 11;    // len(grpc-status)
2454         *p++ = 'g';
2455         *p++ = 'r';
2456         *p++ = 'p';
2457         *p++ = 'c';
2458         *p++ = '-';
2459         *p++ = 's';
2460         *p++ = 't';
2461         *p++ = 'a';
2462         *p++ = 't';
2463         *p++ = 'u';
2464         *p++ = 's';
2465         if (grpc_status < 10) {
2466           *p++ = 1;
2467           *p++ = static_cast<uint8_t>('0' + grpc_status);
2468         } else {
2469           *p++ = 2;
2470           *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
2471           *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
2472         }
2473         GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
2474         len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
2475 
2476         size_t msg_len = message.length();
2477         GPR_ASSERT(msg_len <= UINT32_MAX);
2478         grpc_core::VarintWriter<1> msg_len_writer(
2479             static_cast<uint32_t>(msg_len));
2480         message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_writer.length());
2481         p = GRPC_SLICE_START_PTR(message_pfx);
2482         *p++ = 0x00;  // literal header, not indexed
2483         *p++ = 12;    // len(grpc-message)
2484         *p++ = 'g';
2485         *p++ = 'r';
2486         *p++ = 'p';
2487         *p++ = 'c';
2488         *p++ = '-';
2489         *p++ = 'm';
2490         *p++ = 'e';
2491         *p++ = 's';
2492         *p++ = 's';
2493         *p++ = 'a';
2494         *p++ = 'g';
2495         *p++ = 'e';
2496         msg_len_writer.Write(0, p);
2497         p += msg_len_writer.length();
2498         GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
2499         len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
2500         len += static_cast<uint32_t>(msg_len);
2501 
2502         hdr = GRPC_SLICE_MALLOC(9);
2503         p = GRPC_SLICE_START_PTR(hdr);
2504         *p++ = static_cast<uint8_t>(len >> 16);
2505         *p++ = static_cast<uint8_t>(len >> 8);
2506         *p++ = static_cast<uint8_t>(len);
2507         *p++ = GRPC_CHTTP2_FRAME_HEADER;
2508         *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM |
2509                GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
2510         *p++ = static_cast<uint8_t>(id >> 24);
2511         *p++ = static_cast<uint8_t>(id >> 16);
2512         *p++ = static_cast<uint8_t>(id >> 8);
2513         *p++ = static_cast<uint8_t>(id);
2514         GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
2515 
2516         grpc_slice_buffer_add(&t->qbuf, hdr);
2517         if (!sent_initial_metadata) {
2518           grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
2519           grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
2520         }
2521         grpc_slice_buffer_add(&t->qbuf, status_hdr);
2522         grpc_slice_buffer_add(&t->qbuf, message_pfx);
2523         grpc_slice_buffer_add(&t->qbuf,
2524                               grpc_slice_from_cpp_string(std::move(message)));
2525         grpc_chttp2_reset_ping_clock(t);
2526         grpc_chttp2_add_rst_stream_to_next_write(t, id, GRPC_HTTP2_NO_ERROR,
2527                                                  nullptr);
2528 
2529         grpc_chttp2_initiate_write(t,
2530                                    GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
2531       });
2532 }
2533 
end_all_the_calls(grpc_chttp2_transport * t,grpc_error_handle error)2534 static void end_all_the_calls(grpc_chttp2_transport* t,
2535                               grpc_error_handle error) {
2536   intptr_t http2_error;
2537   // If there is no explicit grpc or HTTP/2 error, set to UNAVAILABLE on server.
2538   if (!t->is_client && !grpc_error_has_clear_grpc_status(error) &&
2539       !grpc_error_get_int(error, grpc_core::StatusIntProperty::kHttp2Error,
2540                           &http2_error)) {
2541     error = grpc_error_set_int(error, grpc_core::StatusIntProperty::kRpcStatus,
2542                                GRPC_STATUS_UNAVAILABLE);
2543   }
2544   cancel_unstarted_streams(t, error, false);
2545   std::vector<grpc_chttp2_stream*> to_cancel;
2546   for (auto id_stream : t->stream_map) {
2547     to_cancel.push_back(id_stream.second);
2548   }
2549   for (auto s : to_cancel) {
2550     grpc_chttp2_cancel_stream(t, s, error, false);
2551   }
2552 }
2553 
2554 //
2555 // INPUT PROCESSING - PARSING
2556 //
2557 
2558 template <class F>
WithUrgency(grpc_chttp2_transport * t,grpc_core::chttp2::FlowControlAction::Urgency urgency,grpc_chttp2_initiate_write_reason reason,F action)2559 static void WithUrgency(grpc_chttp2_transport* t,
2560                         grpc_core::chttp2::FlowControlAction::Urgency urgency,
2561                         grpc_chttp2_initiate_write_reason reason, F action) {
2562   switch (urgency) {
2563     case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
2564       break;
2565     case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
2566       grpc_chttp2_initiate_write(t, reason);
2567       ABSL_FALLTHROUGH_INTENDED;
2568     case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
2569       action();
2570       break;
2571   }
2572 }
2573 
grpc_chttp2_act_on_flowctl_action(const grpc_core::chttp2::FlowControlAction & action,grpc_chttp2_transport * t,grpc_chttp2_stream * s)2574 void grpc_chttp2_act_on_flowctl_action(
2575     const grpc_core::chttp2::FlowControlAction& action,
2576     grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
2577   WithUrgency(t, action.send_stream_update(),
2578               GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, [t, s]() {
2579                 if (s->id != 0 && !s->read_closed) {
2580                   grpc_chttp2_mark_stream_writable(t, s);
2581                 }
2582               });
2583   WithUrgency(t, action.send_transport_update(),
2584               GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
2585   WithUrgency(t, action.send_initial_window_update(),
2586               GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2587                 t->settings.mutable_local().SetInitialWindowSize(
2588                     action.initial_window_size());
2589               });
2590   WithUrgency(
2591       t, action.send_max_frame_size_update(),
2592       GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2593         t->settings.mutable_local().SetMaxFrameSize(action.max_frame_size());
2594       });
2595   if (t->enable_preferred_rx_crypto_frame_advertisement) {
2596     WithUrgency(
2597         t, action.preferred_rx_crypto_frame_size_update(),
2598         GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2599           t->settings.mutable_local().SetPreferredReceiveCryptoMessageSize(
2600               action.preferred_rx_crypto_frame_size());
2601         });
2602   }
2603 }
2604 
try_http_parsing(grpc_chttp2_transport * t)2605 static grpc_error_handle try_http_parsing(grpc_chttp2_transport* t) {
2606   grpc_http_parser parser;
2607   size_t i = 0;
2608   grpc_error_handle error;
2609   grpc_http_response response;
2610 
2611   grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
2612 
2613   grpc_error_handle parse_error;
2614   for (; i < t->read_buffer.count && parse_error.ok(); i++) {
2615     parse_error =
2616         grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
2617   }
2618   if (parse_error.ok() &&
2619       (parse_error = grpc_http_parser_eof(&parser)) == absl::OkStatus()) {
2620     error = grpc_error_set_int(
2621         grpc_error_set_int(
2622             GRPC_ERROR_CREATE("Trying to connect an http1.x server"),
2623             grpc_core::StatusIntProperty::kHttpStatus, response.status),
2624         grpc_core::StatusIntProperty::kRpcStatus,
2625         grpc_http2_status_to_grpc_status(response.status));
2626   }
2627 
2628   grpc_http_parser_destroy(&parser);
2629   grpc_http_response_destroy(&response);
2630   return error;
2631 }
2632 
read_action(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2633 static void read_action(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2634                         grpc_error_handle error) {
2635   auto* tp = t.get();
2636   tp->combiner->Run(grpc_core::InitTransportClosure<read_action_locked>(
2637                         std::move(t), &tp->read_action_locked),
2638                     error);
2639 }
2640 
read_action_parse_loop_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2641 static void read_action_parse_loop_locked(
2642     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2643     grpc_error_handle error) {
2644   if (t->closed_with_error.ok()) {
2645     grpc_error_handle errors[3] = {error, absl::OkStatus(), absl::OkStatus()};
2646     size_t requests_started = 0;
2647     for (size_t i = 0;
2648          i < t->read_buffer.count && errors[1] == absl::OkStatus(); i++) {
2649       auto r = grpc_chttp2_perform_read(t.get(), t->read_buffer.slices[i],
2650                                         requests_started);
2651       if (auto* partial_read_size = absl::get_if<size_t>(&r)) {
2652         for (size_t j = 0; j < i; j++) {
2653           grpc_core::CSliceUnref(grpc_slice_buffer_take_first(&t->read_buffer));
2654         }
2655         grpc_slice_buffer_sub_first(
2656             &t->read_buffer, *partial_read_size,
2657             GRPC_SLICE_LENGTH(t->read_buffer.slices[0]));
2658         t->combiner->ForceOffload();
2659         auto* tp = t.get();
2660         tp->combiner->Run(
2661             grpc_core::InitTransportClosure<read_action_parse_loop_locked>(
2662                 std::move(t), &tp->read_action_locked),
2663             std::move(errors[0]));
2664         // Early return: we queued to retry later.
2665         return;
2666       } else {
2667         errors[1] = std::move(absl::get<absl::Status>(r));
2668       }
2669     }
2670     if (errors[1] != absl::OkStatus()) {
2671       errors[2] = try_http_parsing(t.get());
2672       error = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors,
2673                                             GPR_ARRAY_SIZE(errors));
2674     }
2675 
2676     if (t->initial_window_update != 0) {
2677       if (t->initial_window_update > 0) {
2678         grpc_chttp2_stream* s;
2679         while (grpc_chttp2_list_pop_stalled_by_stream(t.get(), &s)) {
2680           grpc_chttp2_mark_stream_writable(t.get(), s);
2681           grpc_chttp2_initiate_write(
2682               t.get(),
2683               GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
2684         }
2685       }
2686       t->initial_window_update = 0;
2687     }
2688   }
2689 
2690   bool keep_reading = false;
2691   if (error.ok() && !t->closed_with_error.ok()) {
2692     error = GRPC_ERROR_CREATE_REFERENCING("Transport closed",
2693                                           &t->closed_with_error, 1);
2694   }
2695   if (!error.ok()) {
2696     // If a goaway frame was received, this might be the reason why the read
2697     // failed. Add this info to the error
2698     if (!t->goaway_error.ok()) {
2699       error = grpc_error_add_child(error, t->goaway_error);
2700     }
2701 
2702     close_transport_locked(t.get(), error);
2703   } else if (t->closed_with_error.ok()) {
2704     keep_reading = true;
2705     // Since we have read a byte, reset the keepalive timer
2706     if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2707       maybe_reset_keepalive_ping_timer_locked(t.get());
2708     }
2709   }
2710   grpc_slice_buffer_reset_and_unref(&t->read_buffer);
2711 
2712   if (keep_reading) {
2713     if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) {
2714       t->reading_paused_on_pending_induced_frames = true;
2715       GRPC_CHTTP2_IF_TRACING(
2716           gpr_log(GPR_INFO,
2717                   "transport %p : Pausing reading due to too "
2718                   "many unwritten SETTINGS ACK and RST_STREAM frames",
2719                   t.get()));
2720     } else {
2721       continue_read_action_locked(std::move(t));
2722     }
2723   }
2724 }
2725 
read_action_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2726 static void read_action_locked(
2727     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2728     grpc_error_handle error) {
2729   // got an incoming read, cancel any pending keepalive timers
2730   t->keepalive_incoming_data_wanted = false;
2731   if (t->keepalive_ping_timeout_handle != TaskHandle::kInvalid) {
2732     if (GRPC_TRACE_FLAG_ENABLED(grpc_ping_trace) ||
2733         GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2734       gpr_log(GPR_INFO,
2735               "%s[%p]: Clear keepalive timer because data was received",
2736               t->is_client ? "CLIENT" : "SERVER", t.get());
2737     }
2738     t->event_engine->Cancel(
2739         std::exchange(t->keepalive_ping_timeout_handle, TaskHandle::kInvalid));
2740   }
2741   grpc_error_handle err = error;
2742   if (!err.ok()) {
2743     err = grpc_error_set_int(
2744         GRPC_ERROR_CREATE_REFERENCING("Endpoint read failed", &err, 1),
2745         grpc_core::StatusIntProperty::kOccurredDuringWrite, t->write_state);
2746   }
2747   std::swap(err, error);
2748   read_action_parse_loop_locked(std::move(t), std::move(err));
2749 }
2750 
continue_read_action_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)2751 static void continue_read_action_locked(
2752     grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
2753   const bool urgent = !t->goaway_error.ok();
2754   auto* tp = t.get();
2755   grpc_endpoint_read(tp->ep, &tp->read_buffer,
2756                      grpc_core::InitTransportClosure<read_action>(
2757                          std::move(t), &tp->read_action_locked),
2758                      urgent, grpc_chttp2_min_read_progress_size(tp));
2759 }
2760 
2761 // t is reffed prior to calling the first time, and once the callback chain
2762 // that kicks off finishes, it's unreffed
schedule_bdp_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)2763 void schedule_bdp_ping_locked(
2764     grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
2765   auto* tp = t.get();
2766   tp->flow_control.bdp_estimator()->SchedulePing();
2767   send_ping_locked(tp,
2768                    grpc_core::InitTransportClosure<start_bdp_ping>(
2769                        tp->Ref(), &tp->start_bdp_ping_locked),
2770                    grpc_core::InitTransportClosure<finish_bdp_ping>(
2771                        std::move(t), &tp->finish_bdp_ping_locked));
2772   grpc_chttp2_initiate_write(tp, GRPC_CHTTP2_INITIATE_WRITE_BDP_PING);
2773 }
2774 
start_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2775 static void start_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2776                            grpc_error_handle error) {
2777   grpc_chttp2_transport* tp = t.get();
2778   tp->combiner->Run(grpc_core::InitTransportClosure<start_bdp_ping_locked>(
2779                         std::move(t), &tp->start_bdp_ping_locked),
2780                     error);
2781 }
2782 
start_bdp_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2783 static void start_bdp_ping_locked(
2784     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2785     grpc_error_handle error) {
2786   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2787     gpr_log(GPR_INFO, "%s: Start BDP ping err=%s",
2788             std::string(t->peer_string.as_string_view()).c_str(),
2789             grpc_core::StatusToString(error).c_str());
2790   }
2791   if (!error.ok() || !t->closed_with_error.ok()) {
2792     return;
2793   }
2794   // Reset the keepalive ping timer
2795   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2796     maybe_reset_keepalive_ping_timer_locked(t.get());
2797   }
2798   t->flow_control.bdp_estimator()->StartPing();
2799   t->bdp_ping_started = true;
2800 }
2801 
finish_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2802 static void finish_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2803                             grpc_error_handle error) {
2804   grpc_chttp2_transport* tp = t.get();
2805   tp->combiner->Run(grpc_core::InitTransportClosure<finish_bdp_ping_locked>(
2806                         std::move(t), &tp->finish_bdp_ping_locked),
2807                     error);
2808 }
2809 
finish_bdp_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2810 static void finish_bdp_ping_locked(
2811     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2812     grpc_error_handle error) {
2813   if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2814     gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s",
2815             std::string(t->peer_string.as_string_view()).c_str(),
2816             grpc_core::StatusToString(error).c_str());
2817   }
2818   if (!error.ok() || !t->closed_with_error.ok()) {
2819     return;
2820   }
2821   if (!t->bdp_ping_started) {
2822     // start_bdp_ping_locked has not been run yet. Schedule
2823     // finish_bdp_ping_locked to be run later.
2824     finish_bdp_ping(std::move(t), std::move(error));
2825     return;
2826   }
2827   t->bdp_ping_started = false;
2828   grpc_core::Timestamp next_ping =
2829       t->flow_control.bdp_estimator()->CompletePing();
2830   grpc_chttp2_act_on_flowctl_action(t->flow_control.PeriodicUpdate(), t.get(),
2831                                     nullptr);
2832   GPR_ASSERT(t->next_bdp_ping_timer_handle == TaskHandle::kInvalid);
2833   t->next_bdp_ping_timer_handle =
2834       t->event_engine->RunAfter(next_ping - grpc_core::Timestamp::Now(), [t] {
2835         grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2836         grpc_core::ExecCtx exec_ctx;
2837         next_bdp_ping_timer_expired(t.get());
2838       });
2839 }
2840 
next_bdp_ping_timer_expired(grpc_chttp2_transport * t)2841 static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t) {
2842   t->combiner->Run(
2843       grpc_core::InitTransportClosure<next_bdp_ping_timer_expired_locked>(
2844           t->Ref(), &t->next_bdp_ping_timer_expired_locked),
2845       absl::OkStatus());
2846 }
2847 
next_bdp_ping_timer_expired_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,GRPC_UNUSED grpc_error_handle error)2848 static void next_bdp_ping_timer_expired_locked(
2849     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2850     GRPC_UNUSED grpc_error_handle error) {
2851   GPR_DEBUG_ASSERT(error.ok());
2852   t->next_bdp_ping_timer_handle = TaskHandle::kInvalid;
2853   if (t->flow_control.bdp_estimator()->accumulator() == 0) {
2854     // Block the bdp ping till we receive more data.
2855     t->bdp_ping_blocked = true;
2856   } else {
2857     schedule_bdp_ping_locked(std::move(t));
2858   }
2859 }
2860 
grpc_chttp2_config_default_keepalive_args(grpc_channel_args * args,bool is_client)2861 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
2862                                                bool is_client) {
2863   grpc_chttp2_config_default_keepalive_args(grpc_core::ChannelArgs::FromC(args),
2864                                             is_client);
2865 }
2866 
grpc_chttp2_config_default_keepalive_args(const grpc_core::ChannelArgs & channel_args,bool is_client)2867 void grpc_chttp2_config_default_keepalive_args(
2868     const grpc_core::ChannelArgs& channel_args, bool is_client) {
2869   const auto keepalive_time =
2870       std::max(grpc_core::Duration::Milliseconds(1),
2871                channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIME_MS)
2872                    .value_or(is_client ? g_default_client_keepalive_time
2873                                        : g_default_server_keepalive_time));
2874   if (is_client) {
2875     g_default_client_keepalive_time = keepalive_time;
2876   } else {
2877     g_default_server_keepalive_time = keepalive_time;
2878   }
2879 
2880   const auto keepalive_timeout = std::max(
2881       grpc_core::Duration::Zero(),
2882       channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIMEOUT_MS)
2883           .value_or(is_client ? g_default_client_keepalive_timeout
2884                               : g_default_server_keepalive_timeout));
2885   if (is_client) {
2886     g_default_client_keepalive_timeout = keepalive_timeout;
2887   } else {
2888     g_default_server_keepalive_timeout = keepalive_timeout;
2889   }
2890 
2891   const bool keepalive_permit_without_calls =
2892       channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
2893           .value_or(is_client
2894                         ? g_default_client_keepalive_permit_without_calls
2895                         : g_default_server_keepalive_permit_without_calls);
2896   if (is_client) {
2897     g_default_client_keepalive_permit_without_calls =
2898         keepalive_permit_without_calls;
2899   } else {
2900     g_default_server_keepalive_permit_without_calls =
2901         keepalive_permit_without_calls;
2902   }
2903 
2904   grpc_core::Chttp2PingAbusePolicy::SetDefaults(channel_args);
2905   grpc_core::Chttp2PingRatePolicy::SetDefaults(channel_args);
2906 }
2907 
init_keepalive_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)2908 static void init_keepalive_ping(
2909     grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
2910   auto* tp = t.get();
2911   tp->combiner->Run(grpc_core::InitTransportClosure<init_keepalive_ping_locked>(
2912                         std::move(t), &tp->init_keepalive_ping_locked),
2913                     absl::OkStatus());
2914 }
2915 
init_keepalive_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,GRPC_UNUSED grpc_error_handle error)2916 static void init_keepalive_ping_locked(
2917     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2918     GRPC_UNUSED grpc_error_handle error) {
2919   GPR_DEBUG_ASSERT(error.ok());
2920   GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
2921   GPR_ASSERT(t->keepalive_ping_timer_handle != TaskHandle::kInvalid);
2922   t->keepalive_ping_timer_handle = TaskHandle::kInvalid;
2923   if (t->destroying || !t->closed_with_error.ok()) {
2924     t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2925   } else {
2926     if (t->keepalive_permit_without_calls || !t->stream_map.empty()) {
2927       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
2928       send_keepalive_ping_locked(t);
2929       grpc_chttp2_initiate_write(t.get(),
2930                                  GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
2931     } else {
2932       t->keepalive_ping_timer_handle =
2933           t->event_engine->RunAfter(t->keepalive_time, [t] {
2934             grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2935             grpc_core::ExecCtx exec_ctx;
2936             init_keepalive_ping(t);
2937           });
2938     }
2939   }
2940 }
2941 
finish_keepalive_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2942 static void finish_keepalive_ping(
2943     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2944     grpc_error_handle error) {
2945   auto* tp = t.get();
2946   tp->combiner->Run(
2947       grpc_core::InitTransportClosure<finish_keepalive_ping_locked>(
2948           std::move(t), &tp->finish_keepalive_ping_locked),
2949       error);
2950 }
2951 
finish_keepalive_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2952 static void finish_keepalive_ping_locked(
2953     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2954     grpc_error_handle error) {
2955   if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2956     if (error.ok()) {
2957       if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2958           GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2959         gpr_log(GPR_INFO, "%s: Finish keepalive ping",
2960                 std::string(t->peer_string.as_string_view()).c_str());
2961       }
2962       t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
2963       GPR_ASSERT(t->keepalive_ping_timer_handle == TaskHandle::kInvalid);
2964       t->keepalive_ping_timer_handle =
2965           t->event_engine->RunAfter(t->keepalive_time, [t] {
2966             grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2967             grpc_core::ExecCtx exec_ctx;
2968             init_keepalive_ping(t);
2969           });
2970     }
2971   }
2972 }
2973 
maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport * t)2974 static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t) {
2975   if (t->keepalive_ping_timer_handle != TaskHandle::kInvalid &&
2976       t->event_engine->Cancel(t->keepalive_ping_timer_handle)) {
2977     // Cancel succeeds, resets the keepalive ping timer. Note that we don't
2978     // need to Ref or Unref here since we still hold the Ref.
2979     if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2980         GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2981       gpr_log(GPR_INFO, "%s: Keepalive ping cancelled. Resetting timer.",
2982               std::string(t->peer_string.as_string_view()).c_str());
2983     }
2984     t->keepalive_ping_timer_handle =
2985         t->event_engine->RunAfter(t->keepalive_time, [t = t->Ref()]() mutable {
2986           grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2987           grpc_core::ExecCtx exec_ctx;
2988           init_keepalive_ping(std::move(t));
2989         });
2990   }
2991 }
2992 
2993 //
2994 // CALLBACK LOOP
2995 //
2996 
connectivity_state_set(grpc_chttp2_transport * t,grpc_connectivity_state state,const absl::Status & status,const char * reason)2997 static void connectivity_state_set(grpc_chttp2_transport* t,
2998                                    grpc_connectivity_state state,
2999                                    const absl::Status& status,
3000                                    const char* reason) {
3001   GRPC_CHTTP2_IF_TRACING(gpr_log(
3002       GPR_INFO, "transport %p set connectivity_state=%d; status=%s; reason=%s",
3003       t, state, status.ToString().c_str(), reason));
3004   t->state_tracker.SetState(state, status, reason);
3005 }
3006 
3007 //
3008 // POLLSET STUFF
3009 //
3010 
SetPollset(grpc_stream *,grpc_pollset * pollset)3011 void grpc_chttp2_transport::SetPollset(grpc_stream* /*gs*/,
3012                                        grpc_pollset* pollset) {
3013   grpc_endpoint_add_to_pollset(ep, pollset);
3014 }
3015 
SetPollsetSet(grpc_stream *,grpc_pollset_set * pollset_set)3016 void grpc_chttp2_transport::SetPollsetSet(grpc_stream* /*gs*/,
3017                                           grpc_pollset_set* pollset_set) {
3018   grpc_endpoint_add_to_pollset_set(ep, pollset_set);
3019 }
3020 
3021 //
3022 // RESOURCE QUOTAS
3023 //
3024 
post_benign_reclaimer(grpc_chttp2_transport * t)3025 static void post_benign_reclaimer(grpc_chttp2_transport* t) {
3026   if (!t->benign_reclaimer_registered) {
3027     t->benign_reclaimer_registered = true;
3028     t->memory_owner.PostReclaimer(
3029         grpc_core::ReclamationPass::kBenign,
3030         [t = t->Ref()](
3031             absl::optional<grpc_core::ReclamationSweep> sweep) mutable {
3032           if (sweep.has_value()) {
3033             auto* tp = t.get();
3034             tp->active_reclamation = std::move(*sweep);
3035             tp->combiner->Run(
3036                 grpc_core::InitTransportClosure<benign_reclaimer_locked>(
3037                     std::move(t), &tp->benign_reclaimer_locked),
3038                 absl::OkStatus());
3039           }
3040         });
3041   }
3042 }
3043 
post_destructive_reclaimer(grpc_chttp2_transport * t)3044 static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
3045   if (!t->destructive_reclaimer_registered) {
3046     t->destructive_reclaimer_registered = true;
3047     t->memory_owner.PostReclaimer(
3048         grpc_core::ReclamationPass::kDestructive,
3049         [t = t->Ref()](
3050             absl::optional<grpc_core::ReclamationSweep> sweep) mutable {
3051           if (sweep.has_value()) {
3052             auto* tp = t.get();
3053             tp->active_reclamation = std::move(*sweep);
3054             tp->combiner->Run(
3055                 grpc_core::InitTransportClosure<destructive_reclaimer_locked>(
3056                     std::move(t), &tp->destructive_reclaimer_locked),
3057                 absl::OkStatus());
3058           }
3059         });
3060   }
3061 }
3062 
benign_reclaimer_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)3063 static void benign_reclaimer_locked(
3064     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
3065     grpc_error_handle error) {
3066   if (error.ok() && t->stream_map.empty()) {
3067     // Channel with no active streams: send a goaway to try and make it
3068     // disconnect cleanly
3069     if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3070       gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
3071               std::string(t->peer_string.as_string_view()).c_str());
3072     }
3073     send_goaway(t.get(),
3074                 grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
3075                                    grpc_core::StatusIntProperty::kHttp2Error,
3076                                    GRPC_HTTP2_ENHANCE_YOUR_CALM),
3077                 /*immediate_disconnect_hint=*/true);
3078   } else if (error.ok() && GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3079     gpr_log(GPR_INFO,
3080             "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
3081             " streams",
3082             std::string(t->peer_string.as_string_view()).c_str(),
3083             t->stream_map.size());
3084   }
3085   t->benign_reclaimer_registered = false;
3086   if (error != absl::CancelledError()) {
3087     t->active_reclamation.Finish();
3088   }
3089 }
3090 
destructive_reclaimer_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)3091 static void destructive_reclaimer_locked(
3092     grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
3093     grpc_error_handle error) {
3094   t->destructive_reclaimer_registered = false;
3095   if (error.ok() && !t->stream_map.empty()) {
3096     // As stream_map is a hash map, this selects effectively a random stream.
3097     grpc_chttp2_stream* s = t->stream_map.begin()->second;
3098     if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3099       gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d",
3100               std::string(t->peer_string.as_string_view()).c_str(), s->id);
3101     }
3102     grpc_chttp2_cancel_stream(
3103         t.get(), s,
3104         grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
3105                            grpc_core::StatusIntProperty::kHttp2Error,
3106                            GRPC_HTTP2_ENHANCE_YOUR_CALM),
3107         false);
3108     if (!t->stream_map.empty()) {
3109       // Since we cancel one stream per destructive reclamation, if
3110       //   there are more streams left, we can immediately post a new
3111       //   reclaimer in case the resource quota needs to free more
3112       //   memory
3113       post_destructive_reclaimer(t.get());
3114     }
3115   }
3116   if (error != absl::CancelledError()) {
3117     t->active_reclamation.Finish();
3118   }
3119 }
3120 
3121 //
3122 // MONITORING
3123 //
3124 
grpc_chttp2_initiate_write_reason_string(grpc_chttp2_initiate_write_reason reason)3125 const char* grpc_chttp2_initiate_write_reason_string(
3126     grpc_chttp2_initiate_write_reason reason) {
3127   switch (reason) {
3128     case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
3129       return "INITIAL_WRITE";
3130     case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
3131       return "START_NEW_STREAM";
3132     case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
3133       return "SEND_MESSAGE";
3134     case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
3135       return "SEND_INITIAL_METADATA";
3136     case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
3137       return "SEND_TRAILING_METADATA";
3138     case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
3139       return "RETRY_SEND_PING";
3140     case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
3141       return "CONTINUE_PINGS";
3142     case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
3143       return "GOAWAY_SENT";
3144     case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
3145       return "RST_STREAM";
3146     case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
3147       return "CLOSE_FROM_API";
3148     case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
3149       return "STREAM_FLOW_CONTROL";
3150     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
3151       return "TRANSPORT_FLOW_CONTROL";
3152     case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
3153       return "SEND_SETTINGS";
3154     case GRPC_CHTTP2_INITIATE_WRITE_SETTINGS_ACK:
3155       return "SETTINGS_ACK";
3156     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
3157       return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
3158     case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
3159       return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
3160     case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
3161       return "APPLICATION_PING";
3162     case GRPC_CHTTP2_INITIATE_WRITE_BDP_PING:
3163       return "BDP_PING";
3164     case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
3165       return "KEEPALIVE_PING";
3166     case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
3167       return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
3168     case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
3169       return "PING_RESPONSE";
3170     case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
3171       return "FORCE_RST_STREAM";
3172   }
3173   GPR_UNREACHABLE_CODE(return "unknown");
3174 }
3175 
GetEndpoint()3176 grpc_endpoint* grpc_chttp2_transport::GetEndpoint() { return ep; }
3177 
SizeOfStream() const3178 size_t grpc_chttp2_transport::SizeOfStream() const {
3179   return sizeof(grpc_chttp2_stream);
3180 }
3181 
3182 bool grpc_chttp2_transport::
HackyDisableStreamOpBatchCoalescingInConnectedChannel() const3183     HackyDisableStreamOpBatchCoalescingInConnectedChannel() const {
3184   return false;
3185 }
3186 
GetTransportName() const3187 absl::string_view grpc_chttp2_transport::GetTransportName() const {
3188   return "chttp2";
3189 }
3190 
3191 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
grpc_chttp2_transport_get_socket_node(grpc_core::Transport * transport)3192 grpc_chttp2_transport_get_socket_node(grpc_core::Transport* transport) {
3193   grpc_chttp2_transport* t =
3194       reinterpret_cast<grpc_chttp2_transport*>(transport);
3195   return t->channelz_socket;
3196 }
3197 
grpc_create_chttp2_transport(const grpc_core::ChannelArgs & channel_args,grpc_endpoint * ep,bool is_client)3198 grpc_core::Transport* grpc_create_chttp2_transport(
3199     const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep,
3200     bool is_client) {
3201   return new grpc_chttp2_transport(channel_args, ep, is_client);
3202 }
3203 
grpc_chttp2_transport_start_reading(grpc_core::Transport * transport,grpc_slice_buffer * read_buffer,grpc_closure * notify_on_receive_settings,grpc_closure * notify_on_close)3204 void grpc_chttp2_transport_start_reading(
3205     grpc_core::Transport* transport, grpc_slice_buffer* read_buffer,
3206     grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close) {
3207   auto t = reinterpret_cast<grpc_chttp2_transport*>(transport)->Ref();
3208   if (read_buffer != nullptr) {
3209     grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
3210     gpr_free(read_buffer);
3211   }
3212   auto* tp = t.get();
3213   tp->combiner->Run(
3214       grpc_core::NewClosure([t = std::move(t), notify_on_receive_settings,
3215                              notify_on_close](grpc_error_handle) mutable {
3216         if (!t->closed_with_error.ok()) {
3217           if (notify_on_receive_settings != nullptr) {
3218             grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_receive_settings,
3219                                     t->closed_with_error);
3220           }
3221           if (notify_on_close != nullptr) {
3222             grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_close,
3223                                     t->closed_with_error);
3224           }
3225           return;
3226         }
3227         t->notify_on_receive_settings = notify_on_receive_settings;
3228         t->notify_on_close = notify_on_close;
3229         read_action_locked(std::move(t), absl::OkStatus());
3230       }),
3231       absl::OkStatus());
3232 }
3233