xref: /aosp_15_r20/external/grpc-grpc/src/core/ext/transport/chttp2/transport/internal.h (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
20 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <stddef.h>
25 #include <stdint.h>
26 
27 #include <atomic>
28 #include <memory>
29 #include <utility>
30 
31 #include "absl/container/flat_hash_map.h"
32 #include "absl/random/random.h"
33 #include "absl/status/status.h"
34 #include "absl/strings/string_view.h"
35 #include "absl/types/optional.h"
36 #include "absl/types/variant.h"
37 
38 #include <grpc/event_engine/event_engine.h>
39 #include <grpc/event_engine/memory_allocator.h>
40 #include <grpc/grpc.h>
41 #include <grpc/slice.h>
42 #include <grpc/support/time.h>
43 
44 #include "src/core/ext/transport/chttp2/transport/context_list_entry.h"
45 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
46 #include "src/core/ext/transport/chttp2/transport/frame_goaway.h"
47 #include "src/core/ext/transport/chttp2/transport/frame_ping.h"
48 #include "src/core/ext/transport/chttp2/transport/frame_rst_stream.h"
49 #include "src/core/ext/transport/chttp2/transport/frame_settings.h"
50 #include "src/core/ext/transport/chttp2/transport/frame_window_update.h"
51 #include "src/core/ext/transport/chttp2/transport/hpack_encoder.h"
52 #include "src/core/ext/transport/chttp2/transport/hpack_parser.h"
53 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
54 #include "src/core/ext/transport/chttp2/transport/legacy_frame.h"
55 #include "src/core/ext/transport/chttp2/transport/max_concurrent_streams_policy.h"
56 #include "src/core/ext/transport/chttp2/transport/ping_abuse_policy.h"
57 #include "src/core/ext/transport/chttp2/transport/ping_callbacks.h"
58 #include "src/core/ext/transport/chttp2/transport/ping_rate_policy.h"
59 #include "src/core/ext/transport/chttp2/transport/write_size_policy.h"
60 #include "src/core/lib/channel/call_tracer.h"
61 #include "src/core/lib/channel/channel_args.h"
62 #include "src/core/lib/channel/channelz.h"
63 #include "src/core/lib/channel/tcp_tracer.h"
64 #include "src/core/lib/debug/trace.h"
65 #include "src/core/lib/gprpp/bitset.h"
66 #include "src/core/lib/gprpp/debug_location.h"
67 #include "src/core/lib/gprpp/ref_counted.h"
68 #include "src/core/lib/gprpp/ref_counted_ptr.h"
69 #include "src/core/lib/gprpp/time.h"
70 #include "src/core/lib/iomgr/closure.h"
71 #include "src/core/lib/iomgr/combiner.h"
72 #include "src/core/lib/iomgr/endpoint.h"
73 #include "src/core/lib/iomgr/error.h"
74 #include "src/core/lib/iomgr/iomgr_fwd.h"
75 #include "src/core/lib/resource_quota/arena.h"
76 #include "src/core/lib/resource_quota/memory_quota.h"
77 #include "src/core/lib/slice/slice.h"
78 #include "src/core/lib/slice/slice_buffer.h"
79 #include "src/core/lib/surface/init_internally.h"
80 #include "src/core/lib/transport/connectivity_state.h"
81 #include "src/core/lib/transport/metadata_batch.h"
82 #include "src/core/lib/transport/transport.h"
83 
84 // Flag that this closure barrier may be covering a write in a pollset, and so
85 //   we should not complete this closure until we can prove that the write got
86 //   scheduled
87 #define CLOSURE_BARRIER_MAY_COVER_WRITE (1 << 0)
88 // First bit of the reference count, stored in the high order bits (with the low
89 //   bits being used for flags defined above)
90 #define CLOSURE_BARRIER_FIRST_REF_BIT (1 << 16)
91 
92 // streams are kept in various linked lists depending on what things need to
93 // happen to them... this enum labels each list
94 typedef enum {
95   // If a stream is in the following two lists, an explicit ref is associated
96   // with the stream
97   GRPC_CHTTP2_LIST_WRITABLE,
98   GRPC_CHTTP2_LIST_WRITING,
99   // No additional ref is taken for the following refs. Make sure to remove the
100   // stream from these lists when the stream is removed.
101   GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT,
102   GRPC_CHTTP2_LIST_STALLED_BY_STREAM,
103   /// streams that are waiting to start because there are too many concurrent
104   /// streams on the connection
105   GRPC_CHTTP2_LIST_WAITING_FOR_CONCURRENCY,
106   STREAM_LIST_COUNT  // must be last
107 } grpc_chttp2_stream_list_id;
108 
109 typedef enum {
110   GRPC_CHTTP2_WRITE_STATE_IDLE,
111   GRPC_CHTTP2_WRITE_STATE_WRITING,
112   GRPC_CHTTP2_WRITE_STATE_WRITING_WITH_MORE,
113 } grpc_chttp2_write_state;
114 
115 typedef enum {
116   GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY,
117   GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT,
118 } grpc_chttp2_optimization_target;
119 
120 typedef enum {
121   GRPC_CHTTP2_PCL_INITIATE = 0,
122   GRPC_CHTTP2_PCL_NEXT,
123   GRPC_CHTTP2_PCL_INFLIGHT,
124   GRPC_CHTTP2_PCL_COUNT  // must be last
125 } grpc_chttp2_ping_closure_list;
126 
127 typedef enum {
128   GRPC_CHTTP2_INITIATE_WRITE_INITIAL_WRITE,
129   GRPC_CHTTP2_INITIATE_WRITE_START_NEW_STREAM,
130   GRPC_CHTTP2_INITIATE_WRITE_SEND_MESSAGE,
131   GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA,
132   GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA,
133   GRPC_CHTTP2_INITIATE_WRITE_RETRY_SEND_PING,
134   GRPC_CHTTP2_INITIATE_WRITE_CONTINUE_PINGS,
135   GRPC_CHTTP2_INITIATE_WRITE_GOAWAY_SENT,
136   GRPC_CHTTP2_INITIATE_WRITE_RST_STREAM,
137   GRPC_CHTTP2_INITIATE_WRITE_CLOSE_FROM_API,
138   GRPC_CHTTP2_INITIATE_WRITE_STREAM_FLOW_CONTROL,
139   GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL,
140   GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS,
141   GRPC_CHTTP2_INITIATE_WRITE_SETTINGS_ACK,
142   GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_SETTING,
143   GRPC_CHTTP2_INITIATE_WRITE_FLOW_CONTROL_UNSTALLED_BY_UPDATE,
144   GRPC_CHTTP2_INITIATE_WRITE_APPLICATION_PING,
145   GRPC_CHTTP2_INITIATE_WRITE_BDP_PING,
146   GRPC_CHTTP2_INITIATE_WRITE_KEEPALIVE_PING,
147   GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL_UNSTALLED,
148   GRPC_CHTTP2_INITIATE_WRITE_PING_RESPONSE,
149   GRPC_CHTTP2_INITIATE_WRITE_FORCE_RST_STREAM,
150 } grpc_chttp2_initiate_write_reason;
151 
152 const char* grpc_chttp2_initiate_write_reason_string(
153     grpc_chttp2_initiate_write_reason reason);
154 
155 // deframer state for the overall http2 stream of bytes
156 typedef enum {
157   // prefix: one entry per http2 connection prefix byte
158   GRPC_DTS_CLIENT_PREFIX_0 = 0,
159   GRPC_DTS_CLIENT_PREFIX_1,
160   GRPC_DTS_CLIENT_PREFIX_2,
161   GRPC_DTS_CLIENT_PREFIX_3,
162   GRPC_DTS_CLIENT_PREFIX_4,
163   GRPC_DTS_CLIENT_PREFIX_5,
164   GRPC_DTS_CLIENT_PREFIX_6,
165   GRPC_DTS_CLIENT_PREFIX_7,
166   GRPC_DTS_CLIENT_PREFIX_8,
167   GRPC_DTS_CLIENT_PREFIX_9,
168   GRPC_DTS_CLIENT_PREFIX_10,
169   GRPC_DTS_CLIENT_PREFIX_11,
170   GRPC_DTS_CLIENT_PREFIX_12,
171   GRPC_DTS_CLIENT_PREFIX_13,
172   GRPC_DTS_CLIENT_PREFIX_14,
173   GRPC_DTS_CLIENT_PREFIX_15,
174   GRPC_DTS_CLIENT_PREFIX_16,
175   GRPC_DTS_CLIENT_PREFIX_17,
176   GRPC_DTS_CLIENT_PREFIX_18,
177   GRPC_DTS_CLIENT_PREFIX_19,
178   GRPC_DTS_CLIENT_PREFIX_20,
179   GRPC_DTS_CLIENT_PREFIX_21,
180   GRPC_DTS_CLIENT_PREFIX_22,
181   GRPC_DTS_CLIENT_PREFIX_23,
182   // frame header byte 0...
183   // must follow from the prefix states
184   GRPC_DTS_FH_0,
185   GRPC_DTS_FH_1,
186   GRPC_DTS_FH_2,
187   GRPC_DTS_FH_3,
188   GRPC_DTS_FH_4,
189   GRPC_DTS_FH_5,
190   GRPC_DTS_FH_6,
191   GRPC_DTS_FH_7,
192   // ... frame header byte 8
193   GRPC_DTS_FH_8,
194   // inside a http2 frame
195   GRPC_DTS_FRAME
196 } grpc_chttp2_deframe_transport_state;
197 
198 struct grpc_chttp2_stream_list {
199   grpc_chttp2_stream* head;
200   grpc_chttp2_stream* tail;
201 };
202 struct grpc_chttp2_stream_link {
203   grpc_chttp2_stream* next;
204   grpc_chttp2_stream* prev;
205 };
206 
207 typedef enum {
208   GRPC_CHTTP2_NO_GOAWAY_SEND,
209   GRPC_CHTTP2_GRACEFUL_GOAWAY,
210   GRPC_CHTTP2_FINAL_GOAWAY_SEND_SCHEDULED,
211   GRPC_CHTTP2_FINAL_GOAWAY_SENT,
212 } grpc_chttp2_sent_goaway_state;
213 
214 typedef struct grpc_chttp2_write_cb {
215   int64_t call_at_byte;
216   grpc_closure* closure;
217   struct grpc_chttp2_write_cb* next;
218 } grpc_chttp2_write_cb;
219 
220 typedef enum {
221   GRPC_CHTTP2_KEEPALIVE_STATE_WAITING,
222   GRPC_CHTTP2_KEEPALIVE_STATE_PINGING,
223   GRPC_CHTTP2_KEEPALIVE_STATE_DYING,
224   GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
225 } grpc_chttp2_keepalive_state;
226 
227 struct grpc_chttp2_transport final
228     : public grpc_core::Transport,
229       public grpc_core::FilterStackTransport,
230       public grpc_core::RefCounted<grpc_chttp2_transport,
231                                    grpc_core::NonPolymorphicRefCount>,
232       public grpc_core::KeepsGrpcInitialized {
233   grpc_chttp2_transport(const grpc_core::ChannelArgs& channel_args,
234                         grpc_endpoint* ep, bool is_client);
235   ~grpc_chttp2_transport() override;
236 
237   void Orphan() override;
238 
239   size_t SizeOfStream() const override;
240   bool HackyDisableStreamOpBatchCoalescingInConnectedChannel() const override;
241   void PerformStreamOp(grpc_stream* gs,
242                        grpc_transport_stream_op_batch* op) override;
243   void DestroyStream(grpc_stream* gs,
244                      grpc_closure* then_schedule_closure) override;
245 
filter_stack_transportfinal246   grpc_core::FilterStackTransport* filter_stack_transport() override {
247     return this;
248   }
client_transportfinal249   grpc_core::ClientTransport* client_transport() override { return nullptr; }
server_transportfinal250   grpc_core::ServerTransport* server_transport() override { return nullptr; }
251 
252   absl::string_view GetTransportName() const override;
253   void InitStream(grpc_stream* gs, grpc_stream_refcount* refcount,
254                   const void* server_data, grpc_core::Arena* arena) override;
255   void SetPollset(grpc_stream* stream, grpc_pollset* pollset) override;
256   void SetPollsetSet(grpc_stream* stream,
257                      grpc_pollset_set* pollset_set) override;
258   void PerformOp(grpc_transport_op* op) override;
259   grpc_endpoint* GetEndpoint() override;
260 
261   grpc_endpoint* ep;
262   grpc_core::Slice peer_string;
263 
264   grpc_core::MemoryOwner memory_owner;
265   const grpc_core::MemoryAllocator::Reservation self_reservation;
266   grpc_core::ReclamationSweep active_reclamation;
267 
268   std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine;
269   grpc_core::Combiner* combiner;
270   absl::BitGen bitgen;
271 
272   grpc_closure* notify_on_receive_settings = nullptr;
273   grpc_closure* notify_on_close = nullptr;
274 
275   /// has the upper layer closed the transport?
276   grpc_error_handle closed_with_error;
277 
278   /// various lists of streams
279   grpc_chttp2_stream_list lists[STREAM_LIST_COUNT] = {};
280 
281   /// maps stream id to grpc_chttp2_stream objects
282   absl::flat_hash_map<uint32_t, grpc_chttp2_stream*> stream_map;
283   // Count of streams that should be counted against max concurrent streams but
284   // are not in stream_map (due to tarpitting).
285   size_t extra_streams = 0;
286 
287   class RemovedStreamHandle {
288    public:
289     RemovedStreamHandle() = default;
RemovedStreamHandlefinal290     explicit RemovedStreamHandle(
291         grpc_core::RefCountedPtr<grpc_chttp2_transport> t)
292         : transport_(std::move(t)) {
293       ++transport_->extra_streams;
294     }
~RemovedStreamHandlefinal295     ~RemovedStreamHandle() {
296       if (transport_ != nullptr) {
297         --transport_->extra_streams;
298       }
299     }
300     RemovedStreamHandle(const RemovedStreamHandle&) = delete;
301     RemovedStreamHandle& operator=(const RemovedStreamHandle&) = delete;
302     RemovedStreamHandle(RemovedStreamHandle&&) = default;
303     RemovedStreamHandle& operator=(RemovedStreamHandle&&) = default;
304 
305    private:
306     grpc_core::RefCountedPtr<grpc_chttp2_transport> transport_;
307   };
308 
309   grpc_closure write_action_begin_locked;
310   grpc_closure write_action_end_locked;
311 
312   grpc_closure read_action_locked;
313 
314   /// incoming read bytes
315   grpc_slice_buffer read_buffer;
316 
317   /// address to place a newly accepted stream - set and unset by
318   /// grpc_chttp2_parsing_accept_stream; used by init_stream to
319   /// publish the accepted server stream
320   grpc_chttp2_stream** accepting_stream = nullptr;
321 
322   // accept stream callback
323   void (*accept_stream_cb)(void* user_data, grpc_core::Transport* transport,
324                            const void* server_data);
325   // registered_method_matcher_cb is called before invoking the recv initial
326   // metadata callback.
327   void (*registered_method_matcher_cb)(
328       void* user_data, grpc_core::ServerMetadata* metadata) = nullptr;
329   void* accept_stream_cb_user_data;
330 
331   /// connectivity tracking
332   grpc_core::ConnectivityStateTracker state_tracker;
333 
334   /// data to write now
335   grpc_core::SliceBuffer outbuf;
336   /// hpack encoding
337   grpc_core::HPackCompressor hpack_compressor;
338 
339   /// data to write next write
340   grpc_slice_buffer qbuf;
341 
342   size_t max_requests_per_read;
343 
344   /// Set to a grpc_error object if a goaway frame is received. By default, set
345   /// to absl::OkStatus()
346   grpc_error_handle goaway_error;
347 
348   grpc_chttp2_sent_goaway_state sent_goaway_state = GRPC_CHTTP2_NO_GOAWAY_SEND;
349 
350   /// settings values
351   grpc_core::Http2SettingsManager settings;
352 
353   grpc_event_engine::experimental::EventEngine::TaskHandle
354       settings_ack_watchdog =
355           grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
356 
357   /// what is the next stream id to be allocated by this peer?
358   /// copied to next_stream_id in parsing when parsing commences
359   uint32_t next_stream_id = 0;
360 
361   /// last new stream id
362   uint32_t last_new_stream_id = 0;
363 
364   /// Number of incoming streams allowed before a settings ACK is required
365   uint32_t num_incoming_streams_before_settings_ack = 0;
366 
367   /// ping queues for various ping insertion points
368   grpc_core::Chttp2PingAbusePolicy ping_abuse_policy;
369   grpc_core::Chttp2PingRatePolicy ping_rate_policy;
370   grpc_core::Chttp2PingCallbacks ping_callbacks;
371   grpc_event_engine::experimental::EventEngine::TaskHandle
372       delayed_ping_timer_handle =
373           grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
374   grpc_closure retry_initiate_ping_locked;
375 
376   grpc_core::Chttp2MaxConcurrentStreamsPolicy max_concurrent_streams_policy;
377 
378   /// ping acks
379   size_t ping_ack_count = 0;
380   size_t ping_ack_capacity = 0;
381   uint64_t* ping_acks = nullptr;
382 
383   /// parser for headers
384   grpc_core::HPackParser hpack_parser;
385   /// simple one shot parsers
386   union {
387     grpc_chttp2_window_update_parser window_update;
388     grpc_chttp2_settings_parser settings;
389     grpc_chttp2_ping_parser ping;
390     grpc_chttp2_rst_stream_parser rst_stream;
391   } simple;
392   /// parser for goaway frames
393   grpc_chttp2_goaway_parser goaway_parser;
394 
395   grpc_core::chttp2::TransportFlowControl flow_control;
396   /// initial window change. This is tracked as we parse settings frames from
397   /// the remote peer. If there is a positive delta, then we will make all
398   /// streams readable since they may have become unstalled
399   int64_t initial_window_update = 0;
400 
401   // deframing
402   grpc_chttp2_deframe_transport_state deframe_state = GRPC_DTS_CLIENT_PREFIX_0;
403   uint8_t incoming_frame_type = 0;
404   uint8_t incoming_frame_flags = 0;
405   uint8_t header_eof = 0;
406   bool is_first_frame = true;
407   uint32_t expect_continuation_stream_id = 0;
408   uint32_t incoming_frame_size = 0;
409 
410   int min_tarpit_duration_ms;
411   int max_tarpit_duration_ms;
412   bool allow_tarpit;
413 
414   grpc_chttp2_stream* incoming_stream = nullptr;
415   // active parser
416   struct Parser {
417     const char* name;
418     grpc_error_handle (*parser)(void* parser_user_data,
419                                 grpc_chttp2_transport* t, grpc_chttp2_stream* s,
420                                 const grpc_slice& slice, int is_last);
421     void* user_data = nullptr;
422   };
423   Parser parser;
424 
425   grpc_chttp2_write_cb* write_cb_pool = nullptr;
426 
427   // bdp estimator
428   grpc_closure next_bdp_ping_timer_expired_locked;
429   grpc_closure start_bdp_ping_locked;
430   grpc_closure finish_bdp_ping_locked;
431 
432   // if non-NULL, close the transport with this error when writes are finished
433   grpc_error_handle close_transport_on_writes_finished;
434 
435   // a list of closures to run after writes are finished
436   grpc_closure_list run_after_write = GRPC_CLOSURE_LIST_INIT;
437 
438   // buffer pool state
439   /// benign cleanup closure
440   grpc_closure benign_reclaimer_locked;
441   /// destructive cleanup closure
442   grpc_closure destructive_reclaimer_locked;
443 
444   // next bdp ping timer handle
445   grpc_event_engine::experimental::EventEngine::TaskHandle
446       next_bdp_ping_timer_handle =
447           grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
448 
449   // keep-alive ping support
450   /// Closure to initialize a keepalive ping
451   grpc_closure init_keepalive_ping_locked;
452   /// Closure to run when the keepalive ping ack is received
453   grpc_closure finish_keepalive_ping_locked;
454   /// timer to initiate ping events
455   grpc_event_engine::experimental::EventEngine::TaskHandle
456       keepalive_ping_timer_handle =
457           grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
458   ;
459   /// time duration in between pings
460   grpc_core::Duration keepalive_time;
461   /// grace period to wait for data after sending a ping before keepalives
462   /// timeout
463   grpc_core::Duration keepalive_timeout;
464   /// number of stream objects currently allocated by this transport
465   std::atomic<size_t> streams_allocated{0};
466   /// keep-alive state machine state
467   grpc_chttp2_keepalive_state keepalive_state;
468   // Soft limit on max header size.
469   uint32_t max_header_list_size_soft_limit = 0;
470   grpc_core::ContextList* cl = nullptr;
471   grpc_core::RefCountedPtr<grpc_core::channelz::SocketNode> channelz_socket;
472   uint32_t num_messages_in_next_write = 0;
473   /// The number of pending induced frames (SETTINGS_ACK, PINGS_ACK and
474   /// RST_STREAM) in the outgoing buffer (t->qbuf). If this number goes beyond
475   /// DEFAULT_MAX_PENDING_INDUCED_FRAMES, we pause reading new frames. We would
476   /// only continue reading when we are able to write to the socket again,
477   /// thereby reducing the number of induced frames.
478   uint32_t num_pending_induced_frames = 0;
479   uint32_t incoming_stream_id = 0;
480 
481   /// grace period after sending a ping to wait for the ping ack
482   grpc_core::Duration ping_timeout;
483   grpc_event_engine::experimental::EventEngine::TaskHandle
484       keepalive_ping_timeout_handle =
485           grpc_event_engine::experimental::EventEngine::TaskHandle::kInvalid;
486   /// grace period before settings timeout expires
487   grpc_core::Duration settings_timeout;
488 
489   /// how much data are we willing to buffer when the WRITE_BUFFER_HINT is set?
490   uint32_t write_buffer_size = grpc_core::chttp2::kDefaultWindow;
491 
492   /// write execution state of the transport
493   grpc_chttp2_write_state write_state = GRPC_CHTTP2_WRITE_STATE_IDLE;
494 
495   /// policy for how much data we're willing to put into one http2 write
496   grpc_core::Chttp2WriteSizePolicy write_size_policy;
497 
498   bool reading_paused_on_pending_induced_frames = false;
499   /// Based on channel args, preferred_rx_crypto_frame_sizes are advertised to
500   /// the peer
501   bool enable_preferred_rx_crypto_frame_advertisement = false;
502   /// Set to non zero if closures associated with the transport may be
503   /// covering a write in a pollset. Such closures cannot be scheduled until
504   /// we can prove that the write got scheduled.
505   uint8_t closure_barrier_may_cover_write = CLOSURE_BARRIER_MAY_COVER_WRITE;
506 
507   /// have we scheduled a benign cleanup?
508   bool benign_reclaimer_registered = false;
509   /// have we scheduled a destructive cleanup?
510   bool destructive_reclaimer_registered = false;
511 
512   /// if keepalive pings are allowed when there's no outstanding streams
513   bool keepalive_permit_without_calls = false;
514 
515   // bdp estimator
516   bool bdp_ping_blocked =
517       false;  // Is the BDP blocked due to not receiving any data?
518 
519   /// is the transport destroying itself?
520   uint8_t destroying = false;
521 
522   /// is this a client?
523   bool is_client;
524 
525   /// If start_bdp_ping_locked has been called
526   bool bdp_ping_started = false;
527   // True if pings should be acked
528   bool ack_pings = true;
529   /// True if the keepalive system wants to see some data incoming
530   bool keepalive_incoming_data_wanted = false;
531   /// True if we count stream allocation (instead of HTTP2 concurrency) for
532   /// MAX_CONCURRENT_STREAMS
533   bool max_concurrent_streams_overload_protection = false;
534 
535   // What percentage of rst_stream frames on the server should cause a ping
536   // frame to be generated.
537   uint8_t ping_on_rst_stream_percent;
538 };
539 
540 typedef enum {
541   GRPC_METADATA_NOT_PUBLISHED,
542   GRPC_METADATA_SYNTHESIZED_FROM_FAKE,
543   GRPC_METADATA_PUBLISHED_FROM_WIRE,
544   GRPC_METADATA_PUBLISHED_AT_CLOSE
545 } grpc_published_metadata_method;
546 
547 struct grpc_chttp2_stream {
548   grpc_chttp2_stream(grpc_chttp2_transport* t, grpc_stream_refcount* refcount,
549                      const void* server_data);
550   ~grpc_chttp2_stream();
551 
552   void* context = nullptr;
553   const grpc_core::RefCountedPtr<grpc_chttp2_transport> t;
554   grpc_stream_refcount* refcount;
555 
556   grpc_closure destroy_stream;
557   grpc_closure* destroy_stream_arg;
558 
559   grpc_chttp2_stream_link links[STREAM_LIST_COUNT];
560 
561   /// HTTP2 stream id for this stream, or zero if one has not been assigned
562   uint32_t id = 0;
563 
564   /// things the upper layers would like to send
565   grpc_metadata_batch* send_initial_metadata = nullptr;
566   grpc_closure* send_initial_metadata_finished = nullptr;
567   grpc_metadata_batch* send_trailing_metadata = nullptr;
568   // TODO(yashykt): Find a better name for the below field and others in this
569   //                struct to betteer distinguish inputs, return values, and
570   //                internal state.
571   // sent_trailing_metadata_op allows the transport to fill in to the upper
572   // layer whether this stream was able to send its trailing metadata (used for
573   // detecting cancellation on the server-side)..
574   bool* sent_trailing_metadata_op = nullptr;
575   grpc_closure* send_trailing_metadata_finished = nullptr;
576 
577   int64_t next_message_end_offset;
578   int64_t flow_controlled_bytes_written = 0;
579   int64_t flow_controlled_bytes_flowed = 0;
580   grpc_closure* send_message_finished = nullptr;
581 
582   grpc_metadata_batch* recv_initial_metadata;
583   grpc_closure* recv_initial_metadata_ready = nullptr;
584   bool* trailing_metadata_available = nullptr;
585   absl::optional<grpc_core::SliceBuffer>* recv_message = nullptr;
586   uint32_t* recv_message_flags = nullptr;
587   bool* call_failed_before_recv_message = nullptr;
588   grpc_closure* recv_message_ready = nullptr;
589   grpc_metadata_batch* recv_trailing_metadata;
590   grpc_closure* recv_trailing_metadata_finished = nullptr;
591 
592   grpc_transport_stream_stats* collecting_stats = nullptr;
593   grpc_transport_stream_stats stats = grpc_transport_stream_stats();
594 
595   /// Is this stream closed for writing.
596   bool write_closed = false;
597   /// Is this stream reading half-closed.
598   bool read_closed = false;
599   /// Are all published incoming byte streams closed.
600   bool all_incoming_byte_streams_finished = false;
601   /// Has this stream seen an error.
602   /// If true, then pending incoming frames can be thrown away.
603   bool seen_error = false;
604   /// Are we buffering writes on this stream? If yes, we won't become writable
605   /// until there's enough queued up in the flow_controlled_buffer
606   bool write_buffering = false;
607 
608   // have we sent or received the EOS bit?
609   bool eos_received = false;
610   bool eos_sent = false;
611 
612   grpc_core::BitSet<STREAM_LIST_COUNT> included;
613 
614   /// the error that resulted in this stream being read-closed
615   grpc_error_handle read_closed_error;
616   /// the error that resulted in this stream being write-closed
617   grpc_error_handle write_closed_error;
618 
619   grpc_published_metadata_method published_metadata[2] = {};
620 
621   grpc_metadata_batch initial_metadata_buffer;
622   grpc_metadata_batch trailing_metadata_buffer;
623 
624   grpc_slice_buffer frame_storage;  // protected by t combiner
625 
626   grpc_core::Timestamp deadline = grpc_core::Timestamp::InfFuture();
627 
628   /// number of bytes received - reset at end of parse thread execution
629   int64_t received_bytes = 0;
630 
631   grpc_core::chttp2::StreamFlowControl flow_control;
632 
633   grpc_slice_buffer flow_controlled_buffer;
634 
635   grpc_chttp2_write_cb* on_flow_controlled_cbs = nullptr;
636   grpc_chttp2_write_cb* on_write_finished_cbs = nullptr;
637   grpc_chttp2_write_cb* finish_after_write = nullptr;
638   size_t sending_bytes = 0;
639 
640   /// Byte counter for number of bytes written
641   size_t byte_counter = 0;
642 
643   /// Number of times written
644   int64_t write_counter = 0;
645 
646   /// Only set when enabled.
647   grpc_core::CallTracerInterface* call_tracer = nullptr;
648 
649   /// Only set when enabled.
650   std::shared_ptr<grpc_core::TcpTracerInterface> tcp_tracer;
651 
652   // time this stream was created
653   gpr_timespec creation_time = gpr_now(GPR_CLOCK_MONOTONIC);
654 
655   bool parsed_trailers_only = false;
656 
657   bool final_metadata_requested = false;
658   bool received_last_frame = false;  // protected by t combiner
659 
660   /// how many header frames have we received?
661   uint8_t header_frames_received = 0;
662 
663   bool sent_initial_metadata = false;
664   bool sent_trailing_metadata = false;
665 
666   /// Whether the bytes needs to be traced using Fathom
667   bool traced = false;
668 };
669 
670 #define GRPC_ARG_PING_TIMEOUT_MS "grpc.http2.ping_timeout_ms"
671 
672 // EXPERIMENTAL: provide protection against overloading a server with too many
673 // requests: wait for streams to be deallocated before they stop counting
674 // against MAX_CONCURRENT_STREAMS
675 #define GRPC_ARG_MAX_CONCURRENT_STREAMS_OVERLOAD_PROTECTION \
676   "grpc.http.overload_protection"
677 
678 /// Transport writing call flow:
679 /// grpc_chttp2_initiate_write() is called anywhere that we know bytes need to
680 /// go out on the wire.
681 /// If no other write has been started, a task is enqueued onto our workqueue.
682 /// When that task executes, it obtains the global lock, and gathers the data
683 /// to write.
684 /// The global lock is dropped and we do the syscall to write.
685 /// After writing, a follow-up check is made to see if another round of writing
686 /// should be performed.
687 
688 /// The actual call chain is documented in the implementation of this function.
689 ///
690 void grpc_chttp2_initiate_write(grpc_chttp2_transport* t,
691                                 grpc_chttp2_initiate_write_reason reason);
692 
693 struct grpc_chttp2_begin_write_result {
694   /// are we writing?
695   bool writing;
696   /// if writing: was it a complete flush (false) or a partial flush (true)
697   bool partial;
698   /// did we queue any completions as part of beginning the write
699   bool early_results_scheduled;
700 };
701 grpc_chttp2_begin_write_result grpc_chttp2_begin_write(
702     grpc_chttp2_transport* t);
703 void grpc_chttp2_end_write(grpc_chttp2_transport* t, grpc_error_handle error);
704 
705 /// Process one slice of incoming data
706 /// Returns:
707 ///  - a count of parsed bytes in the event of a partial read: the caller should
708 ///    offload responsibilities to another thread to continue parsing.
709 ///  - or a status in the case of a completed read
710 absl::variant<size_t, absl::Status> grpc_chttp2_perform_read(
711     grpc_chttp2_transport* t, const grpc_slice& slice,
712     size_t& requests_started);
713 
714 bool grpc_chttp2_list_add_writable_stream(grpc_chttp2_transport* t,
715                                           grpc_chttp2_stream* s);
716 /// Get a writable stream
717 /// returns non-zero if there was a stream available
718 bool grpc_chttp2_list_pop_writable_stream(grpc_chttp2_transport* t,
719                                           grpc_chttp2_stream** s);
720 bool grpc_chttp2_list_remove_writable_stream(grpc_chttp2_transport* t,
721                                              grpc_chttp2_stream* s);
722 
723 bool grpc_chttp2_list_add_writing_stream(grpc_chttp2_transport* t,
724                                          grpc_chttp2_stream* s);
725 bool grpc_chttp2_list_have_writing_streams(grpc_chttp2_transport* t);
726 bool grpc_chttp2_list_pop_writing_stream(grpc_chttp2_transport* t,
727                                          grpc_chttp2_stream** s);
728 
729 void grpc_chttp2_list_add_written_stream(grpc_chttp2_transport* t,
730                                          grpc_chttp2_stream* s);
731 bool grpc_chttp2_list_pop_written_stream(grpc_chttp2_transport* t,
732                                          grpc_chttp2_stream** s);
733 
734 void grpc_chttp2_list_add_waiting_for_concurrency(grpc_chttp2_transport* t,
735                                                   grpc_chttp2_stream* s);
736 bool grpc_chttp2_list_pop_waiting_for_concurrency(grpc_chttp2_transport* t,
737                                                   grpc_chttp2_stream** s);
738 void grpc_chttp2_list_remove_waiting_for_concurrency(grpc_chttp2_transport* t,
739                                                      grpc_chttp2_stream* s);
740 
741 void grpc_chttp2_list_add_stalled_by_transport(grpc_chttp2_transport* t,
742                                                grpc_chttp2_stream* s);
743 bool grpc_chttp2_list_pop_stalled_by_transport(grpc_chttp2_transport* t,
744                                                grpc_chttp2_stream** s);
745 void grpc_chttp2_list_remove_stalled_by_transport(grpc_chttp2_transport* t,
746                                                   grpc_chttp2_stream* s);
747 
748 void grpc_chttp2_list_add_stalled_by_stream(grpc_chttp2_transport* t,
749                                             grpc_chttp2_stream* s);
750 bool grpc_chttp2_list_pop_stalled_by_stream(grpc_chttp2_transport* t,
751                                             grpc_chttp2_stream** s);
752 bool grpc_chttp2_list_remove_stalled_by_stream(grpc_chttp2_transport* t,
753                                                grpc_chttp2_stream* s);
754 
755 //******** Flow Control **************
756 
757 // Takes in a flow control action and performs all the needed operations.
758 void grpc_chttp2_act_on_flowctl_action(
759     const grpc_core::chttp2::FlowControlAction& action,
760     grpc_chttp2_transport* t, grpc_chttp2_stream* s);
761 
762 //******** End of Flow Control **************
763 
grpc_chttp2_parsing_lookup_stream(grpc_chttp2_transport * t,uint32_t id)764 inline grpc_chttp2_stream* grpc_chttp2_parsing_lookup_stream(
765     grpc_chttp2_transport* t, uint32_t id) {
766   auto it = t->stream_map.find(id);
767   if (it == t->stream_map.end()) return nullptr;
768   return it->second;
769 }
770 grpc_chttp2_stream* grpc_chttp2_parsing_accept_stream(grpc_chttp2_transport* t,
771                                                       uint32_t id);
772 
773 void grpc_chttp2_add_incoming_goaway(grpc_chttp2_transport* t,
774                                      uint32_t goaway_error,
775                                      uint32_t last_stream_id,
776                                      absl::string_view goaway_text);
777 
778 void grpc_chttp2_parsing_become_skip_parser(grpc_chttp2_transport* t);
779 
780 void grpc_chttp2_complete_closure_step(grpc_chttp2_transport* t,
781                                        grpc_closure** pclosure,
782                                        grpc_error_handle error,
783                                        const char* desc,
784                                        grpc_core::DebugLocation whence = {});
785 
786 void grpc_chttp2_keepalive_timeout(
787     grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
788 void grpc_chttp2_ping_timeout(
789     grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
790 
791 void grpc_chttp2_settings_timeout(
792     grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
793 
794 #define GRPC_HEADER_SIZE_IN_BYTES 5
795 #define MAX_SIZE_T (~(size_t)0)
796 
797 #define GRPC_CHTTP2_CLIENT_CONNECT_STRING "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
798 #define GRPC_CHTTP2_CLIENT_CONNECT_STRLEN \
799   (sizeof(GRPC_CHTTP2_CLIENT_CONNECT_STRING) - 1)
800 
801 // extern grpc_core::TraceFlag grpc_flowctl_trace;
802 
803 #define GRPC_CHTTP2_IF_TRACING(stmt)                \
804   do {                                              \
805     if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) { \
806       (stmt);                                       \
807     }                                               \
808   } while (0)
809 
810 void grpc_chttp2_fake_status(grpc_chttp2_transport* t,
811                              grpc_chttp2_stream* stream,
812                              grpc_error_handle error);
813 grpc_chttp2_transport::RemovedStreamHandle grpc_chttp2_mark_stream_closed(
814     grpc_chttp2_transport* t, grpc_chttp2_stream* s, int close_reads,
815     int close_writes, grpc_error_handle error);
816 void grpc_chttp2_start_writing(grpc_chttp2_transport* t);
817 
818 #ifndef NDEBUG
819 #define GRPC_CHTTP2_STREAM_REF(stream, reason) \
820   grpc_chttp2_stream_ref(stream, reason)
821 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
822   grpc_chttp2_stream_unref(stream, reason)
823 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s, const char* reason);
824 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s, const char* reason);
825 #else
826 #define GRPC_CHTTP2_STREAM_REF(stream, reason) grpc_chttp2_stream_ref(stream)
827 #define GRPC_CHTTP2_STREAM_UNREF(stream, reason) \
828   grpc_chttp2_stream_unref(stream)
829 void grpc_chttp2_stream_ref(grpc_chttp2_stream* s);
830 void grpc_chttp2_stream_unref(grpc_chttp2_stream* s);
831 #endif
832 
833 void grpc_chttp2_ack_ping(grpc_chttp2_transport* t, uint64_t id);
834 
835 /// Sends GOAWAY with error code ENHANCE_YOUR_CALM and additional debug data
836 /// resembling "too_many_pings" followed by immediately closing the connection.
837 void grpc_chttp2_exceeded_ping_strikes(grpc_chttp2_transport* t);
838 
839 /// Resets ping clock. Should be called when flushing window updates,
840 /// initial/trailing metadata or data frames. For a server, it resets the number
841 /// of ping strikes and the last_ping_recv_time. For a ping sender, it resets
842 /// pings_before_data_required.
843 void grpc_chttp2_reset_ping_clock(grpc_chttp2_transport* t);
844 
845 /// add a ref to the stream and add it to the writable list;
846 /// ref will be dropped in writing.c
847 void grpc_chttp2_mark_stream_writable(grpc_chttp2_transport* t,
848                                       grpc_chttp2_stream* s);
849 
850 void grpc_chttp2_cancel_stream(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
851                                grpc_error_handle due_to_error, bool tarpit);
852 
853 void grpc_chttp2_maybe_complete_recv_initial_metadata(grpc_chttp2_transport* t,
854                                                       grpc_chttp2_stream* s);
855 void grpc_chttp2_maybe_complete_recv_message(grpc_chttp2_transport* t,
856                                              grpc_chttp2_stream* s);
857 void grpc_chttp2_maybe_complete_recv_trailing_metadata(grpc_chttp2_transport* t,
858                                                        grpc_chttp2_stream* s);
859 
860 void grpc_chttp2_fail_pending_writes(grpc_chttp2_transport* t,
861                                      grpc_chttp2_stream* s,
862                                      grpc_error_handle error);
863 
864 /// Set the default keepalive configurations, must only be called at
865 /// initialization
866 void grpc_chttp2_config_default_keepalive_args(grpc_channel_args* args,
867                                                bool is_client);
868 void grpc_chttp2_config_default_keepalive_args(
869     const grpc_core::ChannelArgs& channel_args, bool is_client);
870 
871 void grpc_chttp2_retry_initiate_ping(
872     grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
873 
874 void schedule_bdp_ping_locked(
875     grpc_core::RefCountedPtr<grpc_chttp2_transport> t);
876 
877 uint32_t grpc_chttp2_min_read_progress_size(grpc_chttp2_transport* t);
878 
879 #endif  // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H
880