1 // Copyright 2024 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FILTERS_H 16 #define GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FILTERS_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <cstdint> 21 #include <memory> 22 #include <type_traits> 23 24 #include "src/core/lib/gprpp/ref_counted.h" 25 #include "src/core/lib/gprpp/ref_counted_ptr.h" 26 #include "src/core/lib/promise/latch.h" 27 #include "src/core/lib/promise/map.h" 28 #include "src/core/lib/promise/promise.h" 29 #include "src/core/lib/promise/status_flag.h" 30 #include "src/core/lib/transport/call_final_info.h" 31 #include "src/core/lib/transport/message.h" 32 #include "src/core/lib/transport/metadata.h" 33 34 // CallFilters tracks a list of filters that are attached to a call. 35 // At a high level, a filter (for the purposes of this module) is a class 36 // that has a Call member class, and a set of methods that are called 37 // for each major event in the lifetime of a call. 38 // 39 // The Call member class must have the following members: 40 // - OnClientInitialMetadata - $VALUE_TYPE = ClientMetadata 41 // - OnServerInitialMetadata - $VALUE_TYPE = ServerMetadata 42 // - OnServerToClientMessage - $VALUE_TYPE = Message 43 // - OnClientToServerMessage - $VALUE_TYPE = Message 44 // - OnServerTrailingMetadata - $VALUE_TYPE = ServerMetadata 45 // - OnFinalize - special, see below 46 // These members define an interception point for a particular event in 47 // the call lifecycle. 48 // 49 // The type of these members matters, and is selectable by the class 50 // author. For $INTERCEPTOR_NAME in the above list: 51 // - static const NoInterceptor $INTERCEPTOR_NAME: 52 // defines that this filter does not intercept this event. 53 // there is zero runtime cost added to handling that event by this filter. 54 // - void $INTERCEPTOR_NAME($VALUE_TYPE&): 55 // the filter intercepts this event, and can modify the value. 56 // it never fails. 57 // - absl::Status $INTERCEPTOR_NAME($VALUE_TYPE&): 58 // the filter intercepts this event, and can modify the value. 59 // it can fail, in which case the call will be aborted. 60 // - ServerMetadataHandle $INTERCEPTOR_NAME($VALUE_TYPE&) 61 // the filter intercepts this event, and can modify the value. 62 // the filter can return nullptr for success, or a metadata handle for 63 // failure (in which case the call will be aborted). 64 // useful for cases where the exact metadata returned needs to be customized. 65 // - void $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*): 66 // the filter intercepts this event, and can modify the value. 67 // it can access the channel via the second argument. 68 // it never fails. 69 // - absl::Status $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*): 70 // the filter intercepts this event, and can modify the value. 71 // it can access the channel via the second argument. 72 // it can fail, in which case the call will be aborted. 73 // - ServerMetadataHandle $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*) 74 // the filter intercepts this event, and can modify the value. 75 // it can access the channel via the second argument. 76 // the filter can return nullptr for success, or a metadata handle for 77 // failure (in which case the call will be aborted). 78 // useful for cases where the exact metadata returned needs to be customized. 79 // It's also acceptable to return a promise that resolves to the 80 // relevant return type listed above. 81 // 82 // Finally, OnFinalize is added to intecept call finalization. 83 // It must have one of the signatures: 84 // - static const NoInterceptor OnFinalize: 85 // the filter does not intercept call finalization. 86 // - void OnFinalize(const grpc_call_final_info*): 87 // the filter intercepts call finalization. 88 // - void OnFinalize(const grpc_call_final_info*, FilterType*): 89 // the filter intercepts call finalization. 90 // 91 // The constructor of the Call object can either take a pointer to the channel 92 // object, or not take any arguments. 93 // 94 // *THIS MODULE* holds no opinion on what members the channel part of the 95 // filter should or should not have, but does require that it have a stable 96 // pointer for the lifetime of a call (ownership is expected to happen 97 // elsewhere). 98 99 namespace grpc_core { 100 101 // Tag type to indicate no interception. 102 // This is used to indicate that a filter does not intercept a particular 103 // event. 104 // In C++14 we declare these as (for example): 105 // static const NoInterceptor OnClientInitialMetadata; 106 // and out-of-line provide the definition: 107 // const MyFilter::Call::NoInterceptor 108 // MyFilter::Call::OnClientInitialMetadata; 109 // In C++17 and later we can use inline variables instead: 110 // inline static const NoInterceptor OnClientInitialMetadata; 111 struct NoInterceptor {}; 112 113 namespace filters_detail { 114 115 // One call filter constructor 116 // Contains enough information to allocate and initialize the 117 // call data for one filter. 118 struct FilterConstructor { 119 // Pointer to corresponding channel data for this filter 120 void* channel_data; 121 // Offset of the call data for this filter within the call data memory 122 // allocation 123 size_t call_offset; 124 // Initialize the call data for this filter 125 void (*call_init)(void* call_data, void* channel_data); 126 }; 127 128 // One call filter destructor 129 struct FilterDestructor { 130 // Offset of the call data for this filter within the call data memory 131 // allocation 132 size_t call_offset; 133 // Destroy the call data for this filter 134 void (*call_destroy)(void* call_data); 135 }; 136 137 template <typename FilterType, typename = void> 138 struct CallConstructor { ConstructCallConstructor139 static void Construct(void* call_data, FilterType*) { 140 new (call_data) typename FilterType::Call(); 141 } 142 }; 143 144 template <typename FilterType> 145 struct CallConstructor<FilterType, 146 absl::void_t<decltype(typename FilterType::Call( 147 static_cast<FilterType*>(nullptr)))>> { 148 static void Construct(void* call_data, FilterType* channel) { 149 new (call_data) typename FilterType::Call(channel); 150 } 151 }; 152 153 // Result of a filter operation 154 // Can be either ok (if ok is non-null) or an error. 155 // Only one pointer can be set. 156 template <typename T> 157 struct ResultOr { 158 T ok; 159 ServerMetadataHandle error; 160 }; 161 162 // One filter operation metadata 163 // Given a value of type V, produces a promise of type R. 164 template <typename R, typename V> 165 struct Operator { 166 using Result = R; 167 using Arg = V; 168 // Pointer to corresponding channel data for this filter 169 void* channel_data; 170 // Offset of the call data for this filter within the call data memory 171 size_t call_offset; 172 // Initialize the promise data for this filter, and poll once. 173 // Return the result of the poll. 174 // If the promise finishes, also destroy the promise data! 175 Poll<R> (*promise_init)(void* promise_data, void* call_data, 176 void* channel_data, V value); 177 // Poll the promise data for this filter. 178 // If the promise finishes, also destroy the promise data! 179 // Note that if the promise always finishes on the first poll, then supplying 180 // this method is unnecessary (as it will never be called). 181 Poll<R> (*poll)(void* promise_data); 182 // Destroy the promise data for this filter for an in-progress operation 183 // before the promise finishes. 184 // Note that if the promise always finishes on the first poll, then supplying 185 // this method is unnecessary (as it will never be called). 186 void (*early_destroy)(void* promise_data); 187 }; 188 189 // We divide operations into fallible and infallible. 190 // Fallible operations can fail, and that failure terminates the call. 191 // Infallible operations cannot fail. 192 // Fallible operations are used for client initial, and server initial metadata, 193 // and messages. 194 // Infallible operations are used for server trailing metadata. 195 // (This is because server trailing metadata occurs when the call is finished - 196 // and so we couldn't possibly become more finished - and also because it's the 197 // preferred representation of failure anyway!) 198 199 // An operation that could fail: takes a T argument, produces a ResultOr<T> 200 template <typename T> 201 using FallibleOperator = Operator<ResultOr<T>, T>; 202 // And one that cannot: takes a T argument, produces a T 203 template <typename T> 204 using InfallibleOperator = Operator<T, T>; 205 206 // One call finalizer 207 struct Finalizer { 208 void* channel_data; 209 size_t call_offset; 210 void (*final)(void* call_data, void* channel_data, 211 const grpc_call_final_info* final_info); 212 }; 213 214 // A layout of operations for a given filter stack 215 // This includes which operations, how much memory is required, what alignment. 216 template <typename Op> 217 struct Layout { 218 size_t promise_size = 0; 219 size_t promise_alignment = 0; 220 std::vector<Op> ops; 221 222 void Add(size_t filter_promise_size, size_t filter_promise_alignment, Op op) { 223 promise_size = std::max(promise_size, filter_promise_size); 224 promise_alignment = std::max(promise_alignment, filter_promise_alignment); 225 ops.push_back(op); 226 } 227 228 void Reverse() { std::reverse(ops.begin(), ops.end()); } 229 }; 230 231 // AddOp and friends 232 // These are helpers to wrap a member function on a class into an operation 233 // and attach it to a layout. 234 // There's a generic wrapper function `AddOp` for each of fallible and 235 // infallible operations. 236 // There are then specializations of AddOpImpl for each kind of member function 237 // an operation could have. 238 // Each specialization has an `Add` member function for the kinds of operations 239 // it supports: some only support fallible, some only support infallible, some 240 // support both. 241 242 template <typename FilterType, typename T, typename FunctionImpl, 243 FunctionImpl impl, typename SfinaeVoid = void> 244 struct AddOpImpl; 245 246 template <typename FunctionImpl, FunctionImpl impl, typename FilterType, 247 typename T> 248 void AddOp(FilterType* channel_data, size_t call_offset, 249 Layout<FallibleOperator<T>>& to) { 250 AddOpImpl<FilterType, T, FunctionImpl, impl>::Add(channel_data, call_offset, 251 to); 252 } 253 254 template <typename FunctionImpl, FunctionImpl impl, typename FilterType, 255 typename T> 256 void AddOp(FilterType* channel_data, size_t call_offset, 257 Layout<InfallibleOperator<T>>& to) { 258 AddOpImpl<FilterType, T, FunctionImpl, impl>::Add(channel_data, call_offset, 259 to); 260 } 261 262 // const NoInterceptor $EVENT 263 // These do nothing, and specifically DO NOT add an operation to the layout. 264 // Supported for fallible & infallible operations. 265 template <typename FilterType, typename T, const NoInterceptor* which> 266 struct AddOpImpl<FilterType, T, const NoInterceptor*, which> { 267 static void Add(FilterType*, size_t, Layout<FallibleOperator<T>>&) {} 268 static void Add(FilterType*, size_t, Layout<InfallibleOperator<T>>&) {} 269 }; 270 271 // void $INTERCEPTOR_NAME($VALUE_TYPE&) 272 template <typename FilterType, typename T, 273 void (FilterType::Call::*impl)(typename T::element_type&)> 274 struct AddOpImpl<FilterType, T, 275 void (FilterType::Call::*)(typename T::element_type&), impl> { 276 static void Add(FilterType* channel_data, size_t call_offset, 277 Layout<FallibleOperator<T>>& to) { 278 to.Add(0, 0, 279 FallibleOperator<T>{ 280 channel_data, 281 call_offset, 282 [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> { 283 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 284 *value); 285 return ResultOr<T>{std::move(value), nullptr}; 286 }, 287 nullptr, 288 nullptr, 289 }); 290 } 291 static void Add(FilterType* channel_data, size_t call_offset, 292 Layout<InfallibleOperator<T>>& to) { 293 to.Add(0, 0, 294 InfallibleOperator<T>{ 295 channel_data, 296 call_offset, 297 [](void*, void* call_data, void*, T value) -> Poll<T> { 298 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 299 *value); 300 return std::move(value); 301 }, 302 nullptr, 303 nullptr, 304 }); 305 } 306 }; 307 308 // void $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*) 309 template <typename FilterType, typename T, 310 void (FilterType::Call::*impl)(typename T::element_type&, 311 FilterType*)> 312 struct AddOpImpl< 313 FilterType, T, 314 void (FilterType::Call::*)(typename T::element_type&, FilterType*), impl> { 315 static void Add(FilterType* channel_data, size_t call_offset, 316 Layout<FallibleOperator<T>>& to) { 317 to.Add(0, 0, 318 FallibleOperator<T>{ 319 channel_data, 320 call_offset, 321 [](void*, void* call_data, void* channel_data, 322 T value) -> Poll<ResultOr<T>> { 323 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 324 *value, static_cast<FilterType*>(channel_data)); 325 return ResultOr<T>{std::move(value), nullptr}; 326 }, 327 nullptr, 328 nullptr, 329 }); 330 } 331 static void Add(FilterType* channel_data, size_t call_offset, 332 Layout<InfallibleOperator<T>>& to) { 333 to.Add( 334 0, 0, 335 InfallibleOperator<T>{ 336 channel_data, 337 call_offset, 338 [](void*, void* call_data, void* channel_data, T value) -> Poll<T> { 339 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 340 *value, static_cast<FilterType*>(channel_data)); 341 return std::move(value); 342 }, 343 nullptr, 344 nullptr, 345 }); 346 } 347 }; 348 349 // $VALUE_HANDLE $INTERCEPTOR_NAME($VALUE_HANDLE, FilterType*) 350 template <typename FilterType, typename T, 351 T (FilterType::Call::*impl)(T, FilterType*)> 352 struct AddOpImpl<FilterType, T, T (FilterType::Call::*)(T, FilterType*), impl> { 353 static void Add(FilterType* channel_data, size_t call_offset, 354 Layout<FallibleOperator<T>>& to) { 355 to.Add( 356 0, 0, 357 FallibleOperator<T>{ 358 channel_data, 359 call_offset, 360 [](void*, void* call_data, void* channel_data, 361 T value) -> Poll<ResultOr<T>> { 362 return ResultOr<T>{ 363 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 364 std::move(value), static_cast<FilterType*>(channel_data)), 365 nullptr}; 366 }, 367 nullptr, 368 nullptr, 369 }); 370 } 371 static void Add(FilterType* channel_data, size_t call_offset, 372 Layout<InfallibleOperator<T>>& to) { 373 to.Add( 374 0, 0, 375 InfallibleOperator<T>{ 376 channel_data, 377 call_offset, 378 [](void*, void* call_data, void* channel_data, T value) -> Poll<T> { 379 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 380 *value, static_cast<FilterType*>(channel_data)); 381 return ( 382 static_cast<typename FilterType::Call*>(call_data)->*impl)( 383 std::move(value), static_cast<FilterType*>(channel_data)); 384 }, 385 nullptr, 386 nullptr, 387 }); 388 } 389 }; 390 391 // absl::Status $INTERCEPTOR_NAME($VALUE_TYPE&) 392 template <typename FilterType, typename T, 393 absl::Status (FilterType::Call::*impl)(typename T::element_type&)> 394 struct AddOpImpl<FilterType, T, 395 absl::Status (FilterType::Call::*)(typename T::element_type&), 396 impl> { 397 static void Add(FilterType* channel_data, size_t call_offset, 398 Layout<FallibleOperator<T>>& to) { 399 to.Add( 400 0, 0, 401 FallibleOperator<T>{ 402 channel_data, 403 call_offset, 404 [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> { 405 auto r = 406 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 407 *value); 408 if (r.ok()) return ResultOr<T>{std::move(value), nullptr}; 409 return ResultOr<T>{ 410 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 411 }, 412 nullptr, 413 nullptr, 414 }); 415 } 416 static void Add(FilterType* channel_data, size_t call_offset, 417 Layout<InfallibleOperator<T>>& to) { 418 to.Add( 419 0, 0, 420 InfallibleOperator<T>{ 421 channel_data, 422 call_offset, 423 [](void*, void* call_data, void*, T value) -> Poll<T> { 424 auto r = 425 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 426 *value); 427 if (r.ok()) return std::move(value); 428 return StatusCast<ServerMetadataHandle>(std::move(r)); 429 }, 430 nullptr, 431 nullptr, 432 }); 433 } 434 }; 435 436 // absl::Status $INTERCEPTOR_NAME(const $VALUE_TYPE&) 437 template <typename FilterType, typename T, 438 absl::Status (FilterType::Call::*impl)( 439 const typename T::element_type&)> 440 struct AddOpImpl< 441 FilterType, T, 442 absl::Status (FilterType::Call::*)(const typename T::element_type&), impl> { 443 static void Add(FilterType* channel_data, size_t call_offset, 444 Layout<FallibleOperator<T>>& to) { 445 to.Add( 446 0, 0, 447 FallibleOperator<T>{ 448 channel_data, 449 call_offset, 450 [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> { 451 auto r = 452 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 453 *value); 454 if (r.ok()) return ResultOr<T>{std::move(value), nullptr}; 455 return ResultOr<T>{ 456 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 457 }, 458 nullptr, 459 nullptr, 460 }); 461 } 462 }; 463 464 // absl::Status $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*) 465 template <typename FilterType, typename T, 466 absl::Status (FilterType::Call::*impl)(typename T::element_type&, 467 FilterType*)> 468 struct AddOpImpl<FilterType, T, 469 absl::Status (FilterType::Call::*)(typename T::element_type&, 470 FilterType*), 471 impl> { 472 static void Add(FilterType* channel_data, size_t call_offset, 473 Layout<FallibleOperator<T>>& to) { 474 to.Add( 475 0, 0, 476 FallibleOperator<T>{ 477 channel_data, 478 call_offset, 479 [](void*, void* call_data, void* channel_data, 480 T value) -> Poll<ResultOr<T>> { 481 auto r = 482 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 483 *value, static_cast<FilterType*>(channel_data)); 484 if (IsStatusOk(r)) return ResultOr<T>{std::move(value), nullptr}; 485 return ResultOr<T>{ 486 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 487 }, 488 nullptr, 489 nullptr, 490 }); 491 } 492 }; 493 494 // absl::Status $INTERCEPTOR_NAME(const $VALUE_TYPE&, FilterType*) 495 template <typename FilterType, typename T, 496 absl::Status (FilterType::Call::*impl)( 497 const typename T::element_type&, FilterType*)> 498 struct AddOpImpl<FilterType, T, 499 absl::Status (FilterType::Call::*)( 500 const typename T::element_type&, FilterType*), 501 impl> { 502 static void Add(FilterType* channel_data, size_t call_offset, 503 Layout<FallibleOperator<T>>& to) { 504 to.Add( 505 0, 0, 506 FallibleOperator<T>{ 507 channel_data, 508 call_offset, 509 [](void*, void* call_data, void* channel_data, 510 T value) -> Poll<ResultOr<T>> { 511 auto r = 512 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 513 *value, static_cast<FilterType*>(channel_data)); 514 if (IsStatusOk(r)) return ResultOr<T>{std::move(value), nullptr}; 515 return ResultOr<T>{ 516 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 517 }, 518 nullptr, 519 nullptr, 520 }); 521 } 522 }; 523 524 // absl::StatusOr<$VALUE_HANDLE> $INTERCEPTOR_NAME($VALUE_HANDLE, FilterType*) 525 template <typename FilterType, typename T, 526 absl::StatusOr<T> (FilterType::Call::*impl)(T, FilterType*)> 527 struct AddOpImpl<FilterType, T, 528 absl::StatusOr<T> (FilterType::Call::*)(T, FilterType*), 529 impl> { 530 static void Add(FilterType* channel_data, size_t call_offset, 531 Layout<FallibleOperator<T>>& to) { 532 to.Add( 533 0, 0, 534 FallibleOperator<T>{ 535 channel_data, 536 call_offset, 537 [](void*, void* call_data, void* channel_data, 538 T value) -> Poll<ResultOr<T>> { 539 auto r = 540 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 541 std::move(value), static_cast<FilterType*>(channel_data)); 542 if (IsStatusOk(r)) return ResultOr<T>{std::move(*r), nullptr}; 543 return ResultOr<T>{ 544 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 545 }, 546 nullptr, 547 nullptr, 548 }); 549 } 550 }; 551 552 // ServerMetadataHandle $INTERCEPTOR_NAME($VALUE_TYPE&) 553 template <typename FilterType, typename T, 554 ServerMetadataHandle (FilterType::Call::*impl)( 555 typename T::element_type&)> 556 struct AddOpImpl<FilterType, T, 557 ServerMetadataHandle (FilterType::Call::*)( 558 typename T::element_type&), 559 impl> { 560 static void Add(FilterType* channel_data, size_t call_offset, 561 Layout<FallibleOperator<T>>& to) { 562 to.Add( 563 0, 0, 564 FallibleOperator<T>{ 565 channel_data, 566 call_offset, 567 [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> { 568 auto r = 569 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 570 *value); 571 if (r == nullptr) return ResultOr<T>{std::move(value), nullptr}; 572 return ResultOr<T>{ 573 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 574 }, 575 nullptr, 576 nullptr, 577 }); 578 } 579 }; 580 581 // ServerMetadataHandle $INTERCEPTOR_NAME(const $VALUE_TYPE&) 582 template <typename FilterType, typename T, 583 ServerMetadataHandle (FilterType::Call::*impl)( 584 const typename T::element_type&)> 585 struct AddOpImpl<FilterType, T, 586 ServerMetadataHandle (FilterType::Call::*)( 587 const typename T::element_type&), 588 impl> { 589 static void Add(FilterType* channel_data, size_t call_offset, 590 Layout<FallibleOperator<T>>& to) { 591 to.Add( 592 0, 0, 593 FallibleOperator<T>{ 594 channel_data, 595 call_offset, 596 [](void*, void* call_data, void*, T value) -> Poll<ResultOr<T>> { 597 auto r = 598 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 599 *value); 600 if (r == nullptr) return ResultOr<T>{std::move(value), nullptr}; 601 return ResultOr<T>{ 602 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 603 }, 604 nullptr, 605 nullptr, 606 }); 607 } 608 }; 609 610 // ServerMetadataHandle $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*) 611 template <typename FilterType, typename T, 612 ServerMetadataHandle (FilterType::Call::*impl)( 613 typename T::element_type&, FilterType*)> 614 struct AddOpImpl<FilterType, T, 615 ServerMetadataHandle (FilterType::Call::*)( 616 typename T::element_type&, FilterType*), 617 impl> { 618 static void Add(FilterType* channel_data, size_t call_offset, 619 Layout<FallibleOperator<T>>& to) { 620 to.Add( 621 0, 0, 622 FallibleOperator<T>{ 623 channel_data, 624 call_offset, 625 [](void*, void* call_data, void* channel_data, 626 T value) -> Poll<ResultOr<T>> { 627 auto r = 628 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 629 *value, static_cast<FilterType*>(channel_data)); 630 if (r == nullptr) return ResultOr<T>{std::move(value), nullptr}; 631 return ResultOr<T>{ 632 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 633 }, 634 nullptr, 635 nullptr, 636 }); 637 } 638 }; 639 640 // ServerMetadataHandle $INTERCEPTOR_NAME(const $VALUE_TYPE&, FilterType*) 641 template <typename FilterType, typename T, 642 ServerMetadataHandle (FilterType::Call::*impl)( 643 const typename T::element_type&, FilterType*)> 644 struct AddOpImpl<FilterType, T, 645 ServerMetadataHandle (FilterType::Call::*)( 646 const typename T::element_type&, FilterType*), 647 impl> { 648 static void Add(FilterType* channel_data, size_t call_offset, 649 Layout<FallibleOperator<T>>& to) { 650 to.Add( 651 0, 0, 652 FallibleOperator<T>{ 653 channel_data, 654 call_offset, 655 [](void*, void* call_data, void* channel_data, 656 T value) -> Poll<ResultOr<T>> { 657 auto r = 658 (static_cast<typename FilterType::Call*>(call_data)->*impl)( 659 *value, static_cast<FilterType*>(channel_data)); 660 if (r == nullptr) return ResultOr<T>{std::move(value), nullptr}; 661 return ResultOr<T>{ 662 nullptr, StatusCast<ServerMetadataHandle>(std::move(r))}; 663 }, 664 nullptr, 665 nullptr, 666 }); 667 } 668 }; 669 670 // PROMISE_RETURNING(absl::Status) $INTERCEPTOR_NAME($VALUE_TYPE&) 671 template <typename FilterType, typename T, typename R, 672 R (FilterType::Call::*impl)(typename T::element_type&)> 673 struct AddOpImpl< 674 FilterType, T, R (FilterType::Call::*)(typename T::element_type&), impl, 675 absl::enable_if_t<std::is_same<absl::Status, PromiseResult<R>>::value>> { 676 static void Add(FilterType* channel_data, size_t call_offset, 677 Layout<FallibleOperator<T>>& to) { 678 class Promise { 679 public: 680 Promise(T value, typename FilterType::Call* call_data, FilterType*) 681 : value_(std::move(value)), impl_((call_data->*impl)(*value_)) {} 682 683 Poll<ResultOr<T>> PollOnce() { 684 auto p = impl_(); 685 auto* r = p.value_if_ready(); 686 if (r == nullptr) return Pending{}; 687 T value = std::move(value_); 688 this->~Promise(); 689 if (r->ok()) { 690 return ResultOr<T>{std::move(value), nullptr}; 691 } 692 return ResultOr<T>{nullptr, ServerMetadataFromStatus(*r)}; 693 } 694 695 private: 696 GPR_NO_UNIQUE_ADDRESS T value_; 697 GPR_NO_UNIQUE_ADDRESS R impl_; 698 }; 699 to.Add(sizeof(Promise), alignof(Promise), 700 FallibleOperator<T>{ 701 channel_data, 702 call_offset, 703 [](void* promise_data, void* call_data, void* channel_data, 704 T value) -> Poll<ResultOr<T>> { 705 auto* promise = new (promise_data) 706 Promise(std::move(value), 707 static_cast<typename FilterType::Call*>(call_data), 708 static_cast<FilterType*>(channel_data)); 709 return promise->PollOnce(); 710 }, 711 [](void* promise_data) { 712 return static_cast<Promise*>(promise_data)->PollOnce(); 713 }, 714 [](void* promise_data) { 715 static_cast<Promise*>(promise_data)->~Promise(); 716 }, 717 }); 718 } 719 }; 720 721 // PROMISE_RETURNING(absl::Status) $INTERCEPTOR_NAME($VALUE_TYPE&, FilterType*) 722 template <typename FilterType, typename T, typename R, 723 R (FilterType::Call::*impl)(typename T::element_type&, FilterType*)> 724 struct AddOpImpl< 725 FilterType, T, 726 R (FilterType::Call::*)(typename T::element_type&, FilterType*), impl, 727 absl::enable_if_t<!std::is_same<R, absl::Status>::value && 728 std::is_same<absl::Status, PromiseResult<R>>::value>> { 729 static void Add(FilterType* channel_data, size_t call_offset, 730 Layout<FallibleOperator<T>>& to) { 731 class Promise { 732 public: 733 Promise(T value, typename FilterType::Call* call_data, 734 FilterType* channel_data) 735 : value_(std::move(value)), 736 impl_((call_data->*impl)(*value_, channel_data)) {} 737 738 Poll<ResultOr<T>> PollOnce() { 739 auto p = impl_(); 740 auto* r = p.value_if_ready(); 741 if (r == nullptr) return Pending{}; 742 T value = std::move(value_); 743 this->~Promise(); 744 if (r->ok()) { 745 return ResultOr<T>{std::move(value), nullptr}; 746 } 747 return ResultOr<T>{nullptr, ServerMetadataFromStatus(*r)}; 748 } 749 750 private: 751 GPR_NO_UNIQUE_ADDRESS T value_; 752 GPR_NO_UNIQUE_ADDRESS R impl_; 753 }; 754 to.Add(sizeof(Promise), alignof(Promise), 755 FallibleOperator<T>{ 756 channel_data, 757 call_offset, 758 [](void* promise_data, void* call_data, void* channel_data, 759 T value) -> Poll<ResultOr<T>> { 760 auto* promise = new (promise_data) 761 Promise(std::move(value), 762 static_cast<typename FilterType::Call*>(call_data), 763 static_cast<FilterType*>(channel_data)); 764 return promise->PollOnce(); 765 }, 766 [](void* promise_data) { 767 return static_cast<Promise*>(promise_data)->PollOnce(); 768 }, 769 [](void* promise_data) { 770 static_cast<Promise*>(promise_data)->~Promise(); 771 }, 772 }); 773 } 774 }; 775 776 // PROMISE_RETURNING(absl::StatusOr<$VALUE_HANDLE>) 777 // $INTERCEPTOR_NAME($VALUE_HANDLE, FilterType*) 778 template <typename FilterType, typename T, typename R, 779 R (FilterType::Call::*impl)(T, FilterType*)> 780 struct AddOpImpl<FilterType, T, R (FilterType::Call::*)(T, FilterType*), impl, 781 absl::enable_if_t<std::is_same<absl::StatusOr<T>, 782 PromiseResult<R>>::value>> { 783 static void Add(FilterType* channel_data, size_t call_offset, 784 Layout<FallibleOperator<T>>& to) { 785 class Promise { 786 public: 787 Promise(T value, typename FilterType::Call* call_data, 788 FilterType* channel_data) 789 : impl_((call_data->*impl)(std::move(value), channel_data)) {} 790 791 Poll<ResultOr<T>> PollOnce() { 792 auto p = impl_(); 793 auto* r = p.value_if_ready(); 794 if (r == nullptr) return Pending{}; 795 this->~Promise(); 796 if (r->ok()) return ResultOr<T>{std::move(**r), nullptr}; 797 return ResultOr<T>{nullptr, ServerMetadataFromStatus(r->status())}; 798 } 799 800 private: 801 GPR_NO_UNIQUE_ADDRESS R impl_; 802 }; 803 to.Add(sizeof(Promise), alignof(Promise), 804 FallibleOperator<T>{ 805 channel_data, 806 call_offset, 807 [](void* promise_data, void* call_data, void* channel_data, 808 T value) -> Poll<ResultOr<T>> { 809 auto* promise = new (promise_data) 810 Promise(std::move(value), 811 static_cast<typename FilterType::Call*>(call_data), 812 static_cast<FilterType*>(channel_data)); 813 return promise->PollOnce(); 814 }, 815 [](void* promise_data) { 816 return static_cast<Promise*>(promise_data)->PollOnce(); 817 }, 818 [](void* promise_data) { 819 static_cast<Promise*>(promise_data)->~Promise(); 820 }, 821 }); 822 } 823 }; 824 825 struct ChannelDataDestructor { 826 void (*destroy)(void* channel_data); 827 void* channel_data; 828 }; 829 830 // StackData contains the main datastructures built up by this module. 831 // It's a complete representation of all the code that needs to be invoked 832 // to execute a call for a given set of filters. 833 // This structure is held at the channel layer and is shared between many 834 // in-flight calls. 835 struct StackData { 836 // Overall size and alignment of the call data for this stack. 837 size_t call_data_alignment = 1; 838 size_t call_data_size = 0; 839 // A complete list of filters for this call, so that we can construct the 840 // call data for each filter. 841 std::vector<FilterConstructor> filter_constructor; 842 std::vector<FilterDestructor> filter_destructor; 843 // For each kind of operation, a layout of the operations for this call. 844 // (there's some duplicate data here, and that's ok: we want to avoid 845 // pointer chasing as much as possible when executing a call) 846 Layout<FallibleOperator<ClientMetadataHandle>> client_initial_metadata; 847 Layout<FallibleOperator<ServerMetadataHandle>> server_initial_metadata; 848 Layout<FallibleOperator<MessageHandle>> client_to_server_messages; 849 Layout<FallibleOperator<MessageHandle>> server_to_client_messages; 850 Layout<InfallibleOperator<ServerMetadataHandle>> server_trailing_metadata; 851 // A list of finalizers for this call. 852 // We use a bespoke data structure here because finalizers can never be 853 // asynchronous. 854 std::vector<Finalizer> finalizers; 855 // A list of functions to call when this stack data is destroyed 856 // (to capture ownership of channel data) 857 std::vector<ChannelDataDestructor> channel_data_destructors; 858 859 // Add one filter to the list of filters, and update alignment. 860 // Returns the offset of the call data for this filter. 861 // Specifically does not update any of the layouts or finalizers. 862 // Callers are expected to do that themselves. 863 // This separation enables separation of *testing* of filters, and since 864 // this is a detail type it's felt that a slightly harder to hold API that 865 // we have exactly one caller for is warranted for a more thorough testing 866 // story. 867 template <typename FilterType> 868 absl::enable_if_t<!std::is_empty<typename FilterType::Call>::value, size_t> 869 AddFilterConstructor(FilterType* channel_data) { 870 const size_t alignment = alignof(typename FilterType::Call); 871 call_data_alignment = std::max(call_data_alignment, alignment); 872 if (call_data_size % alignment != 0) { 873 call_data_size += alignment - call_data_size % alignment; 874 } 875 const size_t call_offset = call_data_size; 876 call_data_size += sizeof(typename FilterType::Call); 877 filter_constructor.push_back(FilterConstructor{ 878 channel_data, 879 call_offset, 880 [](void* call_data, void* channel_data) { 881 CallConstructor<FilterType>::Construct( 882 call_data, static_cast<FilterType*>(channel_data)); 883 }, 884 }); 885 return call_offset; 886 } 887 888 template <typename FilterType> 889 absl::enable_if_t< 890 std::is_empty<typename FilterType::Call>::value && 891 !std::is_trivially_constructible<typename FilterType::Call>::value, 892 size_t> 893 AddFilterConstructor(FilterType* channel_data) { 894 const size_t alignment = alignof(typename FilterType::Call); 895 call_data_alignment = std::max(call_data_alignment, alignment); 896 filter_constructor.push_back(FilterConstructor{ 897 channel_data, 898 0, 899 [](void* call_data, void* channel_data) { 900 CallConstructor<FilterType>::Construct( 901 call_data, static_cast<FilterType*>(channel_data)); 902 }, 903 }); 904 return 0; 905 } 906 907 template <typename FilterType> 908 absl::enable_if_t< 909 std::is_empty<typename FilterType::Call>::value && 910 std::is_trivially_constructible<typename FilterType::Call>::value, 911 size_t> 912 AddFilterConstructor(FilterType*) { 913 const size_t alignment = alignof(typename FilterType::Call); 914 call_data_alignment = std::max(call_data_alignment, alignment); 915 return 0; 916 } 917 918 template <typename FilterType> 919 absl::enable_if_t< 920 !std::is_trivially_destructible<typename FilterType::Call>::value> 921 AddFilterDestructor(size_t call_offset) { 922 filter_destructor.push_back(FilterDestructor{ 923 call_offset, 924 [](void* call_data) { 925 static_cast<typename FilterType::Call*>(call_data)->~Call(); 926 }, 927 }); 928 } 929 930 template <typename FilterType> 931 absl::enable_if_t< 932 std::is_trivially_destructible<typename FilterType::Call>::value> 933 AddFilterDestructor(size_t) {} 934 935 template <typename FilterType> 936 size_t AddFilter(FilterType* filter) { 937 const size_t call_offset = AddFilterConstructor(filter); 938 AddFilterDestructor<FilterType>(call_offset); 939 return call_offset; 940 } 941 942 // Per operation adders - one for each interception point. 943 // Delegate to AddOp() above. 944 945 template <typename FilterType> 946 void AddClientInitialMetadataOp(FilterType* channel_data, 947 size_t call_offset) { 948 AddOp<decltype(&FilterType::Call::OnClientInitialMetadata), 949 &FilterType::Call::OnClientInitialMetadata>(channel_data, call_offset, 950 client_initial_metadata); 951 } 952 953 template <typename FilterType> 954 void AddServerInitialMetadataOp(FilterType* channel_data, 955 size_t call_offset) { 956 AddOp<decltype(&FilterType::Call::OnServerInitialMetadata), 957 &FilterType::Call::OnServerInitialMetadata>(channel_data, call_offset, 958 server_initial_metadata); 959 } 960 961 template <typename FilterType> 962 void AddClientToServerMessageOp(FilterType* channel_data, 963 size_t call_offset) { 964 AddOp<decltype(&FilterType::Call::OnClientToServerMessage), 965 &FilterType::Call::OnClientToServerMessage>( 966 channel_data, call_offset, client_to_server_messages); 967 } 968 969 template <typename FilterType> 970 void AddServerToClientMessageOp(FilterType* channel_data, 971 size_t call_offset) { 972 AddOp<decltype(&FilterType::Call::OnServerToClientMessage), 973 &FilterType::Call::OnServerToClientMessage>( 974 channel_data, call_offset, server_to_client_messages); 975 } 976 977 template <typename FilterType> 978 void AddServerTrailingMetadataOp(FilterType* channel_data, 979 size_t call_offset) { 980 AddOp<decltype(&FilterType::Call::OnServerTrailingMetadata), 981 &FilterType::Call::OnServerTrailingMetadata>( 982 channel_data, call_offset, server_trailing_metadata); 983 } 984 985 // Finalizer interception adders 986 987 template <typename FilterType> 988 void AddFinalizer(FilterType*, size_t, const NoInterceptor* p) { 989 GPR_DEBUG_ASSERT(p == &FilterType::Call::OnFinalize); 990 } 991 992 template <typename FilterType> 993 void AddFinalizer(FilterType* channel_data, size_t call_offset, 994 void (FilterType::Call::*p)(const grpc_call_final_info*)) { 995 GPR_DEBUG_ASSERT(p == &FilterType::Call::OnFinalize); 996 finalizers.push_back(Finalizer{ 997 channel_data, 998 call_offset, 999 [](void* call_data, void*, const grpc_call_final_info* final_info) { 1000 static_cast<typename FilterType::Call*>(call_data)->OnFinalize( 1001 final_info); 1002 }, 1003 }); 1004 } 1005 1006 template <typename FilterType> 1007 void AddFinalizer(FilterType* channel_data, size_t call_offset, 1008 void (FilterType::Call::*p)(const grpc_call_final_info*, 1009 FilterType*)) { 1010 GPR_DEBUG_ASSERT(p == &FilterType::Call::OnFinalize); 1011 finalizers.push_back(Finalizer{ 1012 channel_data, 1013 call_offset, 1014 [](void* call_data, void* channel_data, 1015 const grpc_call_final_info* final_info) { 1016 static_cast<typename FilterType::Call*>(call_data)->OnFinalize( 1017 final_info, static_cast<FilterType*>(channel_data)); 1018 }, 1019 }); 1020 } 1021 }; 1022 1023 // OperationExecutor is a helper class to execute a sequence of operations 1024 // from a layout on one value. 1025 // We instantiate one of these during the *Pull* promise for each operation 1026 // and wait for it to resolve. 1027 // At this layer the filters look like a list of transformations on the 1028 // value pushed. 1029 // An early-failing filter will cause subsequent filters to not execute. 1030 template <typename T> 1031 class OperationExecutor { 1032 public: 1033 OperationExecutor() = default; 1034 ~OperationExecutor(); 1035 OperationExecutor(const OperationExecutor&) = delete; 1036 OperationExecutor& operator=(const OperationExecutor&) = delete; 1037 OperationExecutor(OperationExecutor&& other) noexcept 1038 : ops_(other.ops_), end_ops_(other.end_ops_) { 1039 // Movable iff we're not running. 1040 GPR_DEBUG_ASSERT(other.promise_data_ == nullptr); 1041 } 1042 OperationExecutor& operator=(OperationExecutor&& other) noexcept { 1043 GPR_DEBUG_ASSERT(other.promise_data_ == nullptr); 1044 GPR_DEBUG_ASSERT(promise_data_ == nullptr); 1045 ops_ = other.ops_; 1046 end_ops_ = other.end_ops_; 1047 return *this; 1048 } 1049 // IsRunning() is true if we're currently executing a sequence of operations. 1050 bool IsRunning() const { return promise_data_ != nullptr; } 1051 // Start executing a layout. May allocate space to store the relevant promise. 1052 // Returns the result of the first poll. 1053 // If the promise finishes, also destroy the promise data. 1054 Poll<ResultOr<T>> Start(const Layout<FallibleOperator<T>>* layout, T input, 1055 void* call_data); 1056 // Continue executing a layout. Returns the result of the next poll. 1057 // If the promise finishes, also destroy the promise data. 1058 Poll<ResultOr<T>> Step(void* call_data); 1059 1060 private: 1061 // Start polling on the current step of the layout. 1062 // `input` is the current value (either the input to the first step, or the 1063 // so far transformed value) 1064 // `call_data` is the call data for the filter stack. 1065 // If this op finishes immediately then we iterative move to the next step. 1066 // If we reach the end up the ops, we return the overall poll result, 1067 // otherwise we return Pending. 1068 Poll<ResultOr<T>> InitStep(T input, void* call_data); 1069 // Continue polling on the current step of the layout. 1070 // Called on the next poll after InitStep returns pending. 1071 // If the promise is still pending, returns this. 1072 // If the promise completes we call into InitStep to continue execution 1073 // through the filters. 1074 Poll<ResultOr<T>> ContinueStep(void* call_data); 1075 1076 void* promise_data_ = nullptr; 1077 const FallibleOperator<T>* ops_; 1078 const FallibleOperator<T>* end_ops_; 1079 }; 1080 1081 // Per OperationExecutor, but for infallible operation sequences. 1082 template <typename T> 1083 class InfallibleOperationExecutor { 1084 public: 1085 InfallibleOperationExecutor() = default; 1086 ~InfallibleOperationExecutor(); 1087 InfallibleOperationExecutor(const InfallibleOperationExecutor&) = delete; 1088 InfallibleOperationExecutor& operator=(const InfallibleOperationExecutor&) = 1089 delete; 1090 InfallibleOperationExecutor(InfallibleOperationExecutor&& other) noexcept 1091 : ops_(other.ops_), end_ops_(other.end_ops_) { 1092 // Movable iff we're not running. 1093 GPR_DEBUG_ASSERT(other.promise_data_ == nullptr); 1094 } 1095 InfallibleOperationExecutor& operator=( 1096 InfallibleOperationExecutor&& other) noexcept { 1097 GPR_DEBUG_ASSERT(other.promise_data_ == nullptr); 1098 GPR_DEBUG_ASSERT(promise_data_ == nullptr); 1099 ops_ = other.ops_; 1100 end_ops_ = other.end_ops_; 1101 return *this; 1102 } 1103 1104 // IsRunning() is true if we're currently executing a sequence of operations. 1105 bool IsRunning() const { return promise_data_ != nullptr; } 1106 // Start executing a layout. May allocate space to store the relevant promise. 1107 // Returns the result of the first poll. 1108 // If the promise finishes, also destroy the promise data. 1109 Poll<T> Start(const Layout<InfallibleOperator<T>>* layout, T input, 1110 void* call_data); 1111 // Continue executing a layout. Returns the result of the next poll. 1112 // If the promise finishes, also destroy the promise data. 1113 Poll<T> Step(void* call_data); 1114 1115 private: 1116 // Start polling on the current step of the layout. 1117 // `input` is the current value (either the input to the first step, or the 1118 // so far transformed value) 1119 // `call_data` is the call data for the filter stack. 1120 // If this op finishes immediately then we iterative move to the next step. 1121 // If we reach the end up the ops, we return the overall poll result, 1122 // otherwise we return Pending. 1123 Poll<T> InitStep(T input, void* call_data); 1124 // Continue polling on the current step of the layout. 1125 // Called on the next poll after InitStep returns pending. 1126 // If the promise is still pending, returns this. 1127 // If the promise completes we call into InitStep to continue execution 1128 // through the filters. 1129 Poll<T> ContinueStep(void* call_data); 1130 1131 void* promise_data_ = nullptr; 1132 const InfallibleOperator<T>* ops_; 1133 const InfallibleOperator<T>* end_ops_; 1134 }; 1135 1136 // The current state of a pipe. 1137 // CallFilters expose a set of pipe like objects for client & server initial 1138 // metadata and for messages. 1139 // This class tracks the state of one of those pipes. 1140 // Size matters here: this state is kept for the lifetime of a call, and we keep 1141 // multiple of them. 1142 // This class encapsulates the untyped work of the state machine; there are 1143 // typed wrappers around this class as private members of CallFilters that 1144 // augment it to provide all the functionality that we must. 1145 class PipeState { 1146 public: 1147 struct StartPushed {}; 1148 PipeState() = default; 1149 explicit PipeState(StartPushed) : state_(ValueState::kQueued) {} 1150 // Start the pipe: allows pulls to proceed 1151 void Start(); 1152 // A push operation is beginning 1153 void BeginPush(); 1154 // A previously started push operation has completed 1155 void DropPush(); 1156 // Poll for push completion: occurs after the corresponding Pull() 1157 Poll<StatusFlag> PollPush(); 1158 // Poll for pull completion; returns Failure{} if closed with error, 1159 // true if a value is available, or false if the pipe was closed without 1160 // error. 1161 Poll<ValueOrFailure<bool>> PollPull(); 1162 // A pulled value has been consumed: we can unblock the push 1163 void AckPull(); 1164 // A previously started pull operation has completed 1165 void DropPull(); 1166 // Close sending 1167 void CloseSending(); 1168 // Close sending with error 1169 void CloseWithError(); 1170 // Poll for closedness - if true, closed with error 1171 Poll<bool> PollClosed(); 1172 1173 bool holds_error() const { return state_ == ValueState::kError; } 1174 1175 std::string DebugString() const; 1176 1177 private: 1178 enum class ValueState : uint8_t { 1179 // Nothing sending nor receiving 1180 kIdle, 1181 // Sent, but not yet received 1182 kQueued, 1183 // Trying to receive, but not yet sent 1184 kWaiting, 1185 // Ready to start processing, but not yet started 1186 // (we have the value to send through the pipe, the reader is waiting, 1187 // but it's not yet been polled) 1188 kReady, 1189 // Processing through filters 1190 kProcessing, 1191 // Closed sending 1192 kClosed, 1193 // Closed due to failure 1194 kError 1195 }; 1196 // Waiter for a promise blocked waiting to send. 1197 IntraActivityWaiter wait_send_; 1198 // Waiter for a promise blocked waiting to receive. 1199 IntraActivityWaiter wait_recv_; 1200 // Current state. 1201 ValueState state_ = ValueState::kIdle; 1202 // Has the pipe been started? 1203 bool started_ = false; 1204 }; 1205 1206 template <typename Fn> 1207 class ServerTrailingMetadataInterceptor { 1208 public: 1209 class Call { 1210 public: 1211 static const NoInterceptor OnClientInitialMetadata; 1212 static const NoInterceptor OnServerInitialMetadata; 1213 static const NoInterceptor OnClientToServerMessage; 1214 static const NoInterceptor OnServerToClientMessage; 1215 static const NoInterceptor OnFinalize; 1216 void OnServerTrailingMetadata(ServerMetadata& md, 1217 ServerTrailingMetadataInterceptor* filter) { 1218 filter->fn_(md); 1219 } 1220 }; 1221 1222 explicit ServerTrailingMetadataInterceptor(Fn fn) : fn_(std::move(fn)) {} 1223 1224 private: 1225 GPR_NO_UNIQUE_ADDRESS Fn fn_; 1226 }; 1227 template <typename Fn> 1228 const NoInterceptor 1229 ServerTrailingMetadataInterceptor<Fn>::Call::OnClientInitialMetadata; 1230 template <typename Fn> 1231 const NoInterceptor 1232 ServerTrailingMetadataInterceptor<Fn>::Call::OnServerInitialMetadata; 1233 template <typename Fn> 1234 const NoInterceptor 1235 ServerTrailingMetadataInterceptor<Fn>::Call::OnClientToServerMessage; 1236 template <typename Fn> 1237 const NoInterceptor 1238 ServerTrailingMetadataInterceptor<Fn>::Call::OnServerToClientMessage; 1239 template <typename Fn> 1240 const NoInterceptor ServerTrailingMetadataInterceptor<Fn>::Call::OnFinalize; 1241 1242 } // namespace filters_detail 1243 1244 // Execution environment for a stack of filters. 1245 // This is a per-call object. 1246 class CallFilters { 1247 public: 1248 class StackBuilder; 1249 class StackTestSpouse; 1250 1251 // A stack is an opaque, immutable type that contains the data necessary to 1252 // execute a call through a given set of filters. 1253 // It's reference counted so that it can be shared between many calls. 1254 // It contains pointers to the individual filters, yet it does not own those 1255 // pointers: it's expected that some other object will track that ownership. 1256 class Stack : public RefCounted<Stack> { 1257 public: 1258 ~Stack() override; 1259 1260 private: 1261 friend class CallFilters; 1262 friend class StackBuilder; 1263 friend class StackTestSpouse; 1264 explicit Stack(filters_detail::StackData data) : data_(std::move(data)) {} 1265 const filters_detail::StackData data_; 1266 }; 1267 1268 // Build stacks... repeatedly call Add with each filter that contributes to 1269 // the stack, then call Build() to generate a ref counted Stack object. 1270 class StackBuilder { 1271 public: 1272 ~StackBuilder(); 1273 1274 template <typename FilterType> 1275 void Add(FilterType* filter) { 1276 const size_t call_offset = data_.AddFilter<FilterType>(filter); 1277 data_.AddClientInitialMetadataOp(filter, call_offset); 1278 data_.AddServerInitialMetadataOp(filter, call_offset); 1279 data_.AddClientToServerMessageOp(filter, call_offset); 1280 data_.AddServerToClientMessageOp(filter, call_offset); 1281 data_.AddServerTrailingMetadataOp(filter, call_offset); 1282 data_.AddFinalizer(filter, call_offset, &FilterType::Call::OnFinalize); 1283 } 1284 1285 void AddOwnedObject(void (*destroy)(void* p), void* p) { 1286 data_.channel_data_destructors.push_back({destroy, p}); 1287 } 1288 1289 template <typename T> 1290 void AddOwnedObject(RefCountedPtr<T> p) { 1291 AddOwnedObject([](void* p) { static_cast<T*>(p)->Unref(); }, p.release()); 1292 } 1293 1294 template <typename T> 1295 void AddOwnedObject(std::unique_ptr<T> p) { 1296 AddOwnedObject([](void* p) { delete static_cast<T*>(p); }, p.release()); 1297 } 1298 1299 template <typename Fn> 1300 void AddOnServerTrailingMetadata(Fn fn) { 1301 auto filter = std::make_unique< 1302 filters_detail::ServerTrailingMetadataInterceptor<Fn>>(std::move(fn)); 1303 Add(filter.get()); 1304 AddOwnedObject(std::move(filter)); 1305 } 1306 1307 RefCountedPtr<Stack> Build(); 1308 1309 private: 1310 filters_detail::StackData data_; 1311 }; 1312 1313 class NextMessage { 1314 public: 1315 NextMessage() : has_value_(false), cancelled_(false) {} 1316 explicit NextMessage(MessageHandle value) 1317 : has_value_(true), value_(std::move(value)) {} 1318 explicit NextMessage(bool cancelled) 1319 : has_value_(false), cancelled_(cancelled) {} 1320 NextMessage(const NextMessage&) = delete; 1321 NextMessage& operator=(const NextMessage&) = delete; 1322 NextMessage(NextMessage&& other) noexcept = default; 1323 NextMessage& operator=(NextMessage&& other) = default; 1324 1325 using value_type = MessageHandle; 1326 1327 void reset() { 1328 has_value_ = false; 1329 cancelled_ = false; 1330 value_.reset(); 1331 } 1332 bool has_value() const { return has_value_; } 1333 const MessageHandle& value() const { 1334 GPR_DEBUG_ASSERT(has_value_); 1335 return value_; 1336 } 1337 MessageHandle& value() { 1338 GPR_DEBUG_ASSERT(has_value_); 1339 return value_; 1340 } 1341 const MessageHandle& operator*() const { return value(); } 1342 MessageHandle& operator*() { return value(); } 1343 bool cancelled() const { return !has_value_ && cancelled_; } 1344 1345 private: 1346 bool has_value_; 1347 bool cancelled_; 1348 MessageHandle value_; 1349 }; 1350 1351 explicit CallFilters(ClientMetadataHandle client_initial_metadata); 1352 ~CallFilters(); 1353 1354 CallFilters(const CallFilters&) = delete; 1355 CallFilters& operator=(const CallFilters&) = delete; 1356 CallFilters(CallFilters&&) = delete; 1357 CallFilters& operator=(CallFilters&&) = delete; 1358 1359 void SetStack(RefCountedPtr<Stack> stack); 1360 1361 // Access client initial metadata before it's processed 1362 ClientMetadata* unprocessed_client_initial_metadata() { 1363 return client_initial_metadata_.get(); 1364 } 1365 1366 // Client: Fetch client initial metadata 1367 // Returns a promise that resolves to ValueOrFailure<ClientMetadataHandle> 1368 GRPC_MUST_USE_RESULT auto PullClientInitialMetadata(); 1369 // Server: Indicate that no server initial metadata will be sent 1370 void NoServerInitialMetadata() { 1371 server_initial_metadata_state_.CloseSending(); 1372 } 1373 // Server: Push server initial metadata 1374 // Returns a promise that resolves to a StatusFlag indicating success 1375 GRPC_MUST_USE_RESULT auto PushServerInitialMetadata(ServerMetadataHandle md); 1376 // Client: Fetch server initial metadata 1377 // Returns a promise that resolves to ValueOrFailure<ServerMetadataHandle> 1378 GRPC_MUST_USE_RESULT auto PullServerInitialMetadata(); 1379 // Client: Push client to server message 1380 // Returns a promise that resolves to a StatusFlag indicating success 1381 GRPC_MUST_USE_RESULT auto PushClientToServerMessage(MessageHandle message); 1382 // Client: Indicate that no more messages will be sent 1383 void FinishClientToServerSends() { 1384 client_to_server_message_state_.CloseSending(); 1385 } 1386 // Server: Fetch client to server message 1387 // Returns a promise that resolves to ValueOrFailure<MessageHandle> 1388 GRPC_MUST_USE_RESULT auto PullClientToServerMessage(); 1389 // Server: Push server to client message 1390 // Returns a promise that resolves to a StatusFlag indicating success 1391 GRPC_MUST_USE_RESULT auto PushServerToClientMessage(MessageHandle message); 1392 // Server: Fetch server to client message 1393 // Returns a promise that resolves to ValueOrFailure<MessageHandle> 1394 GRPC_MUST_USE_RESULT auto PullServerToClientMessage(); 1395 // Server: Indicate end of response 1396 // Closes the request entirely - no messages can be sent/received 1397 // If no server initial metadata has been sent, implies 1398 // NoServerInitialMetadata() called. 1399 void PushServerTrailingMetadata(ServerMetadataHandle md); 1400 // Client: Fetch server trailing metadata 1401 // Returns a promise that resolves to ServerMetadataHandle 1402 GRPC_MUST_USE_RESULT auto PullServerTrailingMetadata(); 1403 // Server: Wait for server trailing metadata to have been sent 1404 // Returns a promise that resolves to a StatusFlag indicating whether the 1405 // request was cancelled or not -- failure to send trailing metadata is 1406 // considered a cancellation, as is actual cancellation -- but not application 1407 // errors. 1408 GRPC_MUST_USE_RESULT auto WasCancelled(); 1409 // Client & server: fill in final_info with the final status of the call. 1410 void Finalize(const grpc_call_final_info* final_info); 1411 1412 std::string DebugString() const; 1413 1414 private: 1415 template <filters_detail::PipeState(CallFilters::*state_ptr), 1416 void*(CallFilters::*push_ptr), typename T, 1417 filters_detail::Layout<filters_detail::FallibleOperator<T>>( 1418 filters_detail::StackData::*layout_ptr)> 1419 class PipePromise { 1420 public: 1421 class Push { 1422 public: 1423 Push(CallFilters* filters, T x) 1424 : filters_(filters), value_(std::move(x)) { 1425 state().BeginPush(); 1426 push_slot() = this; 1427 } 1428 ~Push() { 1429 if (filters_ != nullptr) { 1430 state().DropPush(); 1431 push_slot() = nullptr; 1432 } 1433 } 1434 1435 Push(const Push&) = delete; 1436 Push& operator=(const Push&) = delete; 1437 Push(Push&& other) noexcept 1438 : filters_(std::exchange(other.filters_, nullptr)), 1439 value_(std::move(other.value_)) { 1440 if (filters_ != nullptr) { 1441 GPR_DEBUG_ASSERT(push_slot() == &other); 1442 push_slot() = this; 1443 } 1444 } 1445 1446 Push& operator=(Push&&) = delete; 1447 1448 Poll<StatusFlag> operator()() { return state().PollPush(); } 1449 1450 T TakeValue() { return std::move(value_); } 1451 1452 absl::string_view DebugString() const { 1453 return value_ != nullptr ? " (not pulled)" : ""; 1454 } 1455 1456 private: 1457 filters_detail::PipeState& state() { return filters_->*state_ptr; } 1458 void*& push_slot() { return filters_->*push_ptr; } 1459 1460 CallFilters* filters_; 1461 T value_; 1462 }; 1463 1464 static std::string DebugString(absl::string_view name, 1465 const CallFilters* filters) { 1466 auto* push = static_cast<Push*>(filters->*push_ptr); 1467 return absl::StrCat(name, ":", (filters->*state_ptr).DebugString(), 1468 push == nullptr ? "" : push->DebugString()); 1469 } 1470 1471 class PullMaybe { 1472 public: 1473 explicit PullMaybe(CallFilters* filters) : filters_(filters) {} 1474 ~PullMaybe() { 1475 if (filters_ != nullptr) { 1476 state().DropPull(); 1477 } 1478 } 1479 1480 PullMaybe(const PullMaybe&) = delete; 1481 PullMaybe& operator=(const PullMaybe&) = delete; 1482 PullMaybe(PullMaybe&& other) noexcept 1483 : filters_(std::exchange(other.filters_, nullptr)), 1484 executor_(std::move(other.executor_)) {} 1485 PullMaybe& operator=(PullMaybe&&) = delete; 1486 1487 Poll<ValueOrFailure<absl::optional<T>>> operator()() { 1488 if (executor_.IsRunning()) { 1489 auto c = state().PollClosed(); 1490 if (c.ready() && c.value()) { 1491 filters_->CancelDueToFailedPipeOperation(); 1492 return Failure{}; 1493 } 1494 return FinishOperationExecutor(executor_.Step(filters_->call_data_)); 1495 } 1496 auto p = state().PollPull(); 1497 auto* r = p.value_if_ready(); 1498 if (r == nullptr) return Pending{}; 1499 if (!r->ok()) { 1500 filters_->CancelDueToFailedPipeOperation(); 1501 return Failure{}; 1502 } 1503 if (!**r) return absl::nullopt; 1504 return FinishOperationExecutor(executor_.Start( 1505 layout(), push()->TakeValue(), filters_->call_data_)); 1506 } 1507 1508 private: 1509 filters_detail::PipeState& state() { return filters_->*state_ptr; } 1510 Push* push() { return static_cast<Push*>(filters_->*push_ptr); } 1511 const filters_detail::Layout<filters_detail::FallibleOperator<T>>* 1512 layout() { 1513 return &(filters_->stack_->data_.*layout_ptr); 1514 } 1515 1516 Poll<ValueOrFailure<absl::optional<T>>> FinishOperationExecutor( 1517 Poll<filters_detail::ResultOr<T>> p) { 1518 auto* r = p.value_if_ready(); 1519 if (r == nullptr) return Pending{}; 1520 GPR_DEBUG_ASSERT(!executor_.IsRunning()); 1521 state().AckPull(); 1522 if (r->ok != nullptr) return std::move(r->ok); 1523 filters_->PushServerTrailingMetadata(std::move(r->error)); 1524 return Failure{}; 1525 } 1526 1527 CallFilters* filters_; 1528 filters_detail::OperationExecutor<T> executor_; 1529 }; 1530 1531 class PullMessage { 1532 public: 1533 explicit PullMessage(CallFilters* filters) : filters_(filters) {} 1534 ~PullMessage() { 1535 if (filters_ != nullptr) { 1536 state().DropPull(); 1537 } 1538 } 1539 1540 PullMessage(const PullMessage&) = delete; 1541 PullMessage& operator=(const PullMessage&) = delete; 1542 PullMessage(PullMessage&& other) noexcept 1543 : filters_(std::exchange(other.filters_, nullptr)), 1544 executor_(std::move(other.executor_)) {} 1545 PullMessage& operator=(PullMessage&&) = delete; 1546 1547 Poll<NextMessage> operator()() { 1548 if (executor_.IsRunning()) { 1549 auto c = state().PollClosed(); 1550 if (c.ready() && c.value()) { 1551 filters_->CancelDueToFailedPipeOperation(); 1552 return NextMessage(true); 1553 } 1554 return FinishOperationExecutor(executor_.Step(filters_->call_data_)); 1555 } 1556 auto p = state().PollPull(); 1557 auto* r = p.value_if_ready(); 1558 if (r == nullptr) return Pending{}; 1559 if (!r->ok()) { 1560 filters_->CancelDueToFailedPipeOperation(); 1561 return NextMessage(true); 1562 } 1563 if (!**r) return NextMessage(false); 1564 return FinishOperationExecutor(executor_.Start( 1565 layout(), push()->TakeValue(), filters_->call_data_)); 1566 } 1567 1568 private: 1569 filters_detail::PipeState& state() { return filters_->*state_ptr; } 1570 Push* push() { return static_cast<Push*>(filters_->*push_ptr); } 1571 const filters_detail::Layout<filters_detail::FallibleOperator<T>>* 1572 layout() { 1573 return &(filters_->stack_->data_.*layout_ptr); 1574 } 1575 1576 Poll<NextMessage> FinishOperationExecutor( 1577 Poll<filters_detail::ResultOr<T>> p) { 1578 auto* r = p.value_if_ready(); 1579 if (r == nullptr) return Pending{}; 1580 GPR_DEBUG_ASSERT(!executor_.IsRunning()); 1581 state().AckPull(); 1582 if (r->ok != nullptr) return NextMessage(std::move(r->ok)); 1583 filters_->PushServerTrailingMetadata(std::move(r->error)); 1584 return NextMessage(true); 1585 } 1586 1587 CallFilters* filters_; 1588 filters_detail::OperationExecutor<T> executor_; 1589 }; 1590 }; 1591 1592 class PullClientInitialMetadataPromise { 1593 public: 1594 explicit PullClientInitialMetadataPromise(CallFilters* filters) 1595 : filters_(filters) {} 1596 1597 PullClientInitialMetadataPromise(const PullClientInitialMetadataPromise&) = 1598 delete; 1599 PullClientInitialMetadataPromise& operator=( 1600 const PullClientInitialMetadataPromise&) = delete; 1601 PullClientInitialMetadataPromise( 1602 PullClientInitialMetadataPromise&& other) noexcept 1603 : filters_(std::exchange(other.filters_, nullptr)), 1604 executor_(std::move(other.executor_)) {} 1605 PullClientInitialMetadataPromise& operator=( 1606 PullClientInitialMetadataPromise&&) = delete; 1607 1608 Poll<ValueOrFailure<ClientMetadataHandle>> operator()() { 1609 if (executor_.IsRunning()) { 1610 return FinishOperationExecutor(executor_.Step(filters_->call_data_)); 1611 } 1612 auto p = state().PollPull(); 1613 auto* r = p.value_if_ready(); 1614 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) { 1615 gpr_log(GPR_INFO, "%s", 1616 r == nullptr 1617 ? "PENDING" 1618 : (r->ok() ? (r->value() ? "TRUE" : "FALSE") : "FAILURE")); 1619 } 1620 if (r == nullptr) return Pending{}; 1621 if (!r->ok()) { 1622 filters_->CancelDueToFailedPipeOperation(); 1623 return Failure{}; 1624 } 1625 GPR_ASSERT(filters_->client_initial_metadata_ != nullptr); 1626 return FinishOperationExecutor(executor_.Start( 1627 &filters_->stack_->data_.client_initial_metadata, 1628 std::move(filters_->client_initial_metadata_), filters_->call_data_)); 1629 } 1630 1631 private: 1632 filters_detail::PipeState& state() { 1633 return filters_->client_initial_metadata_state_; 1634 } 1635 1636 Poll<ValueOrFailure<ClientMetadataHandle>> FinishOperationExecutor( 1637 Poll<filters_detail::ResultOr<ClientMetadataHandle>> p) { 1638 auto* r = p.value_if_ready(); 1639 if (r == nullptr) return Pending{}; 1640 GPR_DEBUG_ASSERT(!executor_.IsRunning()); 1641 state().AckPull(); 1642 if (r->ok != nullptr) return std::move(r->ok); 1643 filters_->PushServerTrailingMetadata(std::move(r->error)); 1644 return Failure{}; 1645 } 1646 1647 CallFilters* filters_; 1648 filters_detail::OperationExecutor<ClientMetadataHandle> executor_; 1649 }; 1650 1651 class PullServerTrailingMetadataPromise { 1652 public: 1653 explicit PullServerTrailingMetadataPromise(CallFilters* filters) 1654 : filters_(filters) {} 1655 1656 PullServerTrailingMetadataPromise( 1657 const PullServerTrailingMetadataPromise&) = delete; 1658 PullServerTrailingMetadataPromise& operator=( 1659 const PullServerTrailingMetadataPromise&) = delete; 1660 PullServerTrailingMetadataPromise( 1661 PullServerTrailingMetadataPromise&& other) noexcept 1662 : filters_(std::exchange(other.filters_, nullptr)), 1663 executor_(std::move(other.executor_)) {} 1664 PullServerTrailingMetadataPromise& operator=( 1665 PullServerTrailingMetadataPromise&&) = delete; 1666 1667 Poll<ServerMetadataHandle> operator()() { 1668 if (executor_.IsRunning()) { 1669 auto r = executor_.Step(filters_->call_data_); 1670 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) { 1671 if (r.pending()) { 1672 gpr_log(GPR_INFO, 1673 "%s PullServerTrailingMetadata[%p]: Pending(but executing)", 1674 GetContext<Activity>()->DebugTag().c_str(), filters_); 1675 } else { 1676 gpr_log(GPR_INFO, "%s PullServerTrailingMetadata[%p]: Ready: %s", 1677 GetContext<Activity>()->DebugTag().c_str(), filters_, 1678 r.value()->DebugString().c_str()); 1679 } 1680 } 1681 return r; 1682 } 1683 if (filters_->server_trailing_metadata_ == nullptr) { 1684 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) { 1685 gpr_log(GPR_INFO, 1686 "%s PullServerTrailingMetadata[%p]: Pending(not pushed)", 1687 GetContext<Activity>()->DebugTag().c_str(), filters_); 1688 } 1689 return filters_->server_trailing_metadata_waiter_.pending(); 1690 } 1691 // If no stack has been set, we can just return the result of the call 1692 if (filters_->stack_ == nullptr) { 1693 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_promise_primitives)) { 1694 gpr_log(GPR_INFO, 1695 "%s PullServerTrailingMetadata[%p]: Ready(no-stack): %s", 1696 GetContext<Activity>()->DebugTag().c_str(), filters_, 1697 filters_->server_trailing_metadata_->DebugString().c_str()); 1698 } 1699 return std::move(filters_->server_trailing_metadata_); 1700 } 1701 // Otherwise we need to process it through all the filters. 1702 return executor_.Start(&filters_->stack_->data_.server_trailing_metadata, 1703 std::move(filters_->server_trailing_metadata_), 1704 filters_->call_data_); 1705 } 1706 1707 private: 1708 CallFilters* filters_; 1709 filters_detail::InfallibleOperationExecutor<ServerMetadataHandle> executor_; 1710 }; 1711 1712 void CancelDueToFailedPipeOperation(SourceLocation but_where = {}); 1713 1714 RefCountedPtr<Stack> stack_; 1715 1716 filters_detail::PipeState client_initial_metadata_state_{ 1717 filters_detail::PipeState::StartPushed{}}; 1718 filters_detail::PipeState server_initial_metadata_state_; 1719 filters_detail::PipeState client_to_server_message_state_; 1720 filters_detail::PipeState server_to_client_message_state_; 1721 IntraActivityWaiter server_trailing_metadata_waiter_; 1722 Latch<bool> cancelled_; 1723 1724 void* call_data_; 1725 ClientMetadataHandle client_initial_metadata_; 1726 1727 // The following void*'s are pointers to a `Push` object (from above). 1728 // They are used to track the current push operation for each pipe. 1729 // It would be lovely for them to be typed pointers, but that would require 1730 // a recursive type definition since the location of this field needs to be 1731 // a template argument to the `Push` object itself. 1732 void* server_initial_metadata_push_ = nullptr; 1733 void* client_to_server_message_push_ = nullptr; 1734 void* server_to_client_message_push_ = nullptr; 1735 1736 ServerMetadataHandle server_trailing_metadata_; 1737 1738 using ServerInitialMetadataPromises = 1739 PipePromise<&CallFilters::server_initial_metadata_state_, 1740 &CallFilters::server_initial_metadata_push_, 1741 ServerMetadataHandle, 1742 &filters_detail::StackData::server_initial_metadata>; 1743 using ClientToServerMessagePromises = 1744 PipePromise<&CallFilters::client_to_server_message_state_, 1745 &CallFilters::client_to_server_message_push_, MessageHandle, 1746 &filters_detail::StackData::client_to_server_messages>; 1747 using ServerToClientMessagePromises = 1748 PipePromise<&CallFilters::server_to_client_message_state_, 1749 &CallFilters::server_to_client_message_push_, MessageHandle, 1750 &filters_detail::StackData::server_to_client_messages>; 1751 }; 1752 1753 inline auto CallFilters::PullClientInitialMetadata() { 1754 return PullClientInitialMetadataPromise(this); 1755 } 1756 1757 inline auto CallFilters::PushServerInitialMetadata(ServerMetadataHandle md) { 1758 GPR_ASSERT(md != nullptr); 1759 return [p = ServerInitialMetadataPromises::Push{ 1760 this, std::move(md)}]() mutable { return p(); }; 1761 } 1762 1763 inline auto CallFilters::PullServerInitialMetadata() { 1764 return ServerInitialMetadataPromises::PullMaybe{this}; 1765 } 1766 1767 inline auto CallFilters::PushClientToServerMessage(MessageHandle message) { 1768 GPR_ASSERT(message != nullptr); 1769 return [p = ClientToServerMessagePromises::Push{ 1770 this, std::move(message)}]() mutable { return p(); }; 1771 } 1772 1773 inline auto CallFilters::PullClientToServerMessage() { 1774 return ClientToServerMessagePromises::PullMessage{this}; 1775 } 1776 1777 inline auto CallFilters::PushServerToClientMessage(MessageHandle message) { 1778 GPR_ASSERT(message != nullptr); 1779 return [p = ServerToClientMessagePromises::Push{ 1780 this, std::move(message)}]() mutable { return p(); }; 1781 } 1782 1783 inline auto CallFilters::PullServerToClientMessage() { 1784 return ServerToClientMessagePromises::PullMessage{this}; 1785 } 1786 1787 inline auto CallFilters::PullServerTrailingMetadata() { 1788 return Map(PullServerTrailingMetadataPromise(this), 1789 [this](ServerMetadataHandle h) { 1790 cancelled_.Set(h->get(GrpcCallWasCancelled()).value_or(false)); 1791 return h; 1792 }); 1793 } 1794 1795 inline auto CallFilters::WasCancelled() { return cancelled_.Wait(); } 1796 1797 } // namespace grpc_core 1798 1799 #endif // GRPC_SRC_CORE_LIB_TRANSPORT_CALL_FILTERS_H 1800