1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
20
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <string.h>
24
25 #include <algorithm>
26 #include <atomic>
27 #include <cstddef>
28 #include <limits>
29 #include <memory>
30 #include <new>
31 #include <string>
32 #include <type_traits>
33 #include <utility>
34 #include <vector>
35
36 #include "absl/base/attributes.h"
37 #include "absl/container/flat_hash_map.h"
38 #include "absl/hash/hash.h"
39 #include "absl/meta/type_traits.h"
40 #include "absl/random/random.h"
41 #include "absl/status/status.h"
42 #include "absl/strings/cord.h"
43 #include "absl/strings/str_cat.h"
44 #include "absl/strings/str_format.h"
45 #include "absl/strings/string_view.h"
46 #include "absl/types/optional.h"
47 #include "absl/types/variant.h"
48
49 #include <grpc/event_engine/event_engine.h>
50 #include <grpc/grpc.h>
51 #include <grpc/impl/channel_arg_names.h>
52 #include <grpc/impl/connectivity_state.h>
53 #include <grpc/slice_buffer.h>
54 #include <grpc/status.h>
55 #include <grpc/support/alloc.h>
56 #include <grpc/support/log.h>
57 #include <grpc/support/time.h>
58
59 #include "src/core/ext/transport/chttp2/transport/context_list_entry.h"
60 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
61 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
62 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
63 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
64 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
65 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
66 #include "src/core/ext/transport/chttp2/transport/http_trace.h"
67 #include "src/core/ext/transport/chttp2/transport/internal.h"
68 #include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
69 #include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h"
70 #include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h"
71 #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
72 #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
73 #include "src/core/ext/transport/chttp2/transport/varint.h"
74 #include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
75 #include "src/core/lib/channel/call_tracer.h"
76 #include "src/core/lib/channel/channel_args.h"
77 #include "src/core/lib/channel/context.h"
78 #include "src/core/lib/channel/tcp_tracer.h"
79 #include "src/core/lib/debug/stats.h"
80 #include "src/core/lib/debug/stats_data.h"
81 #include "src/core/lib/experiments/experiments.h"
82 #include "src/core/lib/gpr/string.h"
83 #include "src/core/lib/gpr/useful.h"
84 #include "src/core/lib/gprpp/bitset.h"
85 #include "src/core/lib/gprpp/crash.h"
86 #include "src/core/lib/gprpp/debug_location.h"
87 #include "src/core/lib/gprpp/ref_counted.h"
88 #include "src/core/lib/gprpp/status_helper.h"
89 #include "src/core/lib/gprpp/time.h"
90 #include "src/core/lib/http/parser.h"
91 #include "src/core/lib/iomgr/combiner.h"
92 #include "src/core/lib/iomgr/error.h"
93 #include "src/core/lib/iomgr/exec_ctx.h"
94 #include "src/core/lib/iomgr/iomgr_fwd.h"
95 #include "src/core/lib/iomgr/port.h"
96 #include "src/core/lib/promise/poll.h"
97 #include "src/core/lib/resource_quota/arena.h"
98 #include "src/core/lib/resource_quota/memory_quota.h"
99 #include "src/core/lib/resource_quota/resource_quota.h"
100 #include "src/core/lib/resource_quota/trace.h"
101 #include "src/core/lib/slice/slice.h"
102 #include "src/core/lib/slice/slice_buffer.h"
103 #include "src/core/lib/slice/slice_internal.h"
104 #include "src/core/lib/transport/bdp_estimator.h"
105 #include "src/core/lib/transport/connectivity_state.h"
106 #include "src/core/lib/transport/error_utils.h"
107 #include "src/core/lib/transport/http2_errors.h"
108 #include "src/core/lib/transport/metadata_batch.h"
109 #include "src/core/lib/transport/metadata_info.h"
110 #include "src/core/lib/transport/status_conversion.h"
111 #include "src/core/lib/transport/transport.h"
112
113 #ifdef GRPC_POSIX_SOCKET_TCP
114 #include "src/core/lib/iomgr/ev_posix.h"
115 #endif
116
117 #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
118 #define MAX_WINDOW 0x7fffffffu
119 #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024)
120
121 #define KEEPALIVE_TIME_BACKOFF_MULTIPLIER 2
122
123 #define DEFAULT_MAX_PENDING_INDUCED_FRAMES 10000
124
125 #define GRPC_ARG_HTTP2_PING_ON_RST_STREAM_PERCENT \
126 "grpc.http2.ping_on_rst_stream_percent"
127
128 static grpc_core::Duration g_default_client_keepalive_time =
129 grpc_core::Duration::Infinity();
130 static grpc_core::Duration g_default_client_keepalive_timeout =
131 grpc_core::Duration::Seconds(20);
132 static grpc_core::Duration g_default_server_keepalive_time =
133 grpc_core::Duration::Hours(2);
134 static grpc_core::Duration g_default_server_keepalive_timeout =
135 grpc_core::Duration::Seconds(20);
136 static bool g_default_client_keepalive_permit_without_calls = false;
137 static bool g_default_server_keepalive_permit_without_calls = false;
138
139 // EXPERIMENTAL: control tarpitting in chttp2
140 #define GRPC_ARG_HTTP_ALLOW_TARPIT "grpc.http.tarpit"
141 #define GRPC_ARG_HTTP_TARPIT_MIN_DURATION_MS "grpc.http.tarpit_min_duration_ms"
142 #define GRPC_ARG_HTTP_TARPIT_MAX_DURATION_MS "grpc.http.tarpit_max_duration_ms"
143
144 #define MAX_CLIENT_STREAM_ID 0x7fffffffu
145 grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
146 grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
147 "chttp2_refcount");
148
149 // forward declarations of various callbacks that we'll build closures around
150 static void write_action_begin_locked(
151 grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
152 static void write_action(grpc_chttp2_transport* t);
153 static void write_action_end(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
154 grpc_error_handle error);
155 static void write_action_end_locked(
156 grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
157
158 static void read_action(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
159 grpc_error_handle error);
160 static void read_action_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
161 grpc_error_handle error);
162 static void continue_read_action_locked(
163 grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
164
165 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
166 grpc_error_handle error, bool tarpit);
167
168 // Start new streams that have been created if we can
169 static void maybe_start_some_streams(grpc_chttp2_transport* t);
170
171 static void connectivity_state_set(grpc_chttp2_transport* t,
172 grpc_connectivity_state state,
173 const absl::Status& status,
174 const char* reason);
175
176 static void benign_reclaimer_locked(
177 grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
178 static void destructive_reclaimer_locked(
179 grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
180
181 static void post_benign_reclaimer(grpc_chttp2_transport* t);
182 static void post_destructive_reclaimer(grpc_chttp2_transport* t);
183
184 static void close_transport_locked(grpc_chttp2_transport* t,
185 grpc_error_handle error);
186 static void end_all_the_calls(grpc_chttp2_transport* t,
187 grpc_error_handle error);
188
189 static void start_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
190 grpc_error_handle error);
191 static void finish_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
192 grpc_error_handle error);
193 static void start_bdp_ping_locked(
194 grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
195 static void finish_bdp_ping_locked(
196 grpc_core::RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle error);
197 static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t);
198 static void next_bdp_ping_timer_expired_locked(
199 grpc_core::RefCountedPtr<grpc_chttp2_transport> tp,
200 GRPC_UNUSED grpc_error_handle error);
201
202 static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error);
203 static void send_ping_locked(grpc_chttp2_transport* t,
204 grpc_closure* on_initiate, grpc_closure* on_ack);
205 static void retry_initiate_ping_locked(
206 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
207 GRPC_UNUSED grpc_error_handle error);
208
209 // keepalive-relevant functions
210 static void init_keepalive_ping(
211 grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
212 static void init_keepalive_ping_locked(
213 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
214 GRPC_UNUSED grpc_error_handle error);
215 static void finish_keepalive_ping(
216 grpc_core::RefCountedPtr<grpc_chttp2_transport> t, grpc_error_handle error);
217 static void finish_keepalive_ping_locked(
218 grpc_core::RefCountedPtr<grpc_chttp2_transport> t, grpc_error_handle error);
219 static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t);
220
221 static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error,
222 bool immediate_disconnect_hint);
223
224 // Timeout for getting an ack back on settings changes
225 #define GRPC_ARG_SETTINGS_TIMEOUT "grpc.http2.settings_timeout"
226
227 namespace {
228
229 using TaskHandle = ::grpc_event_engine::experimental::EventEngine::TaskHandle;
230
CallTracerIfSampled(grpc_chttp2_stream * s)231 grpc_core::CallTracerInterface* CallTracerIfSampled(grpc_chttp2_stream* s) {
232 if (s->context == nullptr || !grpc_core::IsTraceRecordCallopsEnabled()) {
233 return nullptr;
234 }
235 auto* call_tracer = static_cast<grpc_core::CallTracerInterface*>(
236 static_cast<grpc_call_context_element*>(
237 s->context)[GRPC_CONTEXT_CALL_TRACER_ANNOTATION_INTERFACE]
238 .value);
239 if (call_tracer == nullptr || !call_tracer->IsSampled()) {
240 return nullptr;
241 }
242 return call_tracer;
243 }
244
TcpTracerIfSampled(grpc_chttp2_stream * s)245 std::shared_ptr<grpc_core::TcpTracerInterface> TcpTracerIfSampled(
246 grpc_chttp2_stream* s) {
247 if (s->context == nullptr || !grpc_core::IsTraceRecordCallopsEnabled()) {
248 return nullptr;
249 }
250 auto* call_attempt_tracer = static_cast<grpc_core::CallTracerInterface*>(
251 static_cast<grpc_call_context_element*>(
252 s->context)[GRPC_CONTEXT_CALL_TRACER]
253 .value);
254 if (call_attempt_tracer == nullptr || !call_attempt_tracer->IsSampled()) {
255 return nullptr;
256 }
257 return call_attempt_tracer->StartNewTcpTrace();
258 }
259
260 grpc_core::WriteTimestampsCallback g_write_timestamps_callback = nullptr;
261 grpc_core::CopyContextFn g_get_copied_context_fn = nullptr;
262 } // namespace
263
264 namespace grpc_core {
265
266 namespace {
267 // Initialize a grpc_closure \a c to call \a Fn with \a t and \a error. Holds
268 // the passed in reference to \a t until it's moved into Fn.
269 template <void (*Fn)(RefCountedPtr<grpc_chttp2_transport>, grpc_error_handle)>
InitTransportClosure(RefCountedPtr<grpc_chttp2_transport> t,grpc_closure * c)270 grpc_closure* InitTransportClosure(RefCountedPtr<grpc_chttp2_transport> t,
271 grpc_closure* c) {
272 GRPC_CLOSURE_INIT(
273 c,
274 [](void* tp, grpc_error_handle error) {
275 Fn(RefCountedPtr<grpc_chttp2_transport>(
276 static_cast<grpc_chttp2_transport*>(tp)),
277 std::move(error));
278 },
279 t.release(), nullptr);
280 return c;
281 }
282 } // namespace
283
284 namespace {
285 TestOnlyGlobalHttp2TransportInitCallback test_only_init_callback = nullptr;
286 TestOnlyGlobalHttp2TransportDestructCallback test_only_destruct_callback =
287 nullptr;
288 bool test_only_disable_transient_failure_state_notification = false;
289 } // namespace
290
TestOnlySetGlobalHttp2TransportInitCallback(TestOnlyGlobalHttp2TransportInitCallback callback)291 void TestOnlySetGlobalHttp2TransportInitCallback(
292 TestOnlyGlobalHttp2TransportInitCallback callback) {
293 test_only_init_callback = callback;
294 }
295
TestOnlySetGlobalHttp2TransportDestructCallback(TestOnlyGlobalHttp2TransportDestructCallback callback)296 void TestOnlySetGlobalHttp2TransportDestructCallback(
297 TestOnlyGlobalHttp2TransportDestructCallback callback) {
298 test_only_destruct_callback = callback;
299 }
300
TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(bool disable)301 void TestOnlyGlobalHttp2TransportDisableTransientFailureStateNotification(
302 bool disable) {
303 test_only_disable_transient_failure_state_notification = disable;
304 }
305
GrpcHttp2SetWriteTimestampsCallback(WriteTimestampsCallback fn)306 void GrpcHttp2SetWriteTimestampsCallback(WriteTimestampsCallback fn) {
307 g_write_timestamps_callback = fn;
308 }
309
GrpcHttp2SetCopyContextFn(CopyContextFn fn)310 void GrpcHttp2SetCopyContextFn(CopyContextFn fn) {
311 g_get_copied_context_fn = fn;
312 }
313
GrpcHttp2GetWriteTimestampsCallback()314 WriteTimestampsCallback GrpcHttp2GetWriteTimestampsCallback() {
315 return g_write_timestamps_callback;
316 }
317
GrpcHttp2GetCopyContextFn()318 CopyContextFn GrpcHttp2GetCopyContextFn() { return g_get_copied_context_fn; }
319
320 // For each entry in the passed ContextList, it executes the function set using
321 // GrpcHttp2SetWriteTimestampsCallback method with each context in the list
322 // and \a ts. It also deletes/frees up the passed ContextList after this
323 // operation.
ForEachContextListEntryExecute(void * arg,Timestamps * ts,grpc_error_handle error)324 void ForEachContextListEntryExecute(void* arg, Timestamps* ts,
325 grpc_error_handle error) {
326 ContextList* context_list = reinterpret_cast<ContextList*>(arg);
327 if (!context_list) {
328 return;
329 }
330 for (auto it = context_list->begin(); it != context_list->end(); it++) {
331 ContextListEntry& entry = (*it);
332 if (ts) {
333 ts->byte_offset = static_cast<uint32_t>(entry.ByteOffsetInStream());
334 }
335 g_write_timestamps_callback(entry.TraceContext(), ts, error);
336 }
337 delete context_list;
338 }
339
HttpAnnotation(Type type,gpr_timespec time)340 HttpAnnotation::HttpAnnotation(Type type, gpr_timespec time)
341 : CallTracerAnnotationInterface::Annotation(
342 CallTracerAnnotationInterface::AnnotationType::kHttpTransport),
343 type_(type),
344 time_(time) {}
345
ToString() const346 std::string HttpAnnotation::ToString() const {
347 std::string s = "HttpAnnotation type: ";
348 switch (type_) {
349 case Type::kStart:
350 absl::StrAppend(&s, "Start");
351 break;
352 case Type::kHeadWritten:
353 absl::StrAppend(&s, "HeadWritten");
354 break;
355 case Type::kEnd:
356 absl::StrAppend(&s, "End");
357 break;
358 default:
359 absl::StrAppend(&s, "Unknown");
360 }
361 absl::StrAppend(&s, " time: ", gpr_format_timespec(time_));
362 if (transport_stats_.has_value()) {
363 absl::StrAppend(&s, " transport:[", transport_stats_->ToString(), "]");
364 }
365 if (stream_stats_.has_value()) {
366 absl::StrAppend(&s, " stream:[", stream_stats_->ToString(), "]");
367 }
368 return s;
369 }
370
371 } // namespace grpc_core
372
373 //
374 // CONSTRUCTION/DESTRUCTION/REFCOUNTING
375 //
376
~grpc_chttp2_transport()377 grpc_chttp2_transport::~grpc_chttp2_transport() {
378 size_t i;
379
380 cancel_pings(this, GRPC_ERROR_CREATE("Transport destroyed"));
381
382 event_engine.reset();
383
384 if (channelz_socket != nullptr) {
385 channelz_socket.reset();
386 }
387
388 grpc_endpoint_destroy(ep);
389
390 grpc_slice_buffer_destroy(&qbuf);
391
392 grpc_error_handle error = GRPC_ERROR_CREATE("Transport destroyed");
393 // ContextList::Execute follows semantics of a callback function and does not
394 // take a ref on error
395 if (cl != nullptr) {
396 grpc_core::ForEachContextListEntryExecute(cl, nullptr, error);
397 }
398 cl = nullptr;
399
400 grpc_slice_buffer_destroy(&read_buffer);
401 grpc_chttp2_goaway_parser_destroy(&goaway_parser);
402
403 for (i = 0; i < STREAM_LIST_COUNT; i++) {
404 GPR_ASSERT(lists[i].head == nullptr);
405 GPR_ASSERT(lists[i].tail == nullptr);
406 }
407
408 GPR_ASSERT(stream_map.empty());
409 GRPC_COMBINER_UNREF(combiner, "chttp2_transport");
410
411 while (write_cb_pool) {
412 grpc_chttp2_write_cb* next = write_cb_pool->next;
413 gpr_free(write_cb_pool);
414 write_cb_pool = next;
415 }
416
417 gpr_free(ping_acks);
418 if (grpc_core::test_only_destruct_callback != nullptr) {
419 grpc_core::test_only_destruct_callback();
420 }
421 }
422
read_channel_args(grpc_chttp2_transport * t,const grpc_core::ChannelArgs & channel_args,bool is_client)423 static void read_channel_args(grpc_chttp2_transport* t,
424 const grpc_core::ChannelArgs& channel_args,
425 bool is_client) {
426 const int initial_sequence_number =
427 channel_args.GetInt(GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER).value_or(-1);
428 if (initial_sequence_number > 0) {
429 if ((t->next_stream_id & 1) != (initial_sequence_number & 1)) {
430 gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
431 GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER, t->next_stream_id & 1,
432 is_client ? "client" : "server");
433 } else {
434 t->next_stream_id = static_cast<uint32_t>(initial_sequence_number);
435 }
436 }
437
438 const int max_hpack_table_size =
439 channel_args.GetInt(GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER).value_or(-1);
440 if (max_hpack_table_size >= 0) {
441 t->hpack_compressor.SetMaxUsableSize(max_hpack_table_size);
442 }
443
444 t->write_buffer_size =
445 std::max(0, channel_args.GetInt(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE)
446 .value_or(grpc_core::chttp2::kDefaultWindow));
447 t->keepalive_time =
448 std::max(grpc_core::Duration::Milliseconds(1),
449 channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIME_MS)
450 .value_or(t->is_client ? g_default_client_keepalive_time
451 : g_default_server_keepalive_time));
452 t->keepalive_timeout = std::max(
453 grpc_core::Duration::Zero(),
454 channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIMEOUT_MS)
455 .value_or(t->keepalive_time == grpc_core::Duration::Infinity()
456 ? grpc_core::Duration::Infinity()
457 : (t->is_client ? g_default_client_keepalive_timeout
458 : g_default_server_keepalive_timeout)));
459 t->ping_timeout = std::max(
460 grpc_core::Duration::Zero(),
461 channel_args.GetDurationFromIntMillis(GRPC_ARG_PING_TIMEOUT_MS)
462 .value_or(t->keepalive_time == grpc_core::Duration::Infinity()
463 ? grpc_core::Duration::Infinity()
464 : grpc_core::Duration::Minutes(1)));
465 if (t->is_client) {
466 t->keepalive_permit_without_calls =
467 channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
468 .value_or(grpc_core::IsKeepaliveFixEnabled()
469 ? g_default_client_keepalive_permit_without_calls
470 : false);
471 } else {
472 t->keepalive_permit_without_calls =
473 channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
474 .value_or(grpc_core::IsKeepaliveServerFixEnabled()
475 ? g_default_server_keepalive_permit_without_calls
476 : false);
477 }
478
479 t->settings_timeout =
480 channel_args.GetDurationFromIntMillis(GRPC_ARG_SETTINGS_TIMEOUT)
481 .value_or(std::max(t->keepalive_timeout * 2,
482 grpc_core::Duration::Minutes(1)));
483
484 // Only send the prefered rx frame size http2 setting if we are instructed
485 // to auto size the buffers allocated at tcp level and we also can adjust
486 // sending frame size.
487 t->enable_preferred_rx_crypto_frame_advertisement =
488 channel_args
489 .GetBool(GRPC_ARG_EXPERIMENTAL_HTTP2_PREFERRED_CRYPTO_FRAME_SIZE)
490 .value_or(false);
491
492 const auto max_requests_per_read =
493 channel_args.GetInt("grpc.http2.max_requests_per_read");
494 if (max_requests_per_read.has_value()) {
495 t->max_requests_per_read =
496 grpc_core::Clamp(*max_requests_per_read, 1, 10000);
497 } else {
498 t->max_requests_per_read = 32;
499 }
500
501 if (channel_args.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
502 .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT)) {
503 t->channelz_socket =
504 grpc_core::MakeRefCounted<grpc_core::channelz::SocketNode>(
505 std::string(grpc_endpoint_get_local_address(t->ep)),
506 std::string(t->peer_string.as_string_view()),
507 absl::StrCat(t->GetTransportName(), " ",
508 t->peer_string.as_string_view()),
509 channel_args
510 .GetObjectRef<grpc_core::channelz::SocketNode::Security>());
511 }
512
513 t->ack_pings = channel_args.GetBool("grpc.http2.ack_pings").value_or(true);
514
515 t->allow_tarpit =
516 channel_args.GetBool(GRPC_ARG_HTTP_ALLOW_TARPIT).value_or(true);
517 t->min_tarpit_duration_ms =
518 channel_args
519 .GetDurationFromIntMillis(GRPC_ARG_HTTP_TARPIT_MIN_DURATION_MS)
520 .value_or(grpc_core::Duration::Milliseconds(100))
521 .millis();
522 t->max_tarpit_duration_ms =
523 channel_args
524 .GetDurationFromIntMillis(GRPC_ARG_HTTP_TARPIT_MAX_DURATION_MS)
525 .value_or(grpc_core::Duration::Seconds(1))
526 .millis();
527 t->max_header_list_size_soft_limit =
528 grpc_core::GetSoftLimitFromChannelArgs(channel_args);
529
530 int value;
531 if (!is_client) {
532 value = channel_args.GetInt(GRPC_ARG_MAX_CONCURRENT_STREAMS).value_or(-1);
533 if (value >= 0) {
534 t->settings.mutable_local().SetMaxConcurrentStreams(value);
535 t->max_concurrent_streams_policy.SetTarget(value);
536 }
537 } else if (channel_args.Contains(GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
538 gpr_log(GPR_DEBUG, "%s is not available on clients",
539 GRPC_ARG_MAX_CONCURRENT_STREAMS);
540 }
541 value =
542 channel_args.GetInt(GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER).value_or(-1);
543 if (value >= 0) {
544 t->settings.mutable_local().SetHeaderTableSize(value);
545 }
546 t->settings.mutable_local().SetMaxHeaderListSize(
547 grpc_core::GetHardLimitFromChannelArgs(channel_args));
548 value = channel_args.GetInt(GRPC_ARG_HTTP2_MAX_FRAME_SIZE).value_or(-1);
549 if (value >= 0) {
550 t->settings.mutable_local().SetMaxFrameSize(value);
551 }
552 value =
553 channel_args.GetInt(GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES).value_or(-1);
554 if (value >= 0) {
555 t->settings.mutable_local().SetInitialWindowSize(value);
556 }
557 value = channel_args.GetInt(GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY).value_or(-1);
558 if (value >= 0) {
559 t->settings.mutable_local().SetAllowTrueBinaryMetadata(value != 0);
560 }
561
562 if (t->enable_preferred_rx_crypto_frame_advertisement) {
563 t->settings.mutable_local().SetPreferredReceiveCryptoMessageSize(INT_MAX);
564 }
565
566 t->ping_on_rst_stream_percent = grpc_core::Clamp(
567 channel_args.GetInt(GRPC_ARG_HTTP2_PING_ON_RST_STREAM_PERCENT)
568 .value_or(1),
569 0, 100);
570
571 t->max_concurrent_streams_overload_protection =
572 channel_args.GetBool(GRPC_ARG_MAX_CONCURRENT_STREAMS_OVERLOAD_PROTECTION)
573 .value_or(true);
574 }
575
init_keepalive_pings_if_enabled_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,GRPC_UNUSED grpc_error_handle error)576 static void init_keepalive_pings_if_enabled_locked(
577 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
578 GRPC_UNUSED grpc_error_handle error) {
579 GPR_DEBUG_ASSERT(error.ok());
580 if (t->keepalive_time != grpc_core::Duration::Infinity()) {
581 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
582 t->keepalive_ping_timer_handle =
583 t->event_engine->RunAfter(t->keepalive_time, [t = t->Ref()]() mutable {
584 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
585 grpc_core::ExecCtx exec_ctx;
586 init_keepalive_ping(std::move(t));
587 });
588 } else {
589 // Use GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED to indicate there are no
590 // inflight keepalive timers
591 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED;
592 }
593 }
594
grpc_chttp2_transport(const grpc_core::ChannelArgs & channel_args,grpc_endpoint * ep,bool is_client)595 grpc_chttp2_transport::grpc_chttp2_transport(
596 const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep,
597 bool is_client)
598 : grpc_core::RefCounted<grpc_chttp2_transport,
599 grpc_core::NonPolymorphicRefCount>(
600 GRPC_TRACE_FLAG_ENABLED(grpc_trace_chttp2_refcount)
601 ? "chttp2_refcount"
602 : nullptr),
603 ep(ep),
604 peer_string(
605 grpc_core::Slice::FromCopiedString(grpc_endpoint_get_peer(ep))),
606 memory_owner(channel_args.GetObject<grpc_core::ResourceQuota>()
607 ->memory_quota()
608 ->CreateMemoryOwner()),
609 self_reservation(
610 memory_owner.MakeReservation(sizeof(grpc_chttp2_transport))),
611 event_engine(
612 channel_args
613 .GetObjectRef<grpc_event_engine::experimental::EventEngine>()),
614 combiner(grpc_combiner_create(event_engine)),
615 state_tracker(is_client ? "client_transport" : "server_transport",
616 GRPC_CHANNEL_READY),
617 next_stream_id(is_client ? 1 : 2),
618 ping_abuse_policy(channel_args),
619 ping_rate_policy(channel_args, is_client),
620 flow_control(
621 peer_string.as_string_view(),
622 channel_args.GetBool(GRPC_ARG_HTTP2_BDP_PROBE).value_or(true),
623 &memory_owner),
624 deframe_state(is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0),
625 is_client(is_client) {
626 cl = new grpc_core::ContextList();
627 GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
628 GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
629
630 grpc_slice_buffer_init(&read_buffer);
631 if (is_client) {
632 grpc_slice_buffer_add(
633 outbuf.c_slice_buffer(),
634 grpc_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
635 }
636 grpc_slice_buffer_init(&qbuf);
637 grpc_chttp2_goaway_parser_init(&goaway_parser);
638
639 // configure http2 the way we like it
640 if (is_client) {
641 settings.mutable_local().SetEnablePush(false);
642 settings.mutable_local().SetMaxConcurrentStreams(0);
643 }
644 settings.mutable_local().SetMaxHeaderListSize(DEFAULT_MAX_HEADER_LIST_SIZE);
645 settings.mutable_local().SetAllowTrueBinaryMetadata(true);
646
647 read_channel_args(this, channel_args, is_client);
648
649 // Initially allow *UP TO* MAX_CONCURRENT_STREAMS incoming before we start
650 // blanket cancelling them.
651 num_incoming_streams_before_settings_ack =
652 settings.local().max_concurrent_streams();
653
654 grpc_core::ExecCtx exec_ctx;
655 combiner->Run(
656 grpc_core::InitTransportClosure<init_keepalive_pings_if_enabled_locked>(
657 Ref(), &init_keepalive_ping_locked),
658 absl::OkStatus());
659
660 if (flow_control.bdp_probe()) {
661 bdp_ping_blocked = true;
662 grpc_chttp2_act_on_flowctl_action(flow_control.PeriodicUpdate(), this,
663 nullptr);
664 }
665
666 grpc_chttp2_initiate_write(this, GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE);
667 post_benign_reclaimer(this);
668 if (grpc_core::test_only_init_callback != nullptr) {
669 grpc_core::test_only_init_callback();
670 }
671
672 #ifdef GRPC_POSIX_SOCKET_TCP
673 closure_barrier_may_cover_write =
674 grpc_event_engine_run_in_background() &&
675 grpc_core::IsScheduleCancellationOverWriteEnabled()
676 ? 0
677 : CLOSURE_BARRIER_MAY_COVER_WRITE;
678 #endif
679 }
680
destroy_transport_locked(void * tp,grpc_error_handle)681 static void destroy_transport_locked(void* tp, grpc_error_handle /*error*/) {
682 grpc_core::RefCountedPtr<grpc_chttp2_transport> t(
683 static_cast<grpc_chttp2_transport*>(tp));
684 t->destroying = 1;
685 close_transport_locked(
686 t.get(),
687 grpc_error_set_int(GRPC_ERROR_CREATE("Transport destroyed"),
688 grpc_core::StatusIntProperty::kOccurredDuringWrite,
689 t->write_state));
690 t->memory_owner.Reset();
691 }
692
Orphan()693 void grpc_chttp2_transport::Orphan() {
694 combiner->Run(GRPC_CLOSURE_CREATE(destroy_transport_locked, this, nullptr),
695 absl::OkStatus());
696 }
697
close_transport_locked(grpc_chttp2_transport * t,grpc_error_handle error)698 static void close_transport_locked(grpc_chttp2_transport* t,
699 grpc_error_handle error) {
700 end_all_the_calls(t, error);
701 cancel_pings(t, error);
702 if (t->closed_with_error.ok()) {
703 if (!grpc_error_has_clear_grpc_status(error)) {
704 error =
705 grpc_error_set_int(error, grpc_core::StatusIntProperty::kRpcStatus,
706 GRPC_STATUS_UNAVAILABLE);
707 }
708 if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
709 if (t->close_transport_on_writes_finished.ok()) {
710 t->close_transport_on_writes_finished =
711 GRPC_ERROR_CREATE("Delayed close due to in-progress write");
712 }
713 t->close_transport_on_writes_finished =
714 grpc_error_add_child(t->close_transport_on_writes_finished, error);
715 return;
716 }
717 GPR_ASSERT(!error.ok());
718 t->closed_with_error = error;
719 connectivity_state_set(t, GRPC_CHANNEL_SHUTDOWN, absl::Status(),
720 "close_transport");
721 if (t->keepalive_ping_timeout_handle != TaskHandle::kInvalid) {
722 t->event_engine->Cancel(std::exchange(t->keepalive_ping_timeout_handle,
723 TaskHandle::kInvalid));
724 }
725 if (t->settings_ack_watchdog != TaskHandle::kInvalid) {
726 t->event_engine->Cancel(
727 std::exchange(t->settings_ack_watchdog, TaskHandle::kInvalid));
728 }
729 if (t->delayed_ping_timer_handle != TaskHandle::kInvalid &&
730 t->event_engine->Cancel(t->delayed_ping_timer_handle)) {
731 t->delayed_ping_timer_handle = TaskHandle::kInvalid;
732 }
733 if (t->next_bdp_ping_timer_handle != TaskHandle::kInvalid &&
734 t->event_engine->Cancel(t->next_bdp_ping_timer_handle)) {
735 t->next_bdp_ping_timer_handle = TaskHandle::kInvalid;
736 }
737 switch (t->keepalive_state) {
738 case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING:
739 if (t->keepalive_ping_timer_handle != TaskHandle::kInvalid &&
740 t->event_engine->Cancel(t->keepalive_ping_timer_handle)) {
741 t->keepalive_ping_timer_handle = TaskHandle::kInvalid;
742 }
743 break;
744 case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING:
745 if (t->keepalive_ping_timer_handle != TaskHandle::kInvalid &&
746 t->event_engine->Cancel(t->keepalive_ping_timer_handle)) {
747 t->keepalive_ping_timer_handle = TaskHandle::kInvalid;
748 }
749 break;
750 case GRPC_CHTTP2_KEEPALIVE_STATE_DYING:
751 case GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED:
752 // keepalive timers are not set in these two states
753 break;
754 }
755
756 // flush writable stream list to avoid dangling references
757 grpc_chttp2_stream* s;
758 while (grpc_chttp2_list_pop_writable_stream(t, &s)) {
759 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:close");
760 }
761 GPR_ASSERT(t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE);
762 grpc_endpoint_shutdown(t->ep, error);
763 }
764 if (t->notify_on_receive_settings != nullptr) {
765 grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_receive_settings,
766 error);
767 t->notify_on_receive_settings = nullptr;
768 }
769 if (t->notify_on_close != nullptr) {
770 grpc_core::ExecCtx::Run(DEBUG_LOCATION, t->notify_on_close, error);
771 t->notify_on_close = nullptr;
772 }
773 }
774
775 #ifndef NDEBUG
grpc_chttp2_stream_ref(grpc_chttp2_stream * s,const char * reason)776 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason) {
777 grpc_stream_ref(s->refcount, reason);
778 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s,const char * reason)779 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason) {
780 grpc_stream_unref(s->refcount, reason);
781 }
782 #else
grpc_chttp2_stream_ref(grpc_chttp2_stream * s)783 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s) {
784 grpc_stream_ref(s->refcount);
785 }
grpc_chttp2_stream_unref(grpc_chttp2_stream * s)786 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s) {
787 grpc_stream_unref(s->refcount);
788 }
789 #endif
790
grpc_chttp2_stream(grpc_chttp2_transport * t,grpc_stream_refcount * refcount,const void * server_data)791 grpc_chttp2_stream::grpc_chttp2_stream(grpc_chttp2_transport* t,
792 grpc_stream_refcount* refcount,
793 const void* server_data)
794 : t(t->Ref()),
795 refcount([refcount]() {
796 // We reserve one 'active stream' that's dropped when the stream is
797 // read-closed. The others are for Chttp2IncomingByteStreams that are
798 // actively reading
799 // We do this here to avoid cache misses.
800 #ifndef NDEBUG
801 grpc_stream_ref(refcount, "chttp2");
802 #else
803 grpc_stream_ref(refcount);
804 #endif
805 return refcount;
806 }()),
807 flow_control(&t->flow_control) {
808 t->streams_allocated.fetch_add(1, std::memory_order_relaxed);
809 if (server_data) {
810 id = static_cast<uint32_t>(reinterpret_cast<uintptr_t>(server_data));
811 if (grpc_http_trace.enabled()) {
812 gpr_log(GPR_DEBUG, "HTTP:%p/%p creating accept stream %d [from %p]", t,
813 this, id, server_data);
814 }
815 *t->accepting_stream = this;
816 t->stream_map.emplace(id, this);
817 post_destructive_reclaimer(t);
818 }
819
820 grpc_slice_buffer_init(&frame_storage);
821 grpc_slice_buffer_init(&flow_controlled_buffer);
822 }
823
~grpc_chttp2_stream()824 grpc_chttp2_stream::~grpc_chttp2_stream() {
825 t->streams_allocated.fetch_sub(1, std::memory_order_relaxed);
826 grpc_chttp2_list_remove_stalled_by_stream(t.get(), this);
827 grpc_chttp2_list_remove_stalled_by_transport(t.get(), this);
828
829 if (t->channelz_socket != nullptr) {
830 if ((t->is_client && eos_received) || (!t->is_client && eos_sent)) {
831 t->channelz_socket->RecordStreamSucceeded();
832 } else {
833 t->channelz_socket->RecordStreamFailed();
834 }
835 }
836
837 GPR_ASSERT((write_closed && read_closed) || id == 0);
838 if (id != 0) {
839 GPR_ASSERT(t->stream_map.count(id) == 0);
840 }
841
842 grpc_slice_buffer_destroy(&frame_storage);
843
844 for (int i = 0; i < STREAM_LIST_COUNT; i++) {
845 if (GPR_UNLIKELY(included.is_set(i))) {
846 grpc_core::Crash(absl::StrFormat("%s stream %d still included in list %d",
847 t->is_client ? "client" : "server", id,
848 i));
849 }
850 }
851
852 GPR_ASSERT(send_initial_metadata_finished == nullptr);
853 GPR_ASSERT(send_trailing_metadata_finished == nullptr);
854 GPR_ASSERT(recv_initial_metadata_ready == nullptr);
855 GPR_ASSERT(recv_message_ready == nullptr);
856 GPR_ASSERT(recv_trailing_metadata_finished == nullptr);
857 grpc_slice_buffer_destroy(&flow_controlled_buffer);
858 grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_stream_arg, absl::OkStatus());
859 }
860
InitStream(grpc_stream * gs,grpc_stream_refcount * refcount,const void * server_data,grpc_core::Arena *)861 void grpc_chttp2_transport::InitStream(grpc_stream* gs,
862 grpc_stream_refcount* refcount,
863 const void* server_data,
864 grpc_core::Arena*) {
865 new (gs) grpc_chttp2_stream(this, refcount, server_data);
866 }
867
destroy_stream_locked(void * sp,grpc_error_handle)868 static void destroy_stream_locked(void* sp, grpc_error_handle /*error*/) {
869 grpc_chttp2_stream* s = static_cast<grpc_chttp2_stream*>(sp);
870 s->~grpc_chttp2_stream();
871 }
872
DestroyStream(grpc_stream * gs,grpc_closure * then_schedule_closure)873 void grpc_chttp2_transport::DestroyStream(grpc_stream* gs,
874 grpc_closure* then_schedule_closure) {
875 grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
876
877 s->destroy_stream_arg = then_schedule_closure;
878 combiner->Run(
879 GRPC_CLOSURE_INIT(&s->destroy_stream, destroy_stream_locked, s, nullptr),
880 absl::OkStatus());
881 }
882
grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport * t,uint32_t id)883 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
884 uint32_t id) {
885 if (t->accept_stream_cb == nullptr) {
886 return nullptr;
887 }
888 grpc_chttp2_stream* accepting = nullptr;
889 GPR_ASSERT(t->accepting_stream == nullptr);
890 t->accepting_stream = &accepting;
891 t->accept_stream_cb(t->accept_stream_cb_user_data, t,
892 reinterpret_cast<void*>(id));
893 t->accepting_stream = nullptr;
894 return accepting;
895 }
896
897 //
898 // OUTPUT PROCESSING
899 //
900
write_state_name(grpc_chttp2_write_state st)901 static const char* write_state_name(grpc_chttp2_write_state st) {
902 switch (st) {
903 case GRPC_CHTTP2_WRITE_STATE_IDLE:
904 return "IDLE";
905 case GRPC_CHTTP2_WRITE_STATE_WRITING:
906 return "WRITING";
907 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
908 return "WRITING+MORE";
909 }
910 GPR_UNREACHABLE_CODE(return "UNKNOWN");
911 }
912
set_write_state(grpc_chttp2_transport * t,grpc_chttp2_write_state st,const char * reason)913 static void set_write_state(grpc_chttp2_transport* t,
914 grpc_chttp2_write_state st, const char* reason) {
915 GRPC_CHTTP2_IF_TRACING(
916 gpr_log(GPR_INFO, "W:%p %s [%s] state %s -> %s [%s]", t,
917 t->is_client ? "CLIENT" : "SERVER",
918 std::string(t->peer_string.as_string_view()).c_str(),
919 write_state_name(t->write_state), write_state_name(st), reason));
920 t->write_state = st;
921 // If the state is being reset back to idle, it means a write was just
922 // finished. Make sure all the run_after_write closures are scheduled.
923 //
924 // This is also our chance to close the transport if the transport was marked
925 // to be closed after all writes finish (for example, if we received a go-away
926 // from peer while we had some pending writes)
927 if (st == GRPC_CHTTP2_WRITE_STATE_IDLE) {
928 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
929 if (!t->close_transport_on_writes_finished.ok()) {
930 grpc_error_handle err = t->close_transport_on_writes_finished;
931 t->close_transport_on_writes_finished = absl::OkStatus();
932 close_transport_locked(t, err);
933 }
934 }
935 }
936
grpc_chttp2_initiate_write(grpc_chttp2_transport * t,grpc_chttp2_initiate_write_reason reason)937 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
938 grpc_chttp2_initiate_write_reason reason) {
939 switch (t->write_state) {
940 case GRPC_CHTTP2_WRITE_STATE_IDLE:
941 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING,
942 grpc_chttp2_initiate_write_reason_string(reason));
943 // Note that the 'write_action_begin_locked' closure is being scheduled
944 // on the 'finally_scheduler' of t->combiner. This means that
945 // 'write_action_begin_locked' is called only *after* all the other
946 // closures (some of which are potentially initiating more writes on the
947 // transport) are executed on the t->combiner.
948 //
949 // The reason for scheduling on finally_scheduler is to make sure we batch
950 // as many writes as possible. 'write_action_begin_locked' is the function
951 // that gathers all the relevant bytes (which are at various places in the
952 // grpc_chttp2_transport structure) and append them to 'outbuf' field in
953 // grpc_chttp2_transport thereby batching what would have been potentially
954 // multiple write operations.
955 //
956 // Also, 'write_action_begin_locked' only gathers the bytes into outbuf.
957 // It does not call the endpoint to write the bytes. That is done by the
958 // 'write_action' (which is scheduled by 'write_action_begin_locked')
959 t->combiner->FinallyRun(
960 grpc_core::InitTransportClosure<write_action_begin_locked>(
961 t->Ref(), &t->write_action_begin_locked),
962 absl::OkStatus());
963 break;
964 case GRPC_CHTTP2_WRITE_STATE_WRITING:
965 set_write_state(t, GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
966 grpc_chttp2_initiate_write_reason_string(reason));
967 break;
968 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
969 break;
970 }
971 }
972
grpc_chttp2_mark_stream_writable(grpc_chttp2_transport * t,grpc_chttp2_stream * s)973 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
974 grpc_chttp2_stream* s) {
975 if (t->closed_with_error.ok() && grpc_chttp2_list_add_writable_stream(t, s)) {
976 GRPC_CHTTP2_STREAM_REF(s, "chttp2_writing:become");
977 }
978 }
979
begin_writing_desc(bool partial)980 static const char* begin_writing_desc(bool partial) {
981 if (partial) {
982 return "begin partial write in background";
983 } else {
984 return "begin write in current thread";
985 }
986 }
987
write_action_begin_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle)988 static void write_action_begin_locked(
989 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
990 grpc_error_handle /*error_ignored*/) {
991 GPR_ASSERT(t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE);
992 grpc_chttp2_begin_write_result r;
993 if (!t->closed_with_error.ok()) {
994 r.writing = false;
995 } else {
996 r = grpc_chttp2_begin_write(t.get());
997 }
998 if (r.writing) {
999 set_write_state(t.get(),
1000 r.partial ? GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE
1001 : GRPC_CHTTP2_WRITE_STATE_WRITING,
1002 begin_writing_desc(r.partial));
1003 write_action(t.get());
1004 if (t->reading_paused_on_pending_induced_frames) {
1005 GPR_ASSERT(t->num_pending_induced_frames == 0);
1006 // We had paused reading, because we had many induced frames (SETTINGS
1007 // ACK, PINGS ACK and RST_STREAMS) pending in t->qbuf. Now that we have
1008 // been able to flush qbuf, we can resume reading.
1009 GRPC_CHTTP2_IF_TRACING(gpr_log(
1010 GPR_INFO,
1011 "transport %p : Resuming reading after being paused due to too "
1012 "many unwritten SETTINGS ACK, PINGS ACK and RST_STREAM frames",
1013 t.get()));
1014 t->reading_paused_on_pending_induced_frames = false;
1015 continue_read_action_locked(std::move(t));
1016 }
1017 } else {
1018 set_write_state(t.get(), GRPC_CHTTP2_WRITE_STATE_IDLE,
1019 "begin writing nothing");
1020 }
1021 }
1022
write_action(grpc_chttp2_transport * t)1023 static void write_action(grpc_chttp2_transport* t) {
1024 void* cl = t->cl;
1025 if (!t->cl->empty()) {
1026 // Transfer the ownership of the context list to the endpoint and create and
1027 // associate a new context list with the transport.
1028 // The old context list is stored in the cl local variable which is passed
1029 // to the endpoint. Its upto the endpoint to manage its lifetime.
1030 t->cl = new grpc_core::ContextList();
1031 } else {
1032 // t->cl is Empty. There is nothing to trace in this endpoint_write. set cl
1033 // to nullptr.
1034 cl = nullptr;
1035 }
1036 // Choose max_frame_size as the prefered rx crypto frame size indicated by the
1037 // peer.
1038 int max_frame_size =
1039 t->settings.peer().preferred_receive_crypto_message_size();
1040 // Note: max frame size is 0 if the remote peer does not support adjusting the
1041 // sending frame size.
1042 if (max_frame_size == 0) {
1043 max_frame_size = INT_MAX;
1044 }
1045 if (GRPC_TRACE_FLAG_ENABLED(grpc_ping_trace)) {
1046 gpr_log(GPR_INFO, "%s[%p]: Write %" PRIdPTR " bytes",
1047 t->is_client ? "CLIENT" : "SERVER", t, t->outbuf.Length());
1048 }
1049 t->write_size_policy.BeginWrite(t->outbuf.Length());
1050 grpc_endpoint_write(t->ep, t->outbuf.c_slice_buffer(),
1051 grpc_core::InitTransportClosure<write_action_end>(
1052 t->Ref(), &t->write_action_end_locked),
1053 cl, max_frame_size);
1054 }
1055
write_action_end(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)1056 static void write_action_end(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
1057 grpc_error_handle error) {
1058 auto* tp = t.get();
1059 if (GRPC_TRACE_FLAG_ENABLED(grpc_ping_trace)) {
1060 gpr_log(GPR_INFO, "%s[%p]: Finish write",
1061 t->is_client ? "CLIENT" : "SERVER", t.get());
1062 }
1063 tp->combiner->Run(grpc_core::InitTransportClosure<write_action_end_locked>(
1064 std::move(t), &tp->write_action_end_locked),
1065 error);
1066 }
1067
1068 // Callback from the grpc_endpoint after bytes have been written by calling
1069 // sendmsg
write_action_end_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)1070 static void write_action_end_locked(
1071 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
1072 grpc_error_handle error) {
1073 t->write_size_policy.EndWrite(error.ok());
1074
1075 bool closed = false;
1076 if (!error.ok()) {
1077 close_transport_locked(t.get(), error);
1078 closed = true;
1079 }
1080
1081 if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED) {
1082 t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SENT;
1083 closed = true;
1084 if (t->stream_map.empty()) {
1085 close_transport_locked(t.get(), GRPC_ERROR_CREATE("goaway sent"));
1086 }
1087 }
1088
1089 switch (t->write_state) {
1090 case GRPC_CHTTP2_WRITE_STATE_IDLE:
1091 GPR_UNREACHABLE_CODE(break);
1092 case GRPC_CHTTP2_WRITE_STATE_WRITING:
1093 set_write_state(t.get(), GRPC_CHTTP2_WRITE_STATE_IDLE, "finish writing");
1094 break;
1095 case GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE:
1096 set_write_state(t.get(), GRPC_CHTTP2_WRITE_STATE_WRITING,
1097 "continue writing");
1098 // If the transport is closed, we will retry writing on the endpoint
1099 // and next write may contain part of the currently serialized frames.
1100 // So, we should only call the run_after_write callbacks when the next
1101 // write finishes, or the callbacks will be invoked when the stream is
1102 // closed.
1103 if (!closed) {
1104 grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &t->run_after_write);
1105 }
1106 t->combiner->FinallyRun(
1107 grpc_core::InitTransportClosure<write_action_begin_locked>(
1108 t, &t->write_action_begin_locked),
1109 absl::OkStatus());
1110 break;
1111 }
1112
1113 grpc_chttp2_end_write(t.get(), error);
1114 }
1115
1116 // Cancel out streams that haven't yet started if we have received a GOAWAY
cancel_unstarted_streams(grpc_chttp2_transport * t,grpc_error_handle error,bool tarpit)1117 static void cancel_unstarted_streams(grpc_chttp2_transport* t,
1118 grpc_error_handle error, bool tarpit) {
1119 grpc_chttp2_stream* s;
1120 while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1121 s->trailing_metadata_buffer.Set(
1122 grpc_core::GrpcStreamNetworkState(),
1123 grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1124 grpc_chttp2_cancel_stream(t, s, error, tarpit);
1125 }
1126 }
1127
grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport * t,uint32_t goaway_error,uint32_t last_stream_id,absl::string_view goaway_text)1128 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
1129 uint32_t goaway_error,
1130 uint32_t last_stream_id,
1131 absl::string_view goaway_text) {
1132 t->goaway_error = grpc_error_set_str(
1133 grpc_error_set_int(
1134 grpc_error_set_int(
1135 grpc_core::StatusCreate(
1136 absl::StatusCode::kUnavailable,
1137 absl::StrFormat(
1138 "GOAWAY received; Error code: %u; Debug Text: %s",
1139 goaway_error, goaway_text),
1140 DEBUG_LOCATION, {}),
1141 grpc_core::StatusIntProperty::kHttp2Error,
1142 static_cast<intptr_t>(goaway_error)),
1143 grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE),
1144 grpc_core::StatusStrProperty::kRawBytes, goaway_text);
1145
1146 GRPC_CHTTP2_IF_TRACING(
1147 gpr_log(GPR_INFO, "transport %p got goaway with last stream id %d", t,
1148 last_stream_id));
1149 // We want to log this irrespective of whether http tracing is enabled if we
1150 // received a GOAWAY with a non NO_ERROR code.
1151 if (goaway_error != GRPC_HTTP2_NO_ERROR) {
1152 gpr_log(GPR_INFO, "%s: Got goaway [%d] err=%s",
1153 std::string(t->peer_string.as_string_view()).c_str(), goaway_error,
1154 grpc_core::StatusToString(t->goaway_error).c_str());
1155 }
1156 if (t->is_client) {
1157 cancel_unstarted_streams(t, t->goaway_error, false);
1158 // Cancel all unseen streams
1159 std::vector<grpc_chttp2_stream*> to_cancel;
1160 for (auto id_stream : t->stream_map) {
1161 if (id_stream.first > last_stream_id) {
1162 to_cancel.push_back(id_stream.second);
1163 }
1164 }
1165 for (auto s : to_cancel) {
1166 s->trailing_metadata_buffer.Set(
1167 grpc_core::GrpcStreamNetworkState(),
1168 grpc_core::GrpcStreamNetworkState::kNotSeenByServer);
1169 grpc_chttp2_cancel_stream(s->t.get(), s, s->t->goaway_error, false);
1170 }
1171 }
1172 absl::Status status = grpc_error_to_absl_status(t->goaway_error);
1173 // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
1174 // data equal to "too_many_pings", it should log the occurrence at a log level
1175 // that is enabled by default and double the configured KEEPALIVE_TIME used
1176 // for new connections on that channel.
1177 if (GPR_UNLIKELY(t->is_client &&
1178 goaway_error == GRPC_HTTP2_ENHANCE_YOUR_CALM &&
1179 goaway_text == "too_many_pings")) {
1180 gpr_log(GPR_ERROR,
1181 "%s: Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug "
1182 "data equal to \"too_many_pings\". Current keepalive time (before "
1183 "throttling): %s",
1184 std::string(t->peer_string.as_string_view()).c_str(),
1185 t->keepalive_time.ToString().c_str());
1186 constexpr int max_keepalive_time_millis =
1187 INT_MAX / KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
1188 int64_t throttled_keepalive_time =
1189 t->keepalive_time.millis() > max_keepalive_time_millis
1190 ? INT_MAX
1191 : t->keepalive_time.millis() * KEEPALIVE_TIME_BACKOFF_MULTIPLIER;
1192 status.SetPayload(grpc_core::kKeepaliveThrottlingKey,
1193 absl::Cord(std::to_string(throttled_keepalive_time)));
1194 }
1195 // lie: use transient failure from the transport to indicate goaway has been
1196 // received.
1197 if (!grpc_core::test_only_disable_transient_failure_state_notification) {
1198 connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1199 "got_goaway");
1200 }
1201 }
1202
maybe_start_some_streams(grpc_chttp2_transport * t)1203 static void maybe_start_some_streams(grpc_chttp2_transport* t) {
1204 grpc_chttp2_stream* s;
1205 // maybe cancel out streams that haven't yet started if we have received a
1206 // GOAWAY
1207 if (!t->goaway_error.ok()) {
1208 cancel_unstarted_streams(t, t->goaway_error, false);
1209 return;
1210 }
1211 // start streams where we have free grpc_chttp2_stream ids and free
1212 // * concurrency
1213 while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
1214 t->stream_map.size() < t->settings.peer().max_concurrent_streams() &&
1215 grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1216 // safe since we can't (legally) be parsing this stream yet
1217 GRPC_CHTTP2_IF_TRACING(gpr_log(
1218 GPR_INFO,
1219 "HTTP:%s: Transport %p allocating new grpc_chttp2_stream %p to id %d",
1220 t->is_client ? "CLI" : "SVR", t, s, t->next_stream_id));
1221
1222 GPR_ASSERT(s->id == 0);
1223 s->id = t->next_stream_id;
1224 t->next_stream_id += 2;
1225
1226 if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1227 connectivity_state_set(t, GRPC_CHANNEL_TRANSIENT_FAILURE,
1228 absl::Status(absl::StatusCode::kUnavailable,
1229 "Transport Stream IDs exhausted"),
1230 "no_more_stream_ids");
1231 }
1232
1233 t->stream_map.emplace(s->id, s);
1234 post_destructive_reclaimer(t);
1235 grpc_chttp2_mark_stream_writable(t, s);
1236 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM);
1237 }
1238 // cancel out streams that will never be started
1239 if (t->next_stream_id >= MAX_CLIENT_STREAM_ID) {
1240 while (grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
1241 s->trailing_metadata_buffer.Set(
1242 grpc_core::GrpcStreamNetworkState(),
1243 grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1244 grpc_chttp2_cancel_stream(
1245 t, s,
1246 grpc_error_set_int(GRPC_ERROR_CREATE("Stream IDs exhausted"),
1247 grpc_core::StatusIntProperty::kRpcStatus,
1248 GRPC_STATUS_UNAVAILABLE),
1249 false);
1250 }
1251 }
1252 }
1253
add_closure_barrier(grpc_closure * closure)1254 static grpc_closure* add_closure_barrier(grpc_closure* closure) {
1255 closure->next_data.scratch += CLOSURE_BARRIER_FIRST_REF_BIT;
1256 return closure;
1257 }
1258
null_then_sched_closure(grpc_closure ** closure)1259 static void null_then_sched_closure(grpc_closure** closure) {
1260 grpc_closure* c = *closure;
1261 *closure = nullptr;
1262 // null_then_schedule_closure might be run during a start_batch which might
1263 // subsequently examine the batch for more operations contained within.
1264 // However, the closure run might make it back to the call object, push a
1265 // completion, have the application see it, and make a new operation on the
1266 // call which recycles the batch BEFORE the call to start_batch completes,
1267 // forcing a race.
1268 grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, absl::OkStatus());
1269 }
1270
grpc_chttp2_complete_closure_step(grpc_chttp2_transport * t,grpc_closure ** pclosure,grpc_error_handle error,const char * desc,grpc_core::DebugLocation whence)1271 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
1272 grpc_closure** pclosure,
1273 grpc_error_handle error,
1274 const char* desc,
1275 grpc_core::DebugLocation whence) {
1276 grpc_closure* closure = *pclosure;
1277 *pclosure = nullptr;
1278 if (closure == nullptr) {
1279 return;
1280 }
1281 closure->next_data.scratch -= CLOSURE_BARRIER_FIRST_REF_BIT;
1282 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1283 gpr_log(
1284 GPR_INFO,
1285 "complete_closure_step: t=%p %p refs=%d flags=0x%04x desc=%s err=%s "
1286 "write_state=%s whence=%s:%d",
1287 t, closure,
1288 static_cast<int>(closure->next_data.scratch /
1289 CLOSURE_BARRIER_FIRST_REF_BIT),
1290 static_cast<int>(closure->next_data.scratch %
1291 CLOSURE_BARRIER_FIRST_REF_BIT),
1292 desc, grpc_core::StatusToString(error).c_str(),
1293 write_state_name(t->write_state), whence.file(), whence.line());
1294 }
1295
1296 if (!error.ok()) {
1297 grpc_error_handle cl_err =
1298 grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
1299 if (cl_err.ok()) {
1300 cl_err = GRPC_ERROR_CREATE(absl::StrCat(
1301 "Error in HTTP transport completing operation: ", desc,
1302 " write_state=", write_state_name(t->write_state), " refs=",
1303 closure->next_data.scratch / CLOSURE_BARRIER_FIRST_REF_BIT, " flags=",
1304 closure->next_data.scratch % CLOSURE_BARRIER_FIRST_REF_BIT));
1305 cl_err = grpc_error_set_str(cl_err,
1306 grpc_core::StatusStrProperty::kTargetAddress,
1307 std::string(t->peer_string.as_string_view()));
1308 }
1309 cl_err = grpc_error_add_child(cl_err, error);
1310 closure->error_data.error = grpc_core::internal::StatusAllocHeapPtr(cl_err);
1311 }
1312 if (closure->next_data.scratch < CLOSURE_BARRIER_FIRST_REF_BIT) {
1313 if ((t->write_state == GRPC_CHTTP2_WRITE_STATE_IDLE) ||
1314 !(closure->next_data.scratch & CLOSURE_BARRIER_MAY_COVER_WRITE)) {
1315 // Using GRPC_CLOSURE_SCHED instead of GRPC_CLOSURE_RUN to avoid running
1316 // closures earlier than when it is safe to do so.
1317 grpc_error_handle run_error =
1318 grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error);
1319 closure->error_data.error = 0;
1320 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, run_error);
1321 } else {
1322 grpc_closure_list_append(&t->run_after_write, closure);
1323 }
1324 }
1325 }
1326
contains_non_ok_status(grpc_metadata_batch * batch)1327 static bool contains_non_ok_status(grpc_metadata_batch* batch) {
1328 return batch->get(grpc_core::GrpcStatusMetadata()).value_or(GRPC_STATUS_OK) !=
1329 GRPC_STATUS_OK;
1330 }
1331
log_metadata(const grpc_metadata_batch * md_batch,uint32_t id,bool is_client,bool is_initial)1332 static void log_metadata(const grpc_metadata_batch* md_batch, uint32_t id,
1333 bool is_client, bool is_initial) {
1334 gpr_log(GPR_INFO, "--metadata--");
1335 const std::string prefix = absl::StrCat(
1336 "HTTP:", id, is_initial ? ":HDR" : ":TRL", is_client ? ":CLI:" : ":SVR:");
1337 md_batch->Log([&prefix](absl::string_view key, absl::string_view value) {
1338 gpr_log(GPR_INFO, "%s", absl::StrCat(prefix, key, ": ", value).c_str());
1339 });
1340 }
1341
perform_stream_op_locked(void * stream_op,grpc_error_handle)1342 static void perform_stream_op_locked(void* stream_op,
1343 grpc_error_handle /*error_ignored*/) {
1344 grpc_transport_stream_op_batch* op =
1345 static_cast<grpc_transport_stream_op_batch*>(stream_op);
1346 grpc_chttp2_stream* s =
1347 static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
1348 grpc_transport_stream_op_batch_payload* op_payload = op->payload;
1349 grpc_chttp2_transport* t = s->t.get();
1350
1351 s->context = op->payload->context;
1352 s->traced = op->is_traced;
1353 s->call_tracer = CallTracerIfSampled(s);
1354 s->tcp_tracer = TcpTracerIfSampled(s);
1355 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1356 gpr_log(GPR_INFO,
1357 "perform_stream_op_locked[s=%p; op=%p]: %s; on_complete = %p", s,
1358 op, grpc_transport_stream_op_batch_string(op, false).c_str(),
1359 op->on_complete);
1360 if (op->send_initial_metadata) {
1361 log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
1362 s->id, t->is_client, true);
1363 }
1364 if (op->send_trailing_metadata) {
1365 log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
1366 s->id, t->is_client, false);
1367 }
1368 }
1369
1370 grpc_closure* on_complete = op->on_complete;
1371 // on_complete will be null if and only if there are no send ops in the batch.
1372 if (on_complete != nullptr) {
1373 // This batch has send ops. Use final_data as a barrier until enqueue time;
1374 // the initial counter is dropped at the end of this function.
1375 on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
1376 on_complete->error_data.error = 0;
1377 }
1378
1379 if (op->cancel_stream) {
1380 grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error,
1381 op_payload->cancel_stream.tarpit);
1382 }
1383
1384 if (op->send_initial_metadata) {
1385 if (s->call_tracer) {
1386 s->call_tracer->RecordAnnotation(
1387 grpc_core::HttpAnnotation(grpc_core::HttpAnnotation::Type::kStart,
1388 gpr_now(GPR_CLOCK_REALTIME))
1389 .Add(s->t->flow_control.stats())
1390 .Add(s->flow_control.stats()));
1391 }
1392 if (t->is_client && t->channelz_socket != nullptr) {
1393 t->channelz_socket->RecordStreamStartedFromLocal();
1394 }
1395 GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
1396 on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1397
1398 s->send_initial_metadata_finished = add_closure_barrier(on_complete);
1399 s->send_initial_metadata =
1400 op_payload->send_initial_metadata.send_initial_metadata;
1401 if (t->is_client) {
1402 s->deadline = std::min(
1403 s->deadline,
1404 s->send_initial_metadata->get(grpc_core::GrpcTimeoutMetadata())
1405 .value_or(grpc_core::Timestamp::InfFuture()));
1406 }
1407 if (contains_non_ok_status(s->send_initial_metadata)) {
1408 s->seen_error = true;
1409 }
1410 if (!s->write_closed) {
1411 if (t->is_client) {
1412 if (t->closed_with_error.ok()) {
1413 GPR_ASSERT(s->id == 0);
1414 grpc_chttp2_list_add_waiting_for_concurrency(t, s);
1415 maybe_start_some_streams(t);
1416 } else {
1417 s->trailing_metadata_buffer.Set(
1418 grpc_core::GrpcStreamNetworkState(),
1419 grpc_core::GrpcStreamNetworkState::kNotSentOnWire);
1420 grpc_chttp2_cancel_stream(
1421 t, s,
1422 grpc_error_set_int(
1423 GRPC_ERROR_CREATE_REFERENCING("Transport closed",
1424 &t->closed_with_error, 1),
1425 grpc_core::StatusIntProperty::kRpcStatus,
1426 GRPC_STATUS_UNAVAILABLE),
1427 false);
1428 }
1429 } else {
1430 GPR_ASSERT(s->id != 0);
1431 grpc_chttp2_mark_stream_writable(t, s);
1432 if (!(op->send_message &&
1433 (op->payload->send_message.flags & GRPC_WRITE_BUFFER_HINT))) {
1434 grpc_chttp2_initiate_write(
1435 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
1436 }
1437 }
1438 } else {
1439 s->send_initial_metadata = nullptr;
1440 grpc_chttp2_complete_closure_step(
1441 t, &s->send_initial_metadata_finished,
1442 GRPC_ERROR_CREATE_REFERENCING(
1443 "Attempt to send initial metadata after stream was closed",
1444 &s->write_closed_error, 1),
1445 "send_initial_metadata_finished");
1446 }
1447 }
1448
1449 if (op->send_message) {
1450 t->num_messages_in_next_write++;
1451 grpc_core::global_stats().IncrementHttp2SendMessageSize(
1452 op->payload->send_message.send_message->Length());
1453 on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1454 s->send_message_finished = add_closure_barrier(op->on_complete);
1455 const uint32_t flags = op_payload->send_message.flags;
1456 if (s->write_closed) {
1457 op->payload->send_message.stream_write_closed = true;
1458 // We should NOT return an error here, so as to avoid a cancel OP being
1459 // started. The surface layer will notice that the stream has been closed
1460 // for writes and fail the send message op.
1461 grpc_chttp2_complete_closure_step(t, &s->send_message_finished,
1462 absl::OkStatus(),
1463 "fetching_send_message_finished");
1464 } else {
1465 uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
1466 &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
1467 frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
1468 size_t len = op_payload->send_message.send_message->Length();
1469 frame_hdr[1] = static_cast<uint8_t>(len >> 24);
1470 frame_hdr[2] = static_cast<uint8_t>(len >> 16);
1471 frame_hdr[3] = static_cast<uint8_t>(len >> 8);
1472 frame_hdr[4] = static_cast<uint8_t>(len);
1473
1474 if (grpc_core::IsHttp2StatsFixEnabled()) {
1475 s->stats.outgoing.framing_bytes += GRPC_HEADER_SIZE_IN_BYTES;
1476 s->stats.outgoing.data_bytes +=
1477 op_payload->send_message.send_message->Length();
1478 }
1479 s->next_message_end_offset =
1480 s->flow_controlled_bytes_written +
1481 static_cast<int64_t>(s->flow_controlled_buffer.length) +
1482 static_cast<int64_t>(len);
1483 if (flags & GRPC_WRITE_BUFFER_HINT) {
1484 s->next_message_end_offset -= t->write_buffer_size;
1485 s->write_buffering = true;
1486 } else {
1487 s->write_buffering = false;
1488 }
1489
1490 grpc_slice* const slices =
1491 op_payload->send_message.send_message->c_slice_buffer()->slices;
1492 grpc_slice* const end =
1493 slices + op_payload->send_message.send_message->Count();
1494 for (grpc_slice* slice = slices; slice != end; slice++) {
1495 grpc_slice_buffer_add(&s->flow_controlled_buffer,
1496 grpc_core::CSliceRef(*slice));
1497 }
1498
1499 int64_t notify_offset = s->next_message_end_offset;
1500 if (notify_offset <= s->flow_controlled_bytes_written) {
1501 grpc_chttp2_complete_closure_step(t, &s->send_message_finished,
1502 absl::OkStatus(),
1503 "fetching_send_message_finished");
1504 } else {
1505 grpc_chttp2_write_cb* cb = t->write_cb_pool;
1506 if (cb == nullptr) {
1507 cb = static_cast<grpc_chttp2_write_cb*>(gpr_malloc(sizeof(*cb)));
1508 } else {
1509 t->write_cb_pool = cb->next;
1510 }
1511 cb->call_at_byte = notify_offset;
1512 cb->closure = s->send_message_finished;
1513 s->send_message_finished = nullptr;
1514 grpc_chttp2_write_cb** list = flags & GRPC_WRITE_THROUGH
1515 ? &s->on_write_finished_cbs
1516 : &s->on_flow_controlled_cbs;
1517 cb->next = *list;
1518 *list = cb;
1519 }
1520
1521 if (s->id != 0 &&
1522 (!s->write_buffering ||
1523 s->flow_controlled_buffer.length > t->write_buffer_size)) {
1524 grpc_chttp2_mark_stream_writable(t, s);
1525 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE);
1526 }
1527 }
1528 }
1529
1530 if (op->send_trailing_metadata) {
1531 GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
1532 on_complete->next_data.scratch |= t->closure_barrier_may_cover_write;
1533 s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
1534 s->send_trailing_metadata =
1535 op_payload->send_trailing_metadata.send_trailing_metadata;
1536 s->sent_trailing_metadata_op = op_payload->send_trailing_metadata.sent;
1537 s->write_buffering = false;
1538 if (contains_non_ok_status(s->send_trailing_metadata)) {
1539 s->seen_error = true;
1540 }
1541 if (s->write_closed) {
1542 s->send_trailing_metadata = nullptr;
1543 s->sent_trailing_metadata_op = nullptr;
1544 grpc_chttp2_complete_closure_step(
1545 t, &s->send_trailing_metadata_finished,
1546 op->payload->send_trailing_metadata.send_trailing_metadata->empty()
1547 ? absl::OkStatus()
1548 : GRPC_ERROR_CREATE("Attempt to send trailing metadata after "
1549 "stream was closed"),
1550 "send_trailing_metadata_finished");
1551 } else if (s->id != 0) {
1552 // TODO(ctiller): check if there's flow control for any outstanding
1553 // bytes before going writable
1554 grpc_chttp2_mark_stream_writable(t, s);
1555 grpc_chttp2_initiate_write(
1556 t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
1557 }
1558 }
1559
1560 if (op->recv_initial_metadata) {
1561 GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
1562 s->recv_initial_metadata_ready =
1563 op_payload->recv_initial_metadata.recv_initial_metadata_ready;
1564 s->recv_initial_metadata =
1565 op_payload->recv_initial_metadata.recv_initial_metadata;
1566 s->trailing_metadata_available =
1567 op_payload->recv_initial_metadata.trailing_metadata_available;
1568 if (s->parsed_trailers_only && s->trailing_metadata_available != nullptr) {
1569 *s->trailing_metadata_available = true;
1570 }
1571 grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
1572 }
1573
1574 if (op->recv_message) {
1575 GPR_ASSERT(s->recv_message_ready == nullptr);
1576 s->recv_message_ready = op_payload->recv_message.recv_message_ready;
1577 s->recv_message = op_payload->recv_message.recv_message;
1578 s->recv_message->emplace();
1579 s->recv_message_flags = op_payload->recv_message.flags;
1580 s->call_failed_before_recv_message =
1581 op_payload->recv_message.call_failed_before_recv_message;
1582 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1583 }
1584
1585 if (op->recv_trailing_metadata) {
1586 GPR_ASSERT(s->collecting_stats == nullptr);
1587 s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
1588 GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
1589 s->recv_trailing_metadata_finished =
1590 op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
1591 s->recv_trailing_metadata =
1592 op_payload->recv_trailing_metadata.recv_trailing_metadata;
1593 s->final_metadata_requested = true;
1594 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
1595 }
1596
1597 if (on_complete != nullptr) {
1598 grpc_chttp2_complete_closure_step(t, &on_complete, absl::OkStatus(),
1599 "op->on_complete");
1600 }
1601
1602 GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
1603 }
1604
PerformStreamOp(grpc_stream * gs,grpc_transport_stream_op_batch * op)1605 void grpc_chttp2_transport::PerformStreamOp(
1606 grpc_stream* gs, grpc_transport_stream_op_batch* op) {
1607 grpc_chttp2_stream* s = reinterpret_cast<grpc_chttp2_stream*>(gs);
1608
1609 if (!is_client) {
1610 if (op->send_initial_metadata) {
1611 GPR_ASSERT(!op->payload->send_initial_metadata.send_initial_metadata
1612 ->get(grpc_core::GrpcTimeoutMetadata())
1613 .has_value());
1614 }
1615 if (op->send_trailing_metadata) {
1616 GPR_ASSERT(!op->payload->send_trailing_metadata.send_trailing_metadata
1617 ->get(grpc_core::GrpcTimeoutMetadata())
1618 .has_value());
1619 }
1620 }
1621
1622 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1623 gpr_log(GPR_INFO, "perform_stream_op[s=%p; op=%p]: %s", s, op,
1624 grpc_transport_stream_op_batch_string(op, false).c_str());
1625 }
1626
1627 GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op");
1628 op->handler_private.extra_arg = gs;
1629 combiner->Run(GRPC_CLOSURE_INIT(&op->handler_private.closure,
1630 perform_stream_op_locked, op, nullptr),
1631 absl::OkStatus());
1632 }
1633
cancel_pings(grpc_chttp2_transport * t,grpc_error_handle error)1634 static void cancel_pings(grpc_chttp2_transport* t, grpc_error_handle error) {
1635 GRPC_CHTTP2_IF_TRACING(gpr_log(GPR_INFO, "%p CANCEL PINGS: %s", t,
1636 grpc_core::StatusToString(error).c_str()));
1637 // callback remaining pings: they're not allowed to call into the transport,
1638 // and maybe they hold resources that need to be freed
1639 t->ping_callbacks.CancelAll(t->event_engine.get());
1640 }
1641
1642 namespace {
1643 class PingClosureWrapper {
1644 public:
PingClosureWrapper(grpc_closure * closure)1645 explicit PingClosureWrapper(grpc_closure* closure) : closure_(closure) {}
1646 PingClosureWrapper(const PingClosureWrapper&) = delete;
1647 PingClosureWrapper& operator=(const PingClosureWrapper&) = delete;
PingClosureWrapper(PingClosureWrapper && other)1648 PingClosureWrapper(PingClosureWrapper&& other) noexcept
1649 : closure_(other.Take()) {}
operator =(PingClosureWrapper && other)1650 PingClosureWrapper& operator=(PingClosureWrapper&& other) noexcept {
1651 std::swap(closure_, other.closure_);
1652 return *this;
1653 }
~PingClosureWrapper()1654 ~PingClosureWrapper() {
1655 if (closure_ != nullptr) {
1656 grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure_, absl::CancelledError());
1657 }
1658 }
1659
operator ()()1660 void operator()() {
1661 grpc_core::ExecCtx::Run(DEBUG_LOCATION, Take(), absl::OkStatus());
1662 }
1663
1664 private:
Take()1665 grpc_closure* Take() { return std::exchange(closure_, nullptr); }
1666
1667 grpc_closure* closure_ = nullptr;
1668 };
1669 } // namespace
1670
send_ping_locked(grpc_chttp2_transport * t,grpc_closure * on_initiate,grpc_closure * on_ack)1671 static void send_ping_locked(grpc_chttp2_transport* t,
1672 grpc_closure* on_initiate, grpc_closure* on_ack) {
1673 if (!t->closed_with_error.ok()) {
1674 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_initiate, t->closed_with_error);
1675 grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_ack, t->closed_with_error);
1676 return;
1677 }
1678 t->ping_callbacks.OnPing(PingClosureWrapper(on_initiate),
1679 PingClosureWrapper(on_ack));
1680 }
1681
1682 // Specialized form of send_ping_locked for keepalive ping. If there is already
1683 // a ping in progress, the keepalive ping would piggyback onto that ping,
1684 // instead of waiting for that ping to complete and then starting a new ping.
send_keepalive_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)1685 static void send_keepalive_ping_locked(
1686 grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1687 if (!t->closed_with_error.ok()) {
1688 t->combiner->Run(
1689 grpc_core::InitTransportClosure<finish_keepalive_ping_locked>(
1690 t->Ref(), &t->finish_keepalive_ping_locked),
1691 t->closed_with_error);
1692 return;
1693 }
1694 t->ping_callbacks.OnPingAck(
1695 PingClosureWrapper(grpc_core::InitTransportClosure<finish_keepalive_ping>(
1696 t->Ref(), &t->finish_keepalive_ping_locked)));
1697 }
1698
grpc_chttp2_retry_initiate_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)1699 void grpc_chttp2_retry_initiate_ping(
1700 grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1701 auto tp = t.get();
1702 tp->combiner->Run(grpc_core::InitTransportClosure<retry_initiate_ping_locked>(
1703 std::move(t), &tp->retry_initiate_ping_locked),
1704 absl::OkStatus());
1705 }
1706
retry_initiate_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,GRPC_UNUSED grpc_error_handle error)1707 static void retry_initiate_ping_locked(
1708 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
1709 GRPC_UNUSED grpc_error_handle error) {
1710 GPR_DEBUG_ASSERT(error.ok());
1711 GPR_ASSERT(t->delayed_ping_timer_handle != TaskHandle::kInvalid);
1712 t->delayed_ping_timer_handle = TaskHandle::kInvalid;
1713 grpc_chttp2_initiate_write(t.get(),
1714 GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING);
1715 }
1716
grpc_chttp2_ack_ping(grpc_chttp2_transport * t,uint64_t id)1717 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id) {
1718 if (!t->ping_callbacks.AckPing(id, t->event_engine.get())) {
1719 gpr_log(GPR_DEBUG, "Unknown ping response from %s: %" PRIx64,
1720 std::string(t->peer_string.as_string_view()).c_str(), id);
1721 return;
1722 }
1723 if (t->ping_callbacks.ping_requested()) {
1724 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS);
1725 }
1726 }
1727
grpc_chttp2_keepalive_timeout(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)1728 void grpc_chttp2_keepalive_timeout(
1729 grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1730 t->combiner->Run(
1731 grpc_core::NewClosure([t](grpc_error_handle) {
1732 gpr_log(GPR_INFO, "%s: Keepalive timeout. Closing transport.",
1733 std::string(t->peer_string.as_string_view()).c_str());
1734 send_goaway(
1735 t.get(),
1736 grpc_error_set_int(GRPC_ERROR_CREATE("keepalive_timeout"),
1737 grpc_core::StatusIntProperty::kHttp2Error,
1738 GRPC_HTTP2_ENHANCE_YOUR_CALM),
1739 /*immediate_disconnect_hint=*/true);
1740 close_transport_locked(
1741 t.get(),
1742 grpc_error_set_int(GRPC_ERROR_CREATE("keepalive timeout"),
1743 grpc_core::StatusIntProperty::kRpcStatus,
1744 GRPC_STATUS_UNAVAILABLE));
1745 }),
1746 absl::OkStatus());
1747 }
1748
grpc_chttp2_ping_timeout(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)1749 void grpc_chttp2_ping_timeout(
1750 grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1751 t->combiner->Run(
1752 grpc_core::NewClosure([t](grpc_error_handle) {
1753 gpr_log(GPR_INFO, "%s: Ping timeout. Closing transport.",
1754 std::string(t->peer_string.as_string_view()).c_str());
1755 send_goaway(
1756 t.get(),
1757 grpc_error_set_int(GRPC_ERROR_CREATE("ping_timeout"),
1758 grpc_core::StatusIntProperty::kHttp2Error,
1759 GRPC_HTTP2_ENHANCE_YOUR_CALM),
1760 /*immediate_disconnect_hint=*/true);
1761 close_transport_locked(
1762 t.get(),
1763 grpc_error_set_int(GRPC_ERROR_CREATE("ping timeout"),
1764 grpc_core::StatusIntProperty::kRpcStatus,
1765 GRPC_STATUS_UNAVAILABLE));
1766 }),
1767 absl::OkStatus());
1768 }
1769
grpc_chttp2_settings_timeout(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)1770 void grpc_chttp2_settings_timeout(
1771 grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
1772 t->combiner->Run(
1773 grpc_core::NewClosure([t](grpc_error_handle) {
1774 gpr_log(GPR_INFO, "%s: Settings timeout. Closing transport.",
1775 std::string(t->peer_string.as_string_view()).c_str());
1776 send_goaway(
1777 t.get(),
1778 grpc_error_set_int(GRPC_ERROR_CREATE("settings_timeout"),
1779 grpc_core::StatusIntProperty::kHttp2Error,
1780 GRPC_HTTP2_SETTINGS_TIMEOUT),
1781 /*immediate_disconnect_hint=*/true);
1782 close_transport_locked(
1783 t.get(),
1784 grpc_error_set_int(GRPC_ERROR_CREATE("settings timeout"),
1785 grpc_core::StatusIntProperty::kRpcStatus,
1786 GRPC_STATUS_UNAVAILABLE));
1787 }),
1788 absl::OkStatus());
1789 }
1790
1791 namespace {
1792
1793 // Fire and forget (deletes itself on completion). Does a graceful shutdown by
1794 // sending a GOAWAY frame with the last stream id set to 2^31-1, sending a ping
1795 // and waiting for an ack (effective waiting for an RTT) and then sending a
1796 // final GOAWAY frame with an updated last stream identifier. This helps ensure
1797 // that a connection can be cleanly shut down without losing requests.
1798 // In the event, that the client does not respond to the ping for some reason,
1799 // we add a 20 second deadline, after which we send the second goaway.
1800 class GracefulGoaway : public grpc_core::RefCounted<GracefulGoaway> {
1801 public:
Start(grpc_chttp2_transport * t)1802 static void Start(grpc_chttp2_transport* t) { new GracefulGoaway(t); }
1803
1804 private:
1805 using TaskHandle = ::grpc_event_engine::experimental::EventEngine::TaskHandle;
1806
GracefulGoaway(grpc_chttp2_transport * t)1807 explicit GracefulGoaway(grpc_chttp2_transport* t) : t_(t->Ref()) {
1808 t->sent_goaway_state = GRPC_CHTTP2_GRACEFUL_GOAWAY;
1809 grpc_chttp2_goaway_append((1u << 31) - 1, 0, grpc_empty_slice(), &t->qbuf);
1810 t->keepalive_timeout =
1811 std::min(t->keepalive_timeout, grpc_core::Duration::Seconds(20));
1812 t->ping_timeout =
1813 std::min(t->ping_timeout, grpc_core::Duration::Seconds(20));
1814 send_ping_locked(
1815 t, nullptr, GRPC_CLOSURE_INIT(&on_ping_ack_, OnPingAck, this, nullptr));
1816 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1817 }
1818
MaybeSendFinalGoawayLocked()1819 void MaybeSendFinalGoawayLocked() {
1820 if (t_->sent_goaway_state != GRPC_CHTTP2_GRACEFUL_GOAWAY) {
1821 // We already sent the final GOAWAY.
1822 return;
1823 }
1824 if (t_->destroying || !t_->closed_with_error.ok()) {
1825 GRPC_CHTTP2_IF_TRACING(
1826 gpr_log(GPR_INFO,
1827 "transport:%p %s peer:%s Transport already shutting down. "
1828 "Graceful GOAWAY abandoned.",
1829 t_.get(), t_->is_client ? "CLIENT" : "SERVER",
1830 std::string(t_->peer_string.as_string_view()).c_str()));
1831 return;
1832 }
1833 // Ping completed. Send final goaway.
1834 GRPC_CHTTP2_IF_TRACING(
1835 gpr_log(GPR_INFO,
1836 "transport:%p %s peer:%s Graceful shutdown: Ping received. "
1837 "Sending final GOAWAY with stream_id:%d",
1838 t_.get(), t_->is_client ? "CLIENT" : "SERVER",
1839 std::string(t_->peer_string.as_string_view()).c_str(),
1840 t_->last_new_stream_id));
1841 t_->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED;
1842 grpc_chttp2_goaway_append(t_->last_new_stream_id, 0, grpc_empty_slice(),
1843 &t_->qbuf);
1844 grpc_chttp2_initiate_write(t_.get(),
1845 GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1846 }
1847
OnPingAck(void * arg,grpc_error_handle)1848 static void OnPingAck(void* arg, grpc_error_handle /* error */) {
1849 auto* self = static_cast<GracefulGoaway*>(arg);
1850 self->t_->combiner->Run(
1851 GRPC_CLOSURE_INIT(&self->on_ping_ack_, OnPingAckLocked, self, nullptr),
1852 absl::OkStatus());
1853 }
1854
OnPingAckLocked(void * arg,grpc_error_handle)1855 static void OnPingAckLocked(void* arg, grpc_error_handle /* error */) {
1856 auto* self = static_cast<GracefulGoaway*>(arg);
1857 self->MaybeSendFinalGoawayLocked();
1858 self->Unref();
1859 }
1860
1861 const grpc_core::RefCountedPtr<grpc_chttp2_transport> t_;
1862 grpc_closure on_ping_ack_;
1863 };
1864
1865 } // namespace
1866
send_goaway(grpc_chttp2_transport * t,grpc_error_handle error,bool immediate_disconnect_hint)1867 static void send_goaway(grpc_chttp2_transport* t, grpc_error_handle error,
1868 bool immediate_disconnect_hint) {
1869 grpc_http2_error_code http_error;
1870 std::string message;
1871 grpc_error_get_status(error, grpc_core::Timestamp::InfFuture(), nullptr,
1872 &message, &http_error, nullptr);
1873 if (!t->is_client && http_error == GRPC_HTTP2_NO_ERROR &&
1874 !immediate_disconnect_hint) {
1875 // Do a graceful shutdown.
1876 if (t->sent_goaway_state == GRPC_CHTTP2_NO_GOAWAY_SEND) {
1877 GracefulGoaway::Start(t);
1878 } else {
1879 // Graceful GOAWAY is already in progress.
1880 }
1881 } else if (t->sent_goaway_state == GRPC_CHTTP2_NO_GOAWAY_SEND ||
1882 t->sent_goaway_state == GRPC_CHTTP2_GRACEFUL_GOAWAY) {
1883 // We want to log this irrespective of whether http tracing is enabled
1884 gpr_log(GPR_DEBUG, "%s %s: Sending goaway last_new_stream_id=%d err=%s",
1885 std::string(t->peer_string.as_string_view()).c_str(),
1886 t->is_client ? "CLIENT" : "SERVER", t->last_new_stream_id,
1887 grpc_core::StatusToString(error).c_str());
1888 t->sent_goaway_state = GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED;
1889 grpc_chttp2_goaway_append(
1890 t->last_new_stream_id, static_cast<uint32_t>(http_error),
1891 grpc_slice_from_cpp_string(std::move(message)), &t->qbuf);
1892 } else {
1893 // Final GOAWAY has already been sent.
1894 }
1895 grpc_chttp2_initiate_write(t, GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT);
1896 }
1897
grpc_chttp2_exceeded_ping_strikes(grpc_chttp2_transport * t)1898 void grpc_chttp2_exceeded_ping_strikes(grpc_chttp2_transport* t) {
1899 send_goaway(t,
1900 grpc_error_set_int(GRPC_ERROR_CREATE("too_many_pings"),
1901 grpc_core::StatusIntProperty::kHttp2Error,
1902 GRPC_HTTP2_ENHANCE_YOUR_CALM),
1903 /*immediate_disconnect_hint=*/true);
1904 // The transport will be closed after the write is done
1905 close_transport_locked(
1906 t, grpc_error_set_int(GRPC_ERROR_CREATE("Too many pings"),
1907 grpc_core::StatusIntProperty::kRpcStatus,
1908 GRPC_STATUS_UNAVAILABLE));
1909 }
1910
grpc_chttp2_reset_ping_clock(grpc_chttp2_transport * t)1911 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t) {
1912 if (!t->is_client) {
1913 t->ping_abuse_policy.ResetPingStrikes();
1914 }
1915 t->ping_rate_policy.ResetPingsBeforeDataRequired();
1916 }
1917
perform_transport_op_locked(void * stream_op,grpc_error_handle)1918 static void perform_transport_op_locked(void* stream_op,
1919 grpc_error_handle /*error_ignored*/) {
1920 grpc_transport_op* op = static_cast<grpc_transport_op*>(stream_op);
1921 grpc_core::RefCountedPtr<grpc_chttp2_transport> t(
1922 static_cast<grpc_chttp2_transport*>(op->handler_private.extra_arg));
1923
1924 if (!op->goaway_error.ok()) {
1925 send_goaway(t.get(), op->goaway_error, /*immediate_disconnect_hint=*/false);
1926 }
1927
1928 if (op->set_accept_stream) {
1929 t->accept_stream_cb = op->set_accept_stream_fn;
1930 t->accept_stream_cb_user_data = op->set_accept_stream_user_data;
1931 t->registered_method_matcher_cb = op->set_registered_method_matcher_fn;
1932 }
1933
1934 if (op->bind_pollset) {
1935 grpc_endpoint_add_to_pollset(t->ep, op->bind_pollset);
1936 }
1937
1938 if (op->bind_pollset_set) {
1939 grpc_endpoint_add_to_pollset_set(t->ep, op->bind_pollset_set);
1940 }
1941
1942 if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1943 send_ping_locked(t.get(), op->send_ping.on_initiate, op->send_ping.on_ack);
1944 grpc_chttp2_initiate_write(t.get(),
1945 GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING);
1946 }
1947
1948 if (op->start_connectivity_watch != nullptr) {
1949 t->state_tracker.AddWatcher(op->start_connectivity_watch_state,
1950 std::move(op->start_connectivity_watch));
1951 }
1952 if (op->stop_connectivity_watch != nullptr) {
1953 t->state_tracker.RemoveWatcher(op->stop_connectivity_watch);
1954 }
1955
1956 if (!op->disconnect_with_error.ok()) {
1957 send_goaway(t.get(), op->disconnect_with_error,
1958 /*immediate_disconnect_hint=*/true);
1959 close_transport_locked(t.get(), op->disconnect_with_error);
1960 }
1961
1962 grpc_core::ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
1963 }
1964
PerformOp(grpc_transport_op * op)1965 void grpc_chttp2_transport::PerformOp(grpc_transport_op* op) {
1966 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
1967 gpr_log(GPR_INFO, "perform_transport_op[t=%p]: %s", this,
1968 grpc_transport_op_string(op).c_str());
1969 }
1970 op->handler_private.extra_arg = this;
1971 Ref().release()->combiner->Run(
1972 GRPC_CLOSURE_INIT(&op->handler_private.closure,
1973 perform_transport_op_locked, op, nullptr),
1974 absl::OkStatus());
1975 }
1976
1977 //
1978 // INPUT PROCESSING - GENERAL
1979 //
1980
grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)1981 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
1982 grpc_chttp2_stream* s) {
1983 if (s->recv_initial_metadata_ready != nullptr &&
1984 s->published_metadata[0] != GRPC_METADATA_NOT_PUBLISHED) {
1985 if (s->seen_error) {
1986 grpc_slice_buffer_reset_and_unref(&s->frame_storage);
1987 }
1988 *s->recv_initial_metadata = std::move(s->initial_metadata_buffer);
1989 s->recv_initial_metadata->Set(grpc_core::PeerString(),
1990 t->peer_string.Ref());
1991 // If we didn't receive initial metadata from the wire and instead faked a
1992 // status (due to stream cancellations for example), let upper layers know
1993 // that trailing metadata is immediately available.
1994 if (s->trailing_metadata_available != nullptr &&
1995 s->published_metadata[0] != GRPC_METADATA_PUBLISHED_FROM_WIRE &&
1996 s->published_metadata[1] == GRPC_METADATA_SYNTHESIZED_FROM_FAKE) {
1997 *s->trailing_metadata_available = true;
1998 s->trailing_metadata_available = nullptr;
1999 }
2000 if (t->registered_method_matcher_cb != nullptr) {
2001 t->registered_method_matcher_cb(t->accept_stream_cb_user_data,
2002 s->recv_initial_metadata);
2003 }
2004 null_then_sched_closure(&s->recv_initial_metadata_ready);
2005 }
2006 }
2007
grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport * t,grpc_chttp2_stream * s)2008 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
2009 grpc_chttp2_stream* s) {
2010 if (s->recv_message_ready == nullptr) return;
2011
2012 grpc_core::chttp2::StreamFlowControl::IncomingUpdateContext upd(
2013 &s->flow_control);
2014 grpc_error_handle error;
2015
2016 // Lambda is immediately invoked as a big scoped section that can be
2017 // exited out of at any point by returning.
2018 [&]() {
2019 if (grpc_http_trace.enabled()) {
2020 gpr_log(GPR_DEBUG,
2021 "maybe_complete_recv_message %p final_metadata_requested=%d "
2022 "seen_error=%d",
2023 s, s->final_metadata_requested, s->seen_error);
2024 }
2025 if (s->final_metadata_requested && s->seen_error) {
2026 grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2027 s->recv_message->reset();
2028 } else {
2029 if (s->frame_storage.length != 0) {
2030 while (true) {
2031 GPR_ASSERT(s->frame_storage.length > 0);
2032 int64_t min_progress_size;
2033 auto r = grpc_deframe_unprocessed_incoming_frames(
2034 s, &min_progress_size, &**s->recv_message, s->recv_message_flags);
2035 if (grpc_http_trace.enabled()) {
2036 gpr_log(GPR_DEBUG, "Deframe data frame: %s",
2037 grpc_core::PollToString(r, [](absl::Status r) {
2038 return r.ToString();
2039 }).c_str());
2040 }
2041 if (r.pending()) {
2042 if (s->read_closed) {
2043 grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2044 s->recv_message->reset();
2045 break;
2046 } else {
2047 upd.SetMinProgressSize(min_progress_size);
2048 return; // Out of lambda to enclosing function
2049 }
2050 } else {
2051 error = std::move(r.value());
2052 if (!error.ok()) {
2053 s->seen_error = true;
2054 grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2055 break;
2056 } else {
2057 if (t->channelz_socket != nullptr) {
2058 t->channelz_socket->RecordMessageReceived();
2059 }
2060 break;
2061 }
2062 }
2063 }
2064 } else if (s->read_closed) {
2065 s->recv_message->reset();
2066 } else {
2067 upd.SetMinProgressSize(GRPC_HEADER_SIZE_IN_BYTES);
2068 return; // Out of lambda to enclosing function
2069 }
2070 }
2071 // save the length of the buffer before handing control back to application
2072 // threads. Needed to support correct flow control bookkeeping
2073 if (error.ok() && s->recv_message->has_value()) {
2074 null_then_sched_closure(&s->recv_message_ready);
2075 } else if (s->published_metadata[1] != GRPC_METADATA_NOT_PUBLISHED) {
2076 if (s->call_failed_before_recv_message != nullptr) {
2077 *s->call_failed_before_recv_message =
2078 (s->published_metadata[1] != GRPC_METADATA_PUBLISHED_AT_CLOSE);
2079 }
2080 null_then_sched_closure(&s->recv_message_ready);
2081 }
2082 }();
2083
2084 upd.SetPendingSize(s->frame_storage.length);
2085 grpc_chttp2_act_on_flowctl_action(upd.MakeAction(), t, s);
2086 }
2087
grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport * t,grpc_chttp2_stream * s)2088 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
2089 grpc_chttp2_stream* s) {
2090 grpc_chttp2_maybe_complete_recv_message(t, s);
2091 if (grpc_http_trace.enabled()) {
2092 gpr_log(GPR_DEBUG,
2093 "maybe_complete_recv_trailing_metadata cli=%d s=%p closure=%p "
2094 "read_closed=%d "
2095 "write_closed=%d %" PRIdPTR,
2096 t->is_client, s, s->recv_trailing_metadata_finished, s->read_closed,
2097 s->write_closed, s->frame_storage.length);
2098 }
2099 if (s->recv_trailing_metadata_finished != nullptr && s->read_closed &&
2100 s->write_closed) {
2101 if (s->seen_error || !t->is_client) {
2102 grpc_slice_buffer_reset_and_unref(&s->frame_storage);
2103 }
2104 if (s->read_closed && s->frame_storage.length == 0 &&
2105 s->recv_trailing_metadata_finished != nullptr) {
2106 grpc_transport_move_stats(&s->stats, s->collecting_stats);
2107 s->collecting_stats = nullptr;
2108 *s->recv_trailing_metadata = std::move(s->trailing_metadata_buffer);
2109 null_then_sched_closure(&s->recv_trailing_metadata_finished);
2110 }
2111 }
2112 }
2113
remove_stream(grpc_chttp2_transport * t,uint32_t id,grpc_error_handle error)2114 static grpc_chttp2_transport::RemovedStreamHandle remove_stream(
2115 grpc_chttp2_transport* t, uint32_t id, grpc_error_handle error) {
2116 grpc_chttp2_stream* s = t->stream_map.extract(id).mapped();
2117 GPR_DEBUG_ASSERT(s);
2118 if (t->incoming_stream == s) {
2119 t->incoming_stream = nullptr;
2120 grpc_chttp2_parsing_become_skip_parser(t);
2121 }
2122
2123 if (t->stream_map.empty()) {
2124 post_benign_reclaimer(t);
2125 if (t->sent_goaway_state == GRPC_CHTTP2_FINAL_GOAWAY_SENT) {
2126 close_transport_locked(
2127 t, GRPC_ERROR_CREATE_REFERENCING(
2128 "Last stream closed after sending GOAWAY", &error, 1));
2129 }
2130 }
2131 if (grpc_chttp2_list_remove_writable_stream(t, s)) {
2132 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2_writing:remove_stream");
2133 }
2134 grpc_chttp2_list_remove_stalled_by_stream(t, s);
2135 grpc_chttp2_list_remove_stalled_by_transport(t, s);
2136
2137 maybe_start_some_streams(t);
2138
2139 if (t->is_client) return grpc_chttp2_transport::RemovedStreamHandle();
2140 return grpc_chttp2_transport::RemovedStreamHandle(t->Ref());
2141 }
2142
2143 namespace grpc_core {
2144 namespace {
2145
TarpitDuration(grpc_chttp2_transport * t)2146 Duration TarpitDuration(grpc_chttp2_transport* t) {
2147 return Duration::Milliseconds(absl::LogUniform<int>(
2148 absl::BitGen(), t->min_tarpit_duration_ms, t->max_tarpit_duration_ms));
2149 }
2150
2151 template <typename F>
MaybeTarpit(grpc_chttp2_transport * t,bool tarpit,F fn)2152 void MaybeTarpit(grpc_chttp2_transport* t, bool tarpit, F fn) {
2153 if (!tarpit || !t->allow_tarpit || t->is_client) {
2154 fn(t);
2155 return;
2156 }
2157 const auto duration = TarpitDuration(t);
2158 t->event_engine->RunAfter(
2159 duration, [t = t->Ref(), fn = std::move(fn)]() mutable {
2160 ApplicationCallbackExecCtx app_exec_ctx;
2161 ExecCtx exec_ctx;
2162 t->combiner->Run(
2163 NewClosure([t, fn = std::move(fn)](grpc_error_handle) mutable {
2164 // TODO(ctiller): this can result in not sending RST_STREAMS if a
2165 // request gets tarpit behind a transport close.
2166 if (!t->closed_with_error.ok()) return;
2167 fn(t.get());
2168 }),
2169 absl::OkStatus());
2170 });
2171 }
2172
2173 } // namespace
2174 } // namespace grpc_core
2175
grpc_chttp2_cancel_stream(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle due_to_error,bool tarpit)2176 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2177 grpc_error_handle due_to_error, bool tarpit) {
2178 if (!t->is_client && !s->sent_trailing_metadata &&
2179 grpc_error_has_clear_grpc_status(due_to_error) &&
2180 !(s->read_closed && s->write_closed)) {
2181 close_from_api(t, s, due_to_error, tarpit);
2182 return;
2183 }
2184
2185 if (!due_to_error.ok() && !s->seen_error) {
2186 s->seen_error = true;
2187 }
2188 if (!s->read_closed || !s->write_closed) {
2189 if (s->id != 0) {
2190 grpc_http2_error_code http_error;
2191 grpc_error_get_status(due_to_error, s->deadline, nullptr, nullptr,
2192 &http_error, nullptr);
2193 grpc_core::MaybeTarpit(
2194 t, tarpit,
2195 [id = s->id, http_error,
2196 remove_stream_handle = grpc_chttp2_mark_stream_closed(
2197 t, s, 1, 1, due_to_error)](grpc_chttp2_transport* t) {
2198 grpc_chttp2_add_rst_stream_to_next_write(
2199 t, id, static_cast<uint32_t>(http_error), nullptr);
2200 grpc_chttp2_initiate_write(t,
2201 GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM);
2202 });
2203 return;
2204 }
2205 }
2206 grpc_chttp2_mark_stream_closed(t, s, 1, 1, due_to_error);
2207 }
2208
grpc_chttp2_fake_status(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2209 void grpc_chttp2_fake_status(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2210 grpc_error_handle error) {
2211 grpc_status_code status;
2212 std::string message;
2213 grpc_error_get_status(error, s->deadline, &status, &message, nullptr,
2214 nullptr);
2215 if (status != GRPC_STATUS_OK) {
2216 s->seen_error = true;
2217 }
2218 // stream_global->recv_trailing_metadata_finished gives us a
2219 // last chance replacement: we've received trailing metadata,
2220 // but something more important has become available to signal
2221 // to the upper layers - drop what we've got, and then publish
2222 // what we want - which is safe because we haven't told anyone
2223 // about the metadata yet
2224 if (s->published_metadata[1] == GRPC_METADATA_NOT_PUBLISHED ||
2225 s->recv_trailing_metadata_finished != nullptr ||
2226 !s->final_metadata_requested) {
2227 s->trailing_metadata_buffer.Set(grpc_core::GrpcStatusMetadata(), status);
2228 if (!message.empty()) {
2229 s->trailing_metadata_buffer.Set(
2230 grpc_core::GrpcMessageMetadata(),
2231 grpc_core::Slice::FromCopiedBuffer(message));
2232 }
2233 s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE;
2234 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2235 }
2236 }
2237
add_error(grpc_error_handle error,grpc_error_handle * refs,size_t * nrefs)2238 static void add_error(grpc_error_handle error, grpc_error_handle* refs,
2239 size_t* nrefs) {
2240 if (error.ok()) return;
2241 for (size_t i = 0; i < *nrefs; i++) {
2242 if (error == refs[i]) {
2243 return;
2244 }
2245 }
2246 refs[*nrefs] = error;
2247 ++*nrefs;
2248 }
2249
removal_error(grpc_error_handle extra_error,grpc_chttp2_stream * s,const char * main_error_msg)2250 static grpc_error_handle removal_error(grpc_error_handle extra_error,
2251 grpc_chttp2_stream* s,
2252 const char* main_error_msg) {
2253 grpc_error_handle refs[3];
2254 size_t nrefs = 0;
2255 add_error(s->read_closed_error, refs, &nrefs);
2256 add_error(s->write_closed_error, refs, &nrefs);
2257 add_error(extra_error, refs, &nrefs);
2258 grpc_error_handle error;
2259 if (nrefs > 0) {
2260 error = GRPC_ERROR_CREATE_REFERENCING(main_error_msg, refs, nrefs);
2261 }
2262 return error;
2263 }
2264
flush_write_list(grpc_chttp2_transport * t,grpc_chttp2_write_cb ** list,grpc_error_handle error)2265 static void flush_write_list(grpc_chttp2_transport* t,
2266 grpc_chttp2_write_cb** list,
2267 grpc_error_handle error) {
2268 while (*list) {
2269 grpc_chttp2_write_cb* cb = *list;
2270 *list = cb->next;
2271 grpc_chttp2_complete_closure_step(t, &cb->closure, error,
2272 "on_write_finished_cb");
2273 cb->next = t->write_cb_pool;
2274 t->write_cb_pool = cb;
2275 }
2276 }
2277
grpc_chttp2_fail_pending_writes(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error)2278 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
2279 grpc_chttp2_stream* s,
2280 grpc_error_handle error) {
2281 error =
2282 removal_error(error, s, "Pending writes failed due to stream closure");
2283 s->send_initial_metadata = nullptr;
2284 grpc_chttp2_complete_closure_step(t, &s->send_initial_metadata_finished,
2285 error, "send_initial_metadata_finished");
2286
2287 s->send_trailing_metadata = nullptr;
2288 s->sent_trailing_metadata_op = nullptr;
2289 grpc_chttp2_complete_closure_step(t, &s->send_trailing_metadata_finished,
2290 error, "send_trailing_metadata_finished");
2291
2292 grpc_chttp2_complete_closure_step(t, &s->send_message_finished, error,
2293 "fetching_send_message_finished");
2294 flush_write_list(t, &s->on_write_finished_cbs, error);
2295 flush_write_list(t, &s->on_flow_controlled_cbs, error);
2296 }
2297
grpc_chttp2_mark_stream_closed(grpc_chttp2_transport * t,grpc_chttp2_stream * s,int close_reads,int close_writes,grpc_error_handle error)2298 grpc_chttp2_transport::RemovedStreamHandle grpc_chttp2_mark_stream_closed(
2299 grpc_chttp2_transport* t, grpc_chttp2_stream* s, int close_reads,
2300 int close_writes, grpc_error_handle error) {
2301 grpc_chttp2_transport::RemovedStreamHandle rsh;
2302 if (grpc_http_trace.enabled()) {
2303 gpr_log(
2304 GPR_DEBUG, "MARK_STREAM_CLOSED: t=%p s=%p(id=%d) %s [%s]", t, s, s->id,
2305 (close_reads && close_writes)
2306 ? "read+write"
2307 : (close_reads ? "read" : (close_writes ? "write" : "nothing??")),
2308 grpc_core::StatusToString(error).c_str());
2309 }
2310 if (s->read_closed && s->write_closed) {
2311 // already closed, but we should still fake the status if needed.
2312 grpc_error_handle overall_error = removal_error(error, s, "Stream removed");
2313 if (!overall_error.ok()) {
2314 grpc_chttp2_fake_status(t, s, overall_error);
2315 }
2316 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2317 return rsh;
2318 }
2319 bool closed_read = false;
2320 bool became_closed = false;
2321 if (close_reads && !s->read_closed) {
2322 s->read_closed_error = error;
2323 s->read_closed = true;
2324 closed_read = true;
2325 }
2326 if (close_writes && !s->write_closed) {
2327 s->write_closed_error = error;
2328 s->write_closed = true;
2329 grpc_chttp2_fail_pending_writes(t, s, error);
2330 }
2331 if (s->read_closed && s->write_closed) {
2332 became_closed = true;
2333 grpc_error_handle overall_error = removal_error(error, s, "Stream removed");
2334 if (s->id != 0) {
2335 rsh = remove_stream(t, s->id, overall_error);
2336 } else {
2337 // Purge streams waiting on concurrency still waiting for id assignment
2338 grpc_chttp2_list_remove_waiting_for_concurrency(t, s);
2339 }
2340 if (!overall_error.ok()) {
2341 grpc_chttp2_fake_status(t, s, overall_error);
2342 }
2343 }
2344 if (closed_read) {
2345 for (int i = 0; i < 2; i++) {
2346 if (s->published_metadata[i] == GRPC_METADATA_NOT_PUBLISHED) {
2347 s->published_metadata[i] = GRPC_METADATA_PUBLISHED_AT_CLOSE;
2348 }
2349 }
2350 grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
2351 grpc_chttp2_maybe_complete_recv_message(t, s);
2352 }
2353 if (became_closed) {
2354 s->stats.latency =
2355 gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), s->creation_time);
2356 grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
2357 GRPC_CHTTP2_STREAM_UNREF(s, "chttp2");
2358 }
2359 return rsh;
2360 }
2361
close_from_api(grpc_chttp2_transport * t,grpc_chttp2_stream * s,grpc_error_handle error,bool tarpit)2362 static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
2363 grpc_error_handle error, bool tarpit) {
2364 grpc_status_code grpc_status;
2365 std::string message;
2366 grpc_error_get_status(error, s->deadline, &grpc_status, &message, nullptr,
2367 nullptr);
2368
2369 GPR_ASSERT(grpc_status >= 0 && (int)grpc_status < 100);
2370
2371 auto remove_stream_handle = grpc_chttp2_mark_stream_closed(t, s, 1, 1, error);
2372 grpc_core::MaybeTarpit(
2373 t, tarpit,
2374 [error = std::move(error),
2375 sent_initial_metadata = s->sent_initial_metadata, id = s->id,
2376 grpc_status, message = std::move(message),
2377 remove_stream_handle =
2378 std::move(remove_stream_handle)](grpc_chttp2_transport* t) mutable {
2379 grpc_slice hdr;
2380 grpc_slice status_hdr;
2381 grpc_slice http_status_hdr;
2382 grpc_slice content_type_hdr;
2383 grpc_slice message_pfx;
2384 uint8_t* p;
2385 uint32_t len = 0;
2386
2387 // Hand roll a header block.
2388 // This is unnecessarily ugly - at some point we should find a more
2389 // elegant solution.
2390 // It's complicated by the fact that our send machinery would be dead
2391 // by the time we got around to sending this, so instead we ignore
2392 // HPACK compression and just write the uncompressed bytes onto the
2393 // wire.
2394 if (!sent_initial_metadata) {
2395 http_status_hdr = GRPC_SLICE_MALLOC(13);
2396 p = GRPC_SLICE_START_PTR(http_status_hdr);
2397 *p++ = 0x00;
2398 *p++ = 7;
2399 *p++ = ':';
2400 *p++ = 's';
2401 *p++ = 't';
2402 *p++ = 'a';
2403 *p++ = 't';
2404 *p++ = 'u';
2405 *p++ = 's';
2406 *p++ = 3;
2407 *p++ = '2';
2408 *p++ = '0';
2409 *p++ = '0';
2410 GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr));
2411 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(http_status_hdr);
2412
2413 content_type_hdr = GRPC_SLICE_MALLOC(31);
2414 p = GRPC_SLICE_START_PTR(content_type_hdr);
2415 *p++ = 0x00;
2416 *p++ = 12;
2417 *p++ = 'c';
2418 *p++ = 'o';
2419 *p++ = 'n';
2420 *p++ = 't';
2421 *p++ = 'e';
2422 *p++ = 'n';
2423 *p++ = 't';
2424 *p++ = '-';
2425 *p++ = 't';
2426 *p++ = 'y';
2427 *p++ = 'p';
2428 *p++ = 'e';
2429 *p++ = 16;
2430 *p++ = 'a';
2431 *p++ = 'p';
2432 *p++ = 'p';
2433 *p++ = 'l';
2434 *p++ = 'i';
2435 *p++ = 'c';
2436 *p++ = 'a';
2437 *p++ = 't';
2438 *p++ = 'i';
2439 *p++ = 'o';
2440 *p++ = 'n';
2441 *p++ = '/';
2442 *p++ = 'g';
2443 *p++ = 'r';
2444 *p++ = 'p';
2445 *p++ = 'c';
2446 GPR_ASSERT(p == GRPC_SLICE_END_PTR(content_type_hdr));
2447 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(content_type_hdr);
2448 }
2449
2450 status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10));
2451 p = GRPC_SLICE_START_PTR(status_hdr);
2452 *p++ = 0x00; // literal header, not indexed
2453 *p++ = 11; // len(grpc-status)
2454 *p++ = 'g';
2455 *p++ = 'r';
2456 *p++ = 'p';
2457 *p++ = 'c';
2458 *p++ = '-';
2459 *p++ = 's';
2460 *p++ = 't';
2461 *p++ = 'a';
2462 *p++ = 't';
2463 *p++ = 'u';
2464 *p++ = 's';
2465 if (grpc_status < 10) {
2466 *p++ = 1;
2467 *p++ = static_cast<uint8_t>('0' + grpc_status);
2468 } else {
2469 *p++ = 2;
2470 *p++ = static_cast<uint8_t>('0' + (grpc_status / 10));
2471 *p++ = static_cast<uint8_t>('0' + (grpc_status % 10));
2472 }
2473 GPR_ASSERT(p == GRPC_SLICE_END_PTR(status_hdr));
2474 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(status_hdr);
2475
2476 size_t msg_len = message.length();
2477 GPR_ASSERT(msg_len <= UINT32_MAX);
2478 grpc_core::VarintWriter<1> msg_len_writer(
2479 static_cast<uint32_t>(msg_len));
2480 message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_writer.length());
2481 p = GRPC_SLICE_START_PTR(message_pfx);
2482 *p++ = 0x00; // literal header, not indexed
2483 *p++ = 12; // len(grpc-message)
2484 *p++ = 'g';
2485 *p++ = 'r';
2486 *p++ = 'p';
2487 *p++ = 'c';
2488 *p++ = '-';
2489 *p++ = 'm';
2490 *p++ = 'e';
2491 *p++ = 's';
2492 *p++ = 's';
2493 *p++ = 'a';
2494 *p++ = 'g';
2495 *p++ = 'e';
2496 msg_len_writer.Write(0, p);
2497 p += msg_len_writer.length();
2498 GPR_ASSERT(p == GRPC_SLICE_END_PTR(message_pfx));
2499 len += static_cast<uint32_t> GRPC_SLICE_LENGTH(message_pfx);
2500 len += static_cast<uint32_t>(msg_len);
2501
2502 hdr = GRPC_SLICE_MALLOC(9);
2503 p = GRPC_SLICE_START_PTR(hdr);
2504 *p++ = static_cast<uint8_t>(len >> 16);
2505 *p++ = static_cast<uint8_t>(len >> 8);
2506 *p++ = static_cast<uint8_t>(len);
2507 *p++ = GRPC_CHTTP2_FRAME_HEADER;
2508 *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM |
2509 GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
2510 *p++ = static_cast<uint8_t>(id >> 24);
2511 *p++ = static_cast<uint8_t>(id >> 16);
2512 *p++ = static_cast<uint8_t>(id >> 8);
2513 *p++ = static_cast<uint8_t>(id);
2514 GPR_ASSERT(p == GRPC_SLICE_END_PTR(hdr));
2515
2516 grpc_slice_buffer_add(&t->qbuf, hdr);
2517 if (!sent_initial_metadata) {
2518 grpc_slice_buffer_add(&t->qbuf, http_status_hdr);
2519 grpc_slice_buffer_add(&t->qbuf, content_type_hdr);
2520 }
2521 grpc_slice_buffer_add(&t->qbuf, status_hdr);
2522 grpc_slice_buffer_add(&t->qbuf, message_pfx);
2523 grpc_slice_buffer_add(&t->qbuf,
2524 grpc_slice_from_cpp_string(std::move(message)));
2525 grpc_chttp2_reset_ping_clock(t);
2526 grpc_chttp2_add_rst_stream_to_next_write(t, id, GRPC_HTTP2_NO_ERROR,
2527 nullptr);
2528
2529 grpc_chttp2_initiate_write(t,
2530 GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API);
2531 });
2532 }
2533
end_all_the_calls(grpc_chttp2_transport * t,grpc_error_handle error)2534 static void end_all_the_calls(grpc_chttp2_transport* t,
2535 grpc_error_handle error) {
2536 intptr_t http2_error;
2537 // If there is no explicit grpc or HTTP/2 error, set to UNAVAILABLE on server.
2538 if (!t->is_client && !grpc_error_has_clear_grpc_status(error) &&
2539 !grpc_error_get_int(error, grpc_core::StatusIntProperty::kHttp2Error,
2540 &http2_error)) {
2541 error = grpc_error_set_int(error, grpc_core::StatusIntProperty::kRpcStatus,
2542 GRPC_STATUS_UNAVAILABLE);
2543 }
2544 cancel_unstarted_streams(t, error, false);
2545 std::vector<grpc_chttp2_stream*> to_cancel;
2546 for (auto id_stream : t->stream_map) {
2547 to_cancel.push_back(id_stream.second);
2548 }
2549 for (auto s : to_cancel) {
2550 grpc_chttp2_cancel_stream(t, s, error, false);
2551 }
2552 }
2553
2554 //
2555 // INPUT PROCESSING - PARSING
2556 //
2557
2558 template <class F>
WithUrgency(grpc_chttp2_transport * t,grpc_core::chttp2::FlowControlAction::Urgency urgency,grpc_chttp2_initiate_write_reason reason,F action)2559 static void WithUrgency(grpc_chttp2_transport* t,
2560 grpc_core::chttp2::FlowControlAction::Urgency urgency,
2561 grpc_chttp2_initiate_write_reason reason, F action) {
2562 switch (urgency) {
2563 case grpc_core::chttp2::FlowControlAction::Urgency::NO_ACTION_NEEDED:
2564 break;
2565 case grpc_core::chttp2::FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
2566 grpc_chttp2_initiate_write(t, reason);
2567 ABSL_FALLTHROUGH_INTENDED;
2568 case grpc_core::chttp2::FlowControlAction::Urgency::QUEUE_UPDATE:
2569 action();
2570 break;
2571 }
2572 }
2573
grpc_chttp2_act_on_flowctl_action(const grpc_core::chttp2::FlowControlAction & action,grpc_chttp2_transport * t,grpc_chttp2_stream * s)2574 void grpc_chttp2_act_on_flowctl_action(
2575 const grpc_core::chttp2::FlowControlAction& action,
2576 grpc_chttp2_transport* t, grpc_chttp2_stream* s) {
2577 WithUrgency(t, action.send_stream_update(),
2578 GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL, [t, s]() {
2579 if (s->id != 0 && !s->read_closed) {
2580 grpc_chttp2_mark_stream_writable(t, s);
2581 }
2582 });
2583 WithUrgency(t, action.send_transport_update(),
2584 GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
2585 WithUrgency(t, action.send_initial_window_update(),
2586 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2587 t->settings.mutable_local().SetInitialWindowSize(
2588 action.initial_window_size());
2589 });
2590 WithUrgency(
2591 t, action.send_max_frame_size_update(),
2592 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2593 t->settings.mutable_local().SetMaxFrameSize(action.max_frame_size());
2594 });
2595 if (t->enable_preferred_rx_crypto_frame_advertisement) {
2596 WithUrgency(
2597 t, action.preferred_rx_crypto_frame_size_update(),
2598 GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
2599 t->settings.mutable_local().SetPreferredReceiveCryptoMessageSize(
2600 action.preferred_rx_crypto_frame_size());
2601 });
2602 }
2603 }
2604
try_http_parsing(grpc_chttp2_transport * t)2605 static grpc_error_handle try_http_parsing(grpc_chttp2_transport* t) {
2606 grpc_http_parser parser;
2607 size_t i = 0;
2608 grpc_error_handle error;
2609 grpc_http_response response;
2610
2611 grpc_http_parser_init(&parser, GRPC_HTTP_RESPONSE, &response);
2612
2613 grpc_error_handle parse_error;
2614 for (; i < t->read_buffer.count && parse_error.ok(); i++) {
2615 parse_error =
2616 grpc_http_parser_parse(&parser, t->read_buffer.slices[i], nullptr);
2617 }
2618 if (parse_error.ok() &&
2619 (parse_error = grpc_http_parser_eof(&parser)) == absl::OkStatus()) {
2620 error = grpc_error_set_int(
2621 grpc_error_set_int(
2622 GRPC_ERROR_CREATE("Trying to connect an http1.x server"),
2623 grpc_core::StatusIntProperty::kHttpStatus, response.status),
2624 grpc_core::StatusIntProperty::kRpcStatus,
2625 grpc_http2_status_to_grpc_status(response.status));
2626 }
2627
2628 grpc_http_parser_destroy(&parser);
2629 grpc_http_response_destroy(&response);
2630 return error;
2631 }
2632
read_action(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2633 static void read_action(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2634 grpc_error_handle error) {
2635 auto* tp = t.get();
2636 tp->combiner->Run(grpc_core::InitTransportClosure<read_action_locked>(
2637 std::move(t), &tp->read_action_locked),
2638 error);
2639 }
2640
read_action_parse_loop_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2641 static void read_action_parse_loop_locked(
2642 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2643 grpc_error_handle error) {
2644 if (t->closed_with_error.ok()) {
2645 grpc_error_handle errors[3] = {error, absl::OkStatus(), absl::OkStatus()};
2646 size_t requests_started = 0;
2647 for (size_t i = 0;
2648 i < t->read_buffer.count && errors[1] == absl::OkStatus(); i++) {
2649 auto r = grpc_chttp2_perform_read(t.get(), t->read_buffer.slices[i],
2650 requests_started);
2651 if (auto* partial_read_size = absl::get_if<size_t>(&r)) {
2652 for (size_t j = 0; j < i; j++) {
2653 grpc_core::CSliceUnref(grpc_slice_buffer_take_first(&t->read_buffer));
2654 }
2655 grpc_slice_buffer_sub_first(
2656 &t->read_buffer, *partial_read_size,
2657 GRPC_SLICE_LENGTH(t->read_buffer.slices[0]));
2658 t->combiner->ForceOffload();
2659 auto* tp = t.get();
2660 tp->combiner->Run(
2661 grpc_core::InitTransportClosure<read_action_parse_loop_locked>(
2662 std::move(t), &tp->read_action_locked),
2663 std::move(errors[0]));
2664 // Early return: we queued to retry later.
2665 return;
2666 } else {
2667 errors[1] = std::move(absl::get<absl::Status>(r));
2668 }
2669 }
2670 if (errors[1] != absl::OkStatus()) {
2671 errors[2] = try_http_parsing(t.get());
2672 error = GRPC_ERROR_CREATE_REFERENCING("Failed parsing HTTP/2", errors,
2673 GPR_ARRAY_SIZE(errors));
2674 }
2675
2676 if (t->initial_window_update != 0) {
2677 if (t->initial_window_update > 0) {
2678 grpc_chttp2_stream* s;
2679 while (grpc_chttp2_list_pop_stalled_by_stream(t.get(), &s)) {
2680 grpc_chttp2_mark_stream_writable(t.get(), s);
2681 grpc_chttp2_initiate_write(
2682 t.get(),
2683 GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING);
2684 }
2685 }
2686 t->initial_window_update = 0;
2687 }
2688 }
2689
2690 bool keep_reading = false;
2691 if (error.ok() && !t->closed_with_error.ok()) {
2692 error = GRPC_ERROR_CREATE_REFERENCING("Transport closed",
2693 &t->closed_with_error, 1);
2694 }
2695 if (!error.ok()) {
2696 // If a goaway frame was received, this might be the reason why the read
2697 // failed. Add this info to the error
2698 if (!t->goaway_error.ok()) {
2699 error = grpc_error_add_child(error, t->goaway_error);
2700 }
2701
2702 close_transport_locked(t.get(), error);
2703 } else if (t->closed_with_error.ok()) {
2704 keep_reading = true;
2705 // Since we have read a byte, reset the keepalive timer
2706 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2707 maybe_reset_keepalive_ping_timer_locked(t.get());
2708 }
2709 }
2710 grpc_slice_buffer_reset_and_unref(&t->read_buffer);
2711
2712 if (keep_reading) {
2713 if (t->num_pending_induced_frames >= DEFAULT_MAX_PENDING_INDUCED_FRAMES) {
2714 t->reading_paused_on_pending_induced_frames = true;
2715 GRPC_CHTTP2_IF_TRACING(
2716 gpr_log(GPR_INFO,
2717 "transport %p : Pausing reading due to too "
2718 "many unwritten SETTINGS ACK and RST_STREAM frames",
2719 t.get()));
2720 } else {
2721 continue_read_action_locked(std::move(t));
2722 }
2723 }
2724 }
2725
read_action_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2726 static void read_action_locked(
2727 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2728 grpc_error_handle error) {
2729 // got an incoming read, cancel any pending keepalive timers
2730 t->keepalive_incoming_data_wanted = false;
2731 if (t->keepalive_ping_timeout_handle != TaskHandle::kInvalid) {
2732 if (GRPC_TRACE_FLAG_ENABLED(grpc_ping_trace) ||
2733 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2734 gpr_log(GPR_INFO,
2735 "%s[%p]: Clear keepalive timer because data was received",
2736 t->is_client ? "CLIENT" : "SERVER", t.get());
2737 }
2738 t->event_engine->Cancel(
2739 std::exchange(t->keepalive_ping_timeout_handle, TaskHandle::kInvalid));
2740 }
2741 grpc_error_handle err = error;
2742 if (!err.ok()) {
2743 err = grpc_error_set_int(
2744 GRPC_ERROR_CREATE_REFERENCING("Endpoint read failed", &err, 1),
2745 grpc_core::StatusIntProperty::kOccurredDuringWrite, t->write_state);
2746 }
2747 std::swap(err, error);
2748 read_action_parse_loop_locked(std::move(t), std::move(err));
2749 }
2750
continue_read_action_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)2751 static void continue_read_action_locked(
2752 grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
2753 const bool urgent = !t->goaway_error.ok();
2754 auto* tp = t.get();
2755 grpc_endpoint_read(tp->ep, &tp->read_buffer,
2756 grpc_core::InitTransportClosure<read_action>(
2757 std::move(t), &tp->read_action_locked),
2758 urgent, grpc_chttp2_min_read_progress_size(tp));
2759 }
2760
2761 // t is reffed prior to calling the first time, and once the callback chain
2762 // that kicks off finishes, it's unreffed
schedule_bdp_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)2763 void schedule_bdp_ping_locked(
2764 grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
2765 auto* tp = t.get();
2766 tp->flow_control.bdp_estimator()->SchedulePing();
2767 send_ping_locked(tp,
2768 grpc_core::InitTransportClosure<start_bdp_ping>(
2769 tp->Ref(), &tp->start_bdp_ping_locked),
2770 grpc_core::InitTransportClosure<finish_bdp_ping>(
2771 std::move(t), &tp->finish_bdp_ping_locked));
2772 grpc_chttp2_initiate_write(tp, GRPC_CHTTP2_INITIATE_WRITE_BDP_PING);
2773 }
2774
start_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2775 static void start_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2776 grpc_error_handle error) {
2777 grpc_chttp2_transport* tp = t.get();
2778 tp->combiner->Run(grpc_core::InitTransportClosure<start_bdp_ping_locked>(
2779 std::move(t), &tp->start_bdp_ping_locked),
2780 error);
2781 }
2782
start_bdp_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2783 static void start_bdp_ping_locked(
2784 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2785 grpc_error_handle error) {
2786 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2787 gpr_log(GPR_INFO, "%s: Start BDP ping err=%s",
2788 std::string(t->peer_string.as_string_view()).c_str(),
2789 grpc_core::StatusToString(error).c_str());
2790 }
2791 if (!error.ok() || !t->closed_with_error.ok()) {
2792 return;
2793 }
2794 // Reset the keepalive ping timer
2795 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING) {
2796 maybe_reset_keepalive_ping_timer_locked(t.get());
2797 }
2798 t->flow_control.bdp_estimator()->StartPing();
2799 t->bdp_ping_started = true;
2800 }
2801
finish_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2802 static void finish_bdp_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2803 grpc_error_handle error) {
2804 grpc_chttp2_transport* tp = t.get();
2805 tp->combiner->Run(grpc_core::InitTransportClosure<finish_bdp_ping_locked>(
2806 std::move(t), &tp->finish_bdp_ping_locked),
2807 error);
2808 }
2809
finish_bdp_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2810 static void finish_bdp_ping_locked(
2811 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2812 grpc_error_handle error) {
2813 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
2814 gpr_log(GPR_INFO, "%s: Complete BDP ping err=%s",
2815 std::string(t->peer_string.as_string_view()).c_str(),
2816 grpc_core::StatusToString(error).c_str());
2817 }
2818 if (!error.ok() || !t->closed_with_error.ok()) {
2819 return;
2820 }
2821 if (!t->bdp_ping_started) {
2822 // start_bdp_ping_locked has not been run yet. Schedule
2823 // finish_bdp_ping_locked to be run later.
2824 finish_bdp_ping(std::move(t), std::move(error));
2825 return;
2826 }
2827 t->bdp_ping_started = false;
2828 grpc_core::Timestamp next_ping =
2829 t->flow_control.bdp_estimator()->CompletePing();
2830 grpc_chttp2_act_on_flowctl_action(t->flow_control.PeriodicUpdate(), t.get(),
2831 nullptr);
2832 GPR_ASSERT(t->next_bdp_ping_timer_handle == TaskHandle::kInvalid);
2833 t->next_bdp_ping_timer_handle =
2834 t->event_engine->RunAfter(next_ping - grpc_core::Timestamp::Now(), [t] {
2835 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2836 grpc_core::ExecCtx exec_ctx;
2837 next_bdp_ping_timer_expired(t.get());
2838 });
2839 }
2840
next_bdp_ping_timer_expired(grpc_chttp2_transport * t)2841 static void next_bdp_ping_timer_expired(grpc_chttp2_transport* t) {
2842 t->combiner->Run(
2843 grpc_core::InitTransportClosure<next_bdp_ping_timer_expired_locked>(
2844 t->Ref(), &t->next_bdp_ping_timer_expired_locked),
2845 absl::OkStatus());
2846 }
2847
next_bdp_ping_timer_expired_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,GRPC_UNUSED grpc_error_handle error)2848 static void next_bdp_ping_timer_expired_locked(
2849 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2850 GRPC_UNUSED grpc_error_handle error) {
2851 GPR_DEBUG_ASSERT(error.ok());
2852 t->next_bdp_ping_timer_handle = TaskHandle::kInvalid;
2853 if (t->flow_control.bdp_estimator()->accumulator() == 0) {
2854 // Block the bdp ping till we receive more data.
2855 t->bdp_ping_blocked = true;
2856 } else {
2857 schedule_bdp_ping_locked(std::move(t));
2858 }
2859 }
2860
grpc_chttp2_config_default_keepalive_args(grpc_channel_args * args,bool is_client)2861 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
2862 bool is_client) {
2863 grpc_chttp2_config_default_keepalive_args(grpc_core::ChannelArgs::FromC(args),
2864 is_client);
2865 }
2866
grpc_chttp2_config_default_keepalive_args(const grpc_core::ChannelArgs & channel_args,bool is_client)2867 void grpc_chttp2_config_default_keepalive_args(
2868 const grpc_core::ChannelArgs& channel_args, bool is_client) {
2869 const auto keepalive_time =
2870 std::max(grpc_core::Duration::Milliseconds(1),
2871 channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIME_MS)
2872 .value_or(is_client ? g_default_client_keepalive_time
2873 : g_default_server_keepalive_time));
2874 if (is_client) {
2875 g_default_client_keepalive_time = keepalive_time;
2876 } else {
2877 g_default_server_keepalive_time = keepalive_time;
2878 }
2879
2880 const auto keepalive_timeout = std::max(
2881 grpc_core::Duration::Zero(),
2882 channel_args.GetDurationFromIntMillis(GRPC_ARG_KEEPALIVE_TIMEOUT_MS)
2883 .value_or(is_client ? g_default_client_keepalive_timeout
2884 : g_default_server_keepalive_timeout));
2885 if (is_client) {
2886 g_default_client_keepalive_timeout = keepalive_timeout;
2887 } else {
2888 g_default_server_keepalive_timeout = keepalive_timeout;
2889 }
2890
2891 const bool keepalive_permit_without_calls =
2892 channel_args.GetBool(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS)
2893 .value_or(is_client
2894 ? g_default_client_keepalive_permit_without_calls
2895 : g_default_server_keepalive_permit_without_calls);
2896 if (is_client) {
2897 g_default_client_keepalive_permit_without_calls =
2898 keepalive_permit_without_calls;
2899 } else {
2900 g_default_server_keepalive_permit_without_calls =
2901 keepalive_permit_without_calls;
2902 }
2903
2904 grpc_core::Chttp2PingAbusePolicy::SetDefaults(channel_args);
2905 grpc_core::Chttp2PingRatePolicy::SetDefaults(channel_args);
2906 }
2907
init_keepalive_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t)2908 static void init_keepalive_ping(
2909 grpc_core::RefCountedPtr<grpc_chttp2_transport> t) {
2910 auto* tp = t.get();
2911 tp->combiner->Run(grpc_core::InitTransportClosure<init_keepalive_ping_locked>(
2912 std::move(t), &tp->init_keepalive_ping_locked),
2913 absl::OkStatus());
2914 }
2915
init_keepalive_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,GRPC_UNUSED grpc_error_handle error)2916 static void init_keepalive_ping_locked(
2917 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2918 GRPC_UNUSED grpc_error_handle error) {
2919 GPR_DEBUG_ASSERT(error.ok());
2920 GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING);
2921 GPR_ASSERT(t->keepalive_ping_timer_handle != TaskHandle::kInvalid);
2922 t->keepalive_ping_timer_handle = TaskHandle::kInvalid;
2923 if (t->destroying || !t->closed_with_error.ok()) {
2924 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING;
2925 } else {
2926 if (t->keepalive_permit_without_calls || !t->stream_map.empty()) {
2927 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING;
2928 send_keepalive_ping_locked(t);
2929 grpc_chttp2_initiate_write(t.get(),
2930 GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING);
2931 } else {
2932 t->keepalive_ping_timer_handle =
2933 t->event_engine->RunAfter(t->keepalive_time, [t] {
2934 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2935 grpc_core::ExecCtx exec_ctx;
2936 init_keepalive_ping(t);
2937 });
2938 }
2939 }
2940 }
2941
finish_keepalive_ping(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2942 static void finish_keepalive_ping(
2943 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2944 grpc_error_handle error) {
2945 auto* tp = t.get();
2946 tp->combiner->Run(
2947 grpc_core::InitTransportClosure<finish_keepalive_ping_locked>(
2948 std::move(t), &tp->finish_keepalive_ping_locked),
2949 error);
2950 }
2951
finish_keepalive_ping_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)2952 static void finish_keepalive_ping_locked(
2953 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
2954 grpc_error_handle error) {
2955 if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) {
2956 if (error.ok()) {
2957 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2958 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2959 gpr_log(GPR_INFO, "%s: Finish keepalive ping",
2960 std::string(t->peer_string.as_string_view()).c_str());
2961 }
2962 t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING;
2963 GPR_ASSERT(t->keepalive_ping_timer_handle == TaskHandle::kInvalid);
2964 t->keepalive_ping_timer_handle =
2965 t->event_engine->RunAfter(t->keepalive_time, [t] {
2966 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2967 grpc_core::ExecCtx exec_ctx;
2968 init_keepalive_ping(t);
2969 });
2970 }
2971 }
2972 }
2973
maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport * t)2974 static void maybe_reset_keepalive_ping_timer_locked(grpc_chttp2_transport* t) {
2975 if (t->keepalive_ping_timer_handle != TaskHandle::kInvalid &&
2976 t->event_engine->Cancel(t->keepalive_ping_timer_handle)) {
2977 // Cancel succeeds, resets the keepalive ping timer. Note that we don't
2978 // need to Ref or Unref here since we still hold the Ref.
2979 if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace) ||
2980 GRPC_TRACE_FLAG_ENABLED(grpc_keepalive_trace)) {
2981 gpr_log(GPR_INFO, "%s: Keepalive ping cancelled. Resetting timer.",
2982 std::string(t->peer_string.as_string_view()).c_str());
2983 }
2984 t->keepalive_ping_timer_handle =
2985 t->event_engine->RunAfter(t->keepalive_time, [t = t->Ref()]() mutable {
2986 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
2987 grpc_core::ExecCtx exec_ctx;
2988 init_keepalive_ping(std::move(t));
2989 });
2990 }
2991 }
2992
2993 //
2994 // CALLBACK LOOP
2995 //
2996
connectivity_state_set(grpc_chttp2_transport * t,grpc_connectivity_state state,const absl::Status & status,const char * reason)2997 static void connectivity_state_set(grpc_chttp2_transport* t,
2998 grpc_connectivity_state state,
2999 const absl::Status& status,
3000 const char* reason) {
3001 GRPC_CHTTP2_IF_TRACING(gpr_log(
3002 GPR_INFO, "transport %p set connectivity_state=%d; status=%s; reason=%s",
3003 t, state, status.ToString().c_str(), reason));
3004 t->state_tracker.SetState(state, status, reason);
3005 }
3006
3007 //
3008 // POLLSET STUFF
3009 //
3010
SetPollset(grpc_stream *,grpc_pollset * pollset)3011 void grpc_chttp2_transport::SetPollset(grpc_stream* /*gs*/,
3012 grpc_pollset* pollset) {
3013 grpc_endpoint_add_to_pollset(ep, pollset);
3014 }
3015
SetPollsetSet(grpc_stream *,grpc_pollset_set * pollset_set)3016 void grpc_chttp2_transport::SetPollsetSet(grpc_stream* /*gs*/,
3017 grpc_pollset_set* pollset_set) {
3018 grpc_endpoint_add_to_pollset_set(ep, pollset_set);
3019 }
3020
3021 //
3022 // RESOURCE QUOTAS
3023 //
3024
post_benign_reclaimer(grpc_chttp2_transport * t)3025 static void post_benign_reclaimer(grpc_chttp2_transport* t) {
3026 if (!t->benign_reclaimer_registered) {
3027 t->benign_reclaimer_registered = true;
3028 t->memory_owner.PostReclaimer(
3029 grpc_core::ReclamationPass::kBenign,
3030 [t = t->Ref()](
3031 absl::optional<grpc_core::ReclamationSweep> sweep) mutable {
3032 if (sweep.has_value()) {
3033 auto* tp = t.get();
3034 tp->active_reclamation = std::move(*sweep);
3035 tp->combiner->Run(
3036 grpc_core::InitTransportClosure<benign_reclaimer_locked>(
3037 std::move(t), &tp->benign_reclaimer_locked),
3038 absl::OkStatus());
3039 }
3040 });
3041 }
3042 }
3043
post_destructive_reclaimer(grpc_chttp2_transport * t)3044 static void post_destructive_reclaimer(grpc_chttp2_transport* t) {
3045 if (!t->destructive_reclaimer_registered) {
3046 t->destructive_reclaimer_registered = true;
3047 t->memory_owner.PostReclaimer(
3048 grpc_core::ReclamationPass::kDestructive,
3049 [t = t->Ref()](
3050 absl::optional<grpc_core::ReclamationSweep> sweep) mutable {
3051 if (sweep.has_value()) {
3052 auto* tp = t.get();
3053 tp->active_reclamation = std::move(*sweep);
3054 tp->combiner->Run(
3055 grpc_core::InitTransportClosure<destructive_reclaimer_locked>(
3056 std::move(t), &tp->destructive_reclaimer_locked),
3057 absl::OkStatus());
3058 }
3059 });
3060 }
3061 }
3062
benign_reclaimer_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)3063 static void benign_reclaimer_locked(
3064 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
3065 grpc_error_handle error) {
3066 if (error.ok() && t->stream_map.empty()) {
3067 // Channel with no active streams: send a goaway to try and make it
3068 // disconnect cleanly
3069 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3070 gpr_log(GPR_INFO, "HTTP2: %s - send goaway to free memory",
3071 std::string(t->peer_string.as_string_view()).c_str());
3072 }
3073 send_goaway(t.get(),
3074 grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
3075 grpc_core::StatusIntProperty::kHttp2Error,
3076 GRPC_HTTP2_ENHANCE_YOUR_CALM),
3077 /*immediate_disconnect_hint=*/true);
3078 } else if (error.ok() && GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3079 gpr_log(GPR_INFO,
3080 "HTTP2: %s - skip benign reclamation, there are still %" PRIdPTR
3081 " streams",
3082 std::string(t->peer_string.as_string_view()).c_str(),
3083 t->stream_map.size());
3084 }
3085 t->benign_reclaimer_registered = false;
3086 if (error != absl::CancelledError()) {
3087 t->active_reclamation.Finish();
3088 }
3089 }
3090
destructive_reclaimer_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport> t,grpc_error_handle error)3091 static void destructive_reclaimer_locked(
3092 grpc_core::RefCountedPtr<grpc_chttp2_transport> t,
3093 grpc_error_handle error) {
3094 t->destructive_reclaimer_registered = false;
3095 if (error.ok() && !t->stream_map.empty()) {
3096 // As stream_map is a hash map, this selects effectively a random stream.
3097 grpc_chttp2_stream* s = t->stream_map.begin()->second;
3098 if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
3099 gpr_log(GPR_INFO, "HTTP2: %s - abandon stream id %d",
3100 std::string(t->peer_string.as_string_view()).c_str(), s->id);
3101 }
3102 grpc_chttp2_cancel_stream(
3103 t.get(), s,
3104 grpc_error_set_int(GRPC_ERROR_CREATE("Buffers full"),
3105 grpc_core::StatusIntProperty::kHttp2Error,
3106 GRPC_HTTP2_ENHANCE_YOUR_CALM),
3107 false);
3108 if (!t->stream_map.empty()) {
3109 // Since we cancel one stream per destructive reclamation, if
3110 // there are more streams left, we can immediately post a new
3111 // reclaimer in case the resource quota needs to free more
3112 // memory
3113 post_destructive_reclaimer(t.get());
3114 }
3115 }
3116 if (error != absl::CancelledError()) {
3117 t->active_reclamation.Finish();
3118 }
3119 }
3120
3121 //
3122 // MONITORING
3123 //
3124
grpc_chttp2_initiate_write_reason_string(grpc_chttp2_initiate_write_reason reason)3125 const char* grpc_chttp2_initiate_write_reason_string(
3126 grpc_chttp2_initiate_write_reason reason) {
3127 switch (reason) {
3128 case GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE:
3129 return "INITIAL_WRITE";
3130 case GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM:
3131 return "START_NEW_STREAM";
3132 case GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE:
3133 return "SEND_MESSAGE";
3134 case GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA:
3135 return "SEND_INITIAL_METADATA";
3136 case GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA:
3137 return "SEND_TRAILING_METADATA";
3138 case GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING:
3139 return "RETRY_SEND_PING";
3140 case GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS:
3141 return "CONTINUE_PINGS";
3142 case GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT:
3143 return "GOAWAY_SENT";
3144 case GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM:
3145 return "RST_STREAM";
3146 case GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API:
3147 return "CLOSE_FROM_API";
3148 case GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL:
3149 return "STREAM_FLOW_CONTROL";
3150 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL:
3151 return "TRANSPORT_FLOW_CONTROL";
3152 case GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS:
3153 return "SEND_SETTINGS";
3154 case GRPC_CHTTP2_INITIATE_WRITE_SETTINGS_ACK:
3155 return "SETTINGS_ACK";
3156 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING:
3157 return "FLOW_CONTROL_UNSTALLED_BY_SETTING";
3158 case GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE:
3159 return "FLOW_CONTROL_UNSTALLED_BY_UPDATE";
3160 case GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING:
3161 return "APPLICATION_PING";
3162 case GRPC_CHTTP2_INITIATE_WRITE_BDP_PING:
3163 return "BDP_PING";
3164 case GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING:
3165 return "KEEPALIVE_PING";
3166 case GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED:
3167 return "TRANSPORT_FLOW_CONTROL_UNSTALLED";
3168 case GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE:
3169 return "PING_RESPONSE";
3170 case GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM:
3171 return "FORCE_RST_STREAM";
3172 }
3173 GPR_UNREACHABLE_CODE(return "unknown");
3174 }
3175
GetEndpoint()3176 grpc_endpoint* grpc_chttp2_transport::GetEndpoint() { return ep; }
3177
SizeOfStream() const3178 size_t grpc_chttp2_transport::SizeOfStream() const {
3179 return sizeof(grpc_chttp2_stream);
3180 }
3181
3182 bool grpc_chttp2_transport::
HackyDisableStreamOpBatchCoalescingInConnectedChannel() const3183 HackyDisableStreamOpBatchCoalescingInConnectedChannel() const {
3184 return false;
3185 }
3186
GetTransportName() const3187 absl::string_view grpc_chttp2_transport::GetTransportName() const {
3188 return "chttp2";
3189 }
3190
3191 grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode>
grpc_chttp2_transport_get_socket_node(grpc_core::Transport * transport)3192 grpc_chttp2_transport_get_socket_node(grpc_core::Transport* transport) {
3193 grpc_chttp2_transport* t =
3194 reinterpret_cast<grpc_chttp2_transport*>(transport);
3195 return t->channelz_socket;
3196 }
3197
grpc_create_chttp2_transport(const grpc_core::ChannelArgs & channel_args,grpc_endpoint * ep,bool is_client)3198 grpc_core::Transport* grpc_create_chttp2_transport(
3199 const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep,
3200 bool is_client) {
3201 return new grpc_chttp2_transport(channel_args, ep, is_client);
3202 }
3203
grpc_chttp2_transport_start_reading(grpc_core::Transport * transport,grpc_slice_buffer * read_buffer,grpc_closure * notify_on_receive_settings,grpc_closure * notify_on_close)3204 void grpc_chttp2_transport_start_reading(
3205 grpc_core::Transport* transport, grpc_slice_buffer* read_buffer,
3206 grpc_closure* notify_on_receive_settings, grpc_closure* notify_on_close) {
3207 auto t = reinterpret_cast<grpc_chttp2_transport*>(transport)->Ref();
3208 if (read_buffer != nullptr) {
3209 grpc_slice_buffer_move_into(read_buffer, &t->read_buffer);
3210 gpr_free(read_buffer);
3211 }
3212 auto* tp = t.get();
3213 tp->combiner->Run(
3214 grpc_core::NewClosure([t = std::move(t), notify_on_receive_settings,
3215 notify_on_close](grpc_error_handle) mutable {
3216 if (!t->closed_with_error.ok()) {
3217 if (notify_on_receive_settings != nullptr) {
3218 grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_receive_settings,
3219 t->closed_with_error);
3220 }
3221 if (notify_on_close != nullptr) {
3222 grpc_core::ExecCtx::Run(DEBUG_LOCATION, notify_on_close,
3223 t->closed_with_error);
3224 }
3225 return;
3226 }
3227 t->notify_on_receive_settings = notify_on_receive_settings;
3228 t->notify_on_close = notify_on_close;
3229 read_action_locked(std::move(t), absl::OkStatus());
3230 }),
3231 absl::OkStatus());
3232 }
3233