1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #include "src/core/lib/transport/promise_endpoint.h"
18
19 #include <atomic>
20 #include <functional>
21 #include <memory>
22 #include <utility>
23
24 #include "absl/status/status.h"
25 #include "absl/types/optional.h"
26
27 #include <grpc/event_engine/event_engine.h>
28 #include <grpc/event_engine/slice_buffer.h>
29 #include <grpc/slice_buffer.h>
30 #include <grpc/support/log.h>
31
32 #include "src/core/lib/gprpp/sync.h"
33 #include "src/core/lib/slice/slice_buffer.h"
34
35 namespace grpc_core {
36
PromiseEndpoint(std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint> endpoint,SliceBuffer already_received)37 PromiseEndpoint::PromiseEndpoint(
38 std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
39 endpoint,
40 SliceBuffer already_received)
41 : endpoint_(std::move(endpoint)) {
42 GPR_ASSERT(endpoint_ != nullptr);
43 read_state_->endpoint = endpoint_;
44 // TODO(ladynana): Replace this with `SliceBufferCast<>` when it is
45 // available.
46 grpc_slice_buffer_swap(read_state_->buffer.c_slice_buffer(),
47 already_received.c_slice_buffer());
48 }
49
50 const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetPeerAddress() const51 PromiseEndpoint::GetPeerAddress() const {
52 return endpoint_->GetPeerAddress();
53 }
54
55 const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetLocalAddress() const56 PromiseEndpoint::GetLocalAddress() const {
57 return endpoint_->GetLocalAddress();
58 }
59
Complete(absl::Status status,const size_t num_bytes_requested)60 void PromiseEndpoint::ReadState::Complete(absl::Status status,
61 const size_t num_bytes_requested) {
62 while (true) {
63 if (!status.ok()) {
64 // Invalidates all previous reads.
65 pending_buffer.Clear();
66 buffer.Clear();
67 result = status;
68 auto w = std::move(waker);
69 complete.store(true, std::memory_order_release);
70 w.Wakeup();
71 return;
72 }
73 // Appends `pending_buffer` to `buffer`.
74 pending_buffer.MoveFirstNBytesIntoSliceBuffer(pending_buffer.Length(),
75 buffer);
76 GPR_DEBUG_ASSERT(pending_buffer.Count() == 0u);
77 if (buffer.Length() < num_bytes_requested) {
78 // A further read is needed.
79 // Set read args with number of bytes needed as hint.
80 grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs
81 read_args = {
82 static_cast<int64_t>(num_bytes_requested - buffer.Length())};
83 // If `Read()` returns true immediately, the callback will not be
84 // called. We still need to call our callback to pick up the result and
85 // maybe do further reads.
86 auto ep = endpoint.lock();
87 if (ep == nullptr) {
88 status = absl::UnavailableError("Endpoint closed during read.");
89 continue;
90 }
91 if (ep->Read(
92 [self = Ref(), num_bytes_requested](absl::Status status) {
93 ApplicationCallbackExecCtx callback_exec_ctx;
94 ExecCtx exec_ctx;
95 self->Complete(std::move(status), num_bytes_requested);
96 },
97 &pending_buffer, &read_args)) {
98 continue;
99 }
100 return;
101 }
102 result = status;
103 auto w = std::move(waker);
104 complete.store(true, std::memory_order_release);
105 w.Wakeup();
106 return;
107 }
108 }
109
110 } // namespace grpc_core
111