1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/transport/promise_endpoint.h"
16
17 // IWYU pragma: no_include <sys/socket.h>
18
19 #include <cstring>
20 #include <memory>
21 #include <string>
22 #include <tuple>
23
24 #include "absl/functional/any_invocable.h"
25 #include "gmock/gmock.h"
26 #include "gtest/gtest.h"
27
28 #include <grpc/event_engine/event_engine.h>
29 #include <grpc/event_engine/port.h> // IWYU pragma: keep
30 #include <grpc/event_engine/slice_buffer.h>
31
32 #include "src/core/lib/promise/activity.h"
33 #include "src/core/lib/promise/join.h"
34 #include "src/core/lib/promise/seq.h"
35 #include "src/core/lib/slice/slice.h"
36 #include "src/core/lib/slice/slice_buffer.h"
37 #include "src/core/lib/slice/slice_internal.h"
38 #include "test/core/promise/test_wakeup_schedulers.h"
39
40 using testing::AtMost;
41 using testing::MockFunction;
42 using testing::Return;
43 using testing::ReturnRef;
44 using testing::Sequence;
45 using testing::StrictMock;
46 using testing::WithArg;
47 using testing::WithArgs;
48
49 namespace grpc_core {
50 namespace testing {
51
52 class MockEndpoint
53 : public grpc_event_engine::experimental::EventEngine::Endpoint {
54 public:
55 MOCK_METHOD(
56 bool, Read,
57 (absl::AnyInvocable<void(absl::Status)> on_read,
58 grpc_event_engine::experimental::SliceBuffer* buffer,
59 const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
60 args),
61 (override));
62
63 MOCK_METHOD(
64 bool, Write,
65 (absl::AnyInvocable<void(absl::Status)> on_writable,
66 grpc_event_engine::experimental::SliceBuffer* data,
67 const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
68 args),
69 (override));
70
71 MOCK_METHOD(
72 const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
73 GetPeerAddress, (), (const, override));
74 MOCK_METHOD(
75 const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
76 GetLocalAddress, (), (const, override));
77 };
78
79 class MockActivity : public Activity, public Wakeable {
80 public:
81 MOCK_METHOD(void, WakeupRequested, ());
82
ForceImmediateRepoll(WakeupMask)83 void ForceImmediateRepoll(WakeupMask /*mask*/) override { WakeupRequested(); }
Orphan()84 void Orphan() override {}
MakeOwningWaker()85 Waker MakeOwningWaker() override { return Waker(this, 0); }
MakeNonOwningWaker()86 Waker MakeNonOwningWaker() override { return Waker(this, 0); }
Wakeup(WakeupMask)87 void Wakeup(WakeupMask /*mask*/) override { WakeupRequested(); }
WakeupAsync(WakeupMask)88 void WakeupAsync(WakeupMask /*mask*/) override { WakeupRequested(); }
Drop(WakeupMask)89 void Drop(WakeupMask /*mask*/) override {}
DebugTag() const90 std::string DebugTag() const override { return "MockActivity"; }
ActivityDebugTag(WakeupMask) const91 std::string ActivityDebugTag(WakeupMask /*mask*/) const override {
92 return DebugTag();
93 }
94
Activate()95 void Activate() {
96 if (scoped_activity_ == nullptr) {
97 scoped_activity_ = std::make_unique<ScopedActivity>(this);
98 }
99 }
100
Deactivate()101 void Deactivate() { scoped_activity_.reset(); }
102
103 private:
104 std::unique_ptr<ScopedActivity> scoped_activity_;
105 };
106
107 class PromiseEndpointTest : public ::testing::Test {
108 public:
PromiseEndpointTest()109 PromiseEndpointTest()
110 : mock_endpoint_ptr_(new StrictMock<MockEndpoint>()),
111 mock_endpoint_(*mock_endpoint_ptr_),
112 promise_endpoint_(std::make_unique<PromiseEndpoint>(
113 std::unique_ptr<
114 grpc_event_engine::experimental::EventEngine::Endpoint>(
115 mock_endpoint_ptr_),
116 SliceBuffer())) {}
117
118 private:
119 MockEndpoint* mock_endpoint_ptr_;
120
121 protected:
122 MockEndpoint& mock_endpoint_;
123 std::unique_ptr<PromiseEndpoint> promise_endpoint_;
124
125 const absl::Status kDummyErrorStatus =
126 absl::ErrnoToStatus(5566, "just an error");
127 static constexpr size_t kDummyRequestSize = 5566u;
128 };
129
TEST_F(PromiseEndpointTest,OneReadSuccessful)130 TEST_F(PromiseEndpointTest, OneReadSuccessful) {
131 MockActivity activity;
132 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
133 activity.Activate();
134 EXPECT_CALL(activity, WakeupRequested).Times(0);
135 EXPECT_CALL(mock_endpoint_, Read)
136 .WillOnce(WithArgs<1>(
137 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
138 // Schedule mock_endpoint to read buffer.
139 grpc_event_engine::experimental::Slice slice(
140 grpc_slice_from_cpp_string(kBuffer));
141 buffer->Append(std::move(slice));
142 return true;
143 }));
144 auto promise = promise_endpoint_->Read(kBuffer.size());
145 auto poll = promise();
146 ASSERT_TRUE(poll.ready());
147 ASSERT_TRUE(poll.value().ok());
148 EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer);
149 activity.Deactivate();
150 }
151
TEST_F(PromiseEndpointTest,OneReadFailed)152 TEST_F(PromiseEndpointTest, OneReadFailed) {
153 MockActivity activity;
154 activity.Activate();
155 EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1));
156 EXPECT_CALL(mock_endpoint_, Read)
157 .WillOnce(WithArgs<0>(
158 [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
159 // Mock EventEngine enpoint read fails.
160 read_callback(this->kDummyErrorStatus);
161 return false;
162 }));
163 auto promise = promise_endpoint_->Read(kDummyRequestSize);
164 auto poll = promise();
165 ASSERT_TRUE(poll.ready());
166 ASSERT_FALSE(poll.value().ok());
167 EXPECT_EQ(kDummyErrorStatus, poll.value().status());
168 activity.Deactivate();
169 }
170
TEST_F(PromiseEndpointTest,MutipleReadsSuccessful)171 TEST_F(PromiseEndpointTest, MutipleReadsSuccessful) {
172 MockActivity activity;
173 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
174 activity.Activate();
175 EXPECT_CALL(activity, WakeupRequested).Times(0);
176 Sequence s;
177 EXPECT_CALL(mock_endpoint_, Read)
178 .InSequence(s)
179 .WillOnce(WithArg<1>(
180 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
181 // Schedule mock_endpoint to read buffer.
182 grpc_event_engine::experimental::Slice slice(
183 grpc_slice_from_cpp_string(kBuffer.substr(0, 4)));
184 buffer->Append(std::move(slice));
185 return true;
186 }));
187 EXPECT_CALL(mock_endpoint_, Read)
188 .InSequence(s)
189 .WillOnce(WithArg<1>(
190 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
191 // Schedule mock_endpoint to read buffer.
192 grpc_event_engine::experimental::Slice slice(
193 grpc_slice_from_cpp_string(kBuffer.substr(4)));
194 buffer->Append(std::move(slice));
195 return true;
196 }));
197 {
198 auto promise = promise_endpoint_->Read(4u);
199 auto poll = promise();
200 ASSERT_TRUE(poll.ready());
201 ASSERT_TRUE(poll.value().ok());
202 EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer.substr(0, 4));
203 }
204 {
205 auto promise = promise_endpoint_->Read(4u);
206 auto poll = promise();
207 ASSERT_TRUE(poll.ready());
208 ASSERT_TRUE(poll.value().ok());
209 EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer.substr(4));
210 }
211 activity.Deactivate();
212 }
213
TEST_F(PromiseEndpointTest,OnePendingReadSuccessful)214 TEST_F(PromiseEndpointTest, OnePendingReadSuccessful) {
215 MockActivity activity;
216 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
217 absl::AnyInvocable<void(absl::Status)> read_callback;
218 activity.Activate();
219 EXPECT_CALL(activity, WakeupRequested).Times(1);
220 EXPECT_CALL(mock_endpoint_, Read)
221 .WillOnce(WithArgs<0, 1>(
222 [&read_callback, &kBuffer](
223 absl::AnyInvocable<void(absl::Status)> on_read,
224 grpc_event_engine::experimental::SliceBuffer* buffer) {
225 read_callback = std::move(on_read);
226 // Schedule mock_endpoint to read buffer.
227 grpc_event_engine::experimental::Slice slice(
228 grpc_slice_from_cpp_string(kBuffer));
229 buffer->Append(std::move(slice));
230 // Return false to mock EventEngine read not finish..
231 return false;
232 }));
233 auto promise = promise_endpoint_->Read(kBuffer.size());
234 EXPECT_TRUE(promise().pending());
235 // Mock EventEngine read succeeds, and promise resolves.
236 read_callback(absl::OkStatus());
237 auto poll = promise();
238 ASSERT_TRUE(poll.ready());
239 ASSERT_TRUE(poll.value().ok());
240 EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer);
241 activity.Deactivate();
242 }
243
TEST_F(PromiseEndpointTest,OnePendingReadFailed)244 TEST_F(PromiseEndpointTest, OnePendingReadFailed) {
245 MockActivity activity;
246 absl::AnyInvocable<void(absl::Status)> read_callback;
247 activity.Activate();
248 EXPECT_CALL(activity, WakeupRequested).Times(1);
249 EXPECT_CALL(mock_endpoint_, Read)
250 .WillOnce(WithArgs<0>(
251 [&read_callback](absl::AnyInvocable<void(absl::Status)> on_read) {
252 read_callback = std::move(on_read);
253 // Return false to mock EventEngine read not finish.
254 return false;
255 }));
256 auto promise = promise_endpoint_->Read(kDummyRequestSize);
257 EXPECT_TRUE(promise().pending());
258 // Mock EventEngine read fails, and promise returns error.
259 read_callback(kDummyErrorStatus);
260 auto poll = promise();
261 ASSERT_TRUE(poll.ready());
262 ASSERT_FALSE(poll.value().ok());
263 EXPECT_EQ(kDummyErrorStatus, poll.value().status());
264 activity.Deactivate();
265 }
266
TEST_F(PromiseEndpointTest,OneReadSliceSuccessful)267 TEST_F(PromiseEndpointTest, OneReadSliceSuccessful) {
268 MockActivity activity;
269 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
270 activity.Activate();
271 EXPECT_CALL(activity, WakeupRequested).Times(0);
272 EXPECT_CALL(mock_endpoint_, Read)
273 .WillOnce(WithArgs<1>(
274 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
275 // Schedule mock_endpoint to read buffer.
276 grpc_event_engine::experimental::Slice slice(
277 grpc_slice_from_cpp_string(kBuffer));
278 buffer->Append(std::move(slice));
279 return true;
280 }));
281 auto promise = promise_endpoint_->ReadSlice(kBuffer.size());
282 auto poll = promise();
283 ASSERT_TRUE(poll.ready());
284 ASSERT_TRUE(poll.value().ok());
285 EXPECT_EQ(poll.value()->as_string_view(), kBuffer);
286 activity.Deactivate();
287 }
288
TEST_F(PromiseEndpointTest,OneReadSliceFailed)289 TEST_F(PromiseEndpointTest, OneReadSliceFailed) {
290 MockActivity activity;
291 activity.Activate();
292 EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1));
293 EXPECT_CALL(mock_endpoint_, Read)
294 .WillOnce(WithArgs<0>(
295 [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
296 // Mock EventEngine enpoint read fails.
297 read_callback(this->kDummyErrorStatus);
298 return false;
299 }));
300 auto promise = promise_endpoint_->ReadSlice(kDummyRequestSize);
301 auto poll = promise();
302 ASSERT_TRUE(poll.ready());
303 ASSERT_FALSE(poll.value().ok());
304 EXPECT_EQ(kDummyErrorStatus, poll.value().status());
305 activity.Deactivate();
306 }
307
TEST_F(PromiseEndpointTest,MutipleReadSlicesSuccessful)308 TEST_F(PromiseEndpointTest, MutipleReadSlicesSuccessful) {
309 MockActivity activity;
310 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
311 activity.Activate();
312 EXPECT_CALL(activity, WakeupRequested).Times(0);
313 Sequence s;
314 EXPECT_CALL(mock_endpoint_, Read)
315 .InSequence(s)
316 .WillOnce(WithArg<1>(
317 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
318 // Schedule mock_endpoint to read buffer.
319 grpc_event_engine::experimental::Slice slice(
320 grpc_slice_from_cpp_string(kBuffer.substr(0, 4)));
321 buffer->Append(std::move(slice));
322 return true;
323 }));
324 EXPECT_CALL(mock_endpoint_, Read)
325 .InSequence(s)
326 .WillOnce(WithArg<1>(
327 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
328 // Schedule mock_endpoint to read buffer.
329 grpc_event_engine::experimental::Slice slice(
330 grpc_slice_from_cpp_string(kBuffer.substr(4)));
331 buffer->Append(std::move(slice));
332 return true;
333 }));
334 {
335 auto promise = promise_endpoint_->ReadSlice(4u);
336 auto poll = promise();
337 ASSERT_TRUE(poll.ready());
338 ASSERT_TRUE(poll.value().ok());
339 EXPECT_EQ(poll.value()->as_string_view(), kBuffer.substr(0, 4));
340 }
341 {
342 auto promise = promise_endpoint_->ReadSlice(4u);
343 auto poll = promise();
344 ASSERT_TRUE(poll.ready());
345 ASSERT_TRUE(poll.value().ok());
346 EXPECT_EQ(poll.value()->as_string_view(), kBuffer.substr(4));
347 }
348 activity.Deactivate();
349 }
350
TEST_F(PromiseEndpointTest,OnePendingReadSliceSuccessful)351 TEST_F(PromiseEndpointTest, OnePendingReadSliceSuccessful) {
352 MockActivity activity;
353 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
354 absl::AnyInvocable<void(absl::Status)> read_callback;
355 activity.Activate();
356 EXPECT_CALL(activity, WakeupRequested).Times(1);
357 EXPECT_CALL(mock_endpoint_, Read)
358 .WillOnce(WithArgs<0, 1>(
359 [&read_callback, &kBuffer](
360 absl::AnyInvocable<void(absl::Status)> on_read,
361 grpc_event_engine::experimental::SliceBuffer* buffer) {
362 read_callback = std::move(on_read);
363 // Schedule mock_endpoint to read buffer.
364 grpc_event_engine::experimental::Slice slice(
365 grpc_slice_from_cpp_string(kBuffer));
366 buffer->Append(std::move(slice));
367 // Return false to mock EventEngine read not finish..
368 return false;
369 }));
370 auto promise = promise_endpoint_->ReadSlice(kBuffer.size());
371 EXPECT_TRUE(promise().pending());
372 // Mock EventEngine read succeeds, and promise resolves.
373 read_callback(absl::OkStatus());
374 auto poll = promise();
375 ASSERT_TRUE(poll.ready());
376 ASSERT_TRUE(poll.value().ok());
377 EXPECT_EQ(poll.value()->as_string_view(), kBuffer);
378 activity.Deactivate();
379 }
380
TEST_F(PromiseEndpointTest,OnePendingReadSliceFailed)381 TEST_F(PromiseEndpointTest, OnePendingReadSliceFailed) {
382 MockActivity activity;
383 absl::AnyInvocable<void(absl::Status)> read_callback;
384 activity.Activate();
385 EXPECT_CALL(activity, WakeupRequested).Times(1);
386 EXPECT_CALL(mock_endpoint_, Read)
387 .WillOnce(WithArgs<0>(
388 [&read_callback](absl::AnyInvocable<void(absl::Status)> on_read) {
389 read_callback = std::move(on_read);
390 // Return false to mock EventEngine read not finish.
391 return false;
392 }));
393 auto promise = promise_endpoint_->ReadSlice(kDummyRequestSize);
394 EXPECT_TRUE(promise().pending());
395 // Mock EventEngine read fails, and promise returns error.
396 read_callback(kDummyErrorStatus);
397 auto poll = promise();
398 ASSERT_TRUE(poll.ready());
399 ASSERT_FALSE(poll.value().ok());
400 EXPECT_EQ(kDummyErrorStatus, poll.value().status());
401 activity.Deactivate();
402 }
403
TEST_F(PromiseEndpointTest,OneReadByteSuccessful)404 TEST_F(PromiseEndpointTest, OneReadByteSuccessful) {
405 MockActivity activity;
406 const std::string kBuffer = {0x01};
407 activity.Activate();
408 EXPECT_CALL(activity, WakeupRequested).Times(0);
409 EXPECT_CALL(mock_endpoint_, Read)
410 .WillOnce(WithArgs<1>(
411 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
412 // Schedule mock_endpoint to read buffer.
413 grpc_event_engine::experimental::Slice slice(
414 grpc_slice_from_cpp_string(kBuffer));
415 buffer->Append(std::move(slice));
416 return true;
417 }));
418 auto promise = promise_endpoint_->ReadByte();
419 auto poll = promise();
420 ASSERT_TRUE(poll.ready());
421 ASSERT_TRUE(poll.value().ok());
422 EXPECT_EQ(*poll.value(), kBuffer[0]);
423 activity.Deactivate();
424 }
425
TEST_F(PromiseEndpointTest,OneReadByteFailed)426 TEST_F(PromiseEndpointTest, OneReadByteFailed) {
427 MockActivity activity;
428 activity.Activate();
429 EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1));
430 EXPECT_CALL(mock_endpoint_, Read)
431 .WillOnce(WithArgs<0>(
432 [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
433 // Mock EventEngine enpoint read fails.
434 read_callback(this->kDummyErrorStatus);
435 return false;
436 }));
437 auto promise = promise_endpoint_->ReadByte();
438 auto poll = promise();
439 ASSERT_TRUE(poll.ready());
440 ASSERT_FALSE(poll.value().ok());
441 EXPECT_EQ(kDummyErrorStatus, poll.value().status());
442 activity.Deactivate();
443 }
444
TEST_F(PromiseEndpointTest,MutipleReadBytesSuccessful)445 TEST_F(PromiseEndpointTest, MutipleReadBytesSuccessful) {
446 MockActivity activity;
447 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
448 activity.Activate();
449 EXPECT_CALL(activity, WakeupRequested).Times(0);
450 EXPECT_CALL(mock_endpoint_, Read)
451 .WillOnce(WithArg<1>(
452 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
453 // Schedule mock_endpoint to read buffer.
454 grpc_event_engine::experimental::Slice slice(
455 grpc_slice_from_cpp_string(kBuffer));
456 buffer->Append(std::move(slice));
457 return true;
458 }));
459 for (size_t i = 0; i < kBuffer.size(); ++i) {
460 auto promise = promise_endpoint_->ReadByte();
461 auto poll = promise();
462 ASSERT_TRUE(poll.ready());
463 ASSERT_TRUE(poll.value().ok());
464 EXPECT_EQ(*poll.value(), kBuffer[i]);
465 }
466 activity.Deactivate();
467 }
468
TEST_F(PromiseEndpointTest,OnePendingReadByteSuccessful)469 TEST_F(PromiseEndpointTest, OnePendingReadByteSuccessful) {
470 MockActivity activity;
471 const std::string kBuffer = {0x01};
472 absl::AnyInvocable<void(absl::Status)> read_callback;
473 activity.Activate();
474 EXPECT_CALL(activity, WakeupRequested).Times(1);
475 EXPECT_CALL(mock_endpoint_, Read)
476 .WillOnce(WithArgs<0, 1>(
477 [&read_callback, &kBuffer](
478 absl::AnyInvocable<void(absl::Status)> on_read,
479 grpc_event_engine::experimental::SliceBuffer* buffer) {
480 read_callback = std::move(on_read);
481 // Schedule mock_endpoint to read buffer.
482 grpc_event_engine::experimental::Slice slice(
483 grpc_slice_from_cpp_string(kBuffer));
484 buffer->Append(std::move(slice));
485 // Return false to mock EventEngine read not finish..
486 return false;
487 }));
488 auto promise = promise_endpoint_->ReadByte();
489 ASSERT_TRUE(promise().pending());
490 // Mock EventEngine read succeeds, and promise resolves.
491 read_callback(absl::OkStatus());
492 auto poll = promise();
493 ASSERT_TRUE(poll.ready());
494 ASSERT_TRUE(poll.value().ok());
495 EXPECT_EQ(*poll.value(), kBuffer[0]);
496 activity.Deactivate();
497 }
498
TEST_F(PromiseEndpointTest,OnePendingReadByteFailed)499 TEST_F(PromiseEndpointTest, OnePendingReadByteFailed) {
500 MockActivity activity;
501 absl::AnyInvocable<void(absl::Status)> read_callback;
502 activity.Activate();
503 EXPECT_CALL(activity, WakeupRequested).Times(1);
504 EXPECT_CALL(mock_endpoint_, Read)
505 .WillOnce(WithArgs<0>(
506 [&read_callback](absl::AnyInvocable<void(absl::Status)> on_read) {
507 read_callback = std::move(on_read);
508 // Return false to mock EventEngine read not finish.
509 return false;
510 }));
511 auto promise = promise_endpoint_->ReadByte();
512 ASSERT_TRUE(promise().pending());
513 // Mock EventEngine read fails, and promise returns error.
514 read_callback(kDummyErrorStatus);
515 auto poll = promise();
516 ASSERT_TRUE(poll.ready());
517 ASSERT_FALSE(poll.value().ok());
518 EXPECT_EQ(kDummyErrorStatus, poll.value().status());
519 activity.Deactivate();
520 }
521
TEST_F(PromiseEndpointTest,OneWriteSuccessful)522 TEST_F(PromiseEndpointTest, OneWriteSuccessful) {
523 MockActivity activity;
524 activity.Activate();
525 EXPECT_CALL(activity, WakeupRequested).Times(0);
526 EXPECT_CALL(mock_endpoint_, Write).WillOnce(Return(true));
527 auto promise = promise_endpoint_->Write(
528 SliceBuffer(Slice::FromCopiedString("hello world")));
529 auto poll = promise();
530 ASSERT_TRUE(poll.ready());
531 EXPECT_EQ(absl::OkStatus(), poll.value());
532 activity.Deactivate();
533 }
534
TEST_F(PromiseEndpointTest,EmptyWriteIsNoOp)535 TEST_F(PromiseEndpointTest, EmptyWriteIsNoOp) {
536 MockActivity activity;
537 activity.Activate();
538 EXPECT_CALL(activity, WakeupRequested).Times(0);
539 EXPECT_CALL(mock_endpoint_, Write).Times(0);
540 auto promise = promise_endpoint_->Write(SliceBuffer());
541 auto poll = promise();
542 ASSERT_TRUE(poll.ready());
543 EXPECT_EQ(absl::OkStatus(), poll.value());
544 activity.Deactivate();
545 }
546
TEST_F(PromiseEndpointTest,OneWriteFailed)547 TEST_F(PromiseEndpointTest, OneWriteFailed) {
548 MockActivity activity;
549 activity.Activate();
550 EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1));
551 EXPECT_CALL(mock_endpoint_, Write)
552 .WillOnce(
553 WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
554 on_write(this->kDummyErrorStatus);
555 return false;
556 }));
557 auto promise = promise_endpoint_->Write(
558 SliceBuffer(Slice::FromCopiedString("hello world")));
559 auto poll = promise();
560 ASSERT_TRUE(poll.ready());
561 EXPECT_EQ(kDummyErrorStatus, poll.value());
562 activity.Deactivate();
563 }
564
TEST_F(PromiseEndpointTest,OnePendingWriteSuccessful)565 TEST_F(PromiseEndpointTest, OnePendingWriteSuccessful) {
566 MockActivity activity;
567 absl::AnyInvocable<void(absl::Status)> write_callback;
568 activity.Activate();
569 EXPECT_CALL(activity, WakeupRequested).Times(1);
570 EXPECT_CALL(mock_endpoint_, Write)
571 .WillOnce(WithArgs<0, 1>(
572 [&write_callback](
573 absl::AnyInvocable<void(absl::Status)> on_write,
574 grpc_event_engine::experimental::SliceBuffer* buffer) {
575 write_callback = std::move(on_write);
576 // Schedule mock_endpoint to write buffer.
577 buffer->Append(grpc_event_engine::experimental::Slice());
578 // Return false to mock EventEngine write pending..
579 return false;
580 }));
581 auto promise = promise_endpoint_->Write(
582 SliceBuffer(Slice::FromCopiedString("hello world")));
583 EXPECT_TRUE(promise().pending());
584 // Mock EventEngine write succeeds, and promise resolves.
585 write_callback(absl::OkStatus());
586 auto poll = promise();
587 ASSERT_TRUE(poll.ready());
588 EXPECT_EQ(absl::OkStatus(), poll.value());
589 activity.Deactivate();
590 }
591
TEST_F(PromiseEndpointTest,OnePendingWriteFailed)592 TEST_F(PromiseEndpointTest, OnePendingWriteFailed) {
593 MockActivity activity;
594 absl::AnyInvocable<void(absl::Status)> write_callback;
595 activity.Activate();
596 EXPECT_CALL(activity, WakeupRequested).Times(1);
597 EXPECT_CALL(mock_endpoint_, Write)
598 .WillOnce(WithArg<0>(
599 [&write_callback](absl::AnyInvocable<void(absl::Status)> on_write) {
600 write_callback = std::move(on_write);
601 // Return false to mock EventEngine write pending..
602 return false;
603 }));
604 auto promise = promise_endpoint_->Write(
605 SliceBuffer(Slice::FromCopiedString("hello world")));
606 EXPECT_TRUE(promise().pending());
607 write_callback(kDummyErrorStatus);
608 auto poll = promise();
609 ASSERT_TRUE(poll.ready());
610 EXPECT_EQ(kDummyErrorStatus, poll.value());
611 activity.Deactivate();
612 }
613
TEST_F(PromiseEndpointTest,GetPeerAddress)614 TEST_F(PromiseEndpointTest, GetPeerAddress) {
615 const char raw_test_address[] = {0x55, 0x66, 0x01, 0x55, 0x66, 0x01};
616 grpc_event_engine::experimental::EventEngine::ResolvedAddress test_address(
617 reinterpret_cast<const sockaddr*>(raw_test_address),
618 sizeof(raw_test_address));
619 EXPECT_CALL(mock_endpoint_, GetPeerAddress).WillOnce(ReturnRef(test_address));
620 auto peer_address = promise_endpoint_->GetPeerAddress();
621 EXPECT_EQ(0, std::memcmp(test_address.address(), test_address.address(),
622 test_address.size()));
623 EXPECT_EQ(test_address.size(), peer_address.size());
624 }
625
TEST_F(PromiseEndpointTest,GetLocalAddress)626 TEST_F(PromiseEndpointTest, GetLocalAddress) {
627 const char raw_test_address[] = {0x52, 0x55, 0x66, 0x52, 0x55, 0x66};
628 grpc_event_engine::experimental::EventEngine::ResolvedAddress test_address(
629 reinterpret_cast<const sockaddr*>(raw_test_address),
630 sizeof(raw_test_address));
631 EXPECT_CALL(mock_endpoint_, GetLocalAddress)
632 .WillOnce(ReturnRef(test_address));
633 auto local_address = promise_endpoint_->GetLocalAddress();
634 EXPECT_EQ(0, std::memcmp(test_address.address(), local_address.address(),
635 test_address.size()));
636 EXPECT_EQ(test_address.size(), local_address.size());
637 }
638
TEST_F(PromiseEndpointTest,DestroyedBeforeReadCompletes)639 TEST_F(PromiseEndpointTest, DestroyedBeforeReadCompletes) {
640 MockActivity activity;
641 const std::string kBuffer = {0x01};
642 absl::AnyInvocable<void(absl::Status)> read_callback;
643 activity.Activate();
644 EXPECT_CALL(activity, WakeupRequested).Times(1);
645 EXPECT_CALL(mock_endpoint_, Read)
646 .WillOnce(WithArgs<0, 1>(
647 [&read_callback, &kBuffer](
648 absl::AnyInvocable<void(absl::Status)> on_read,
649 grpc_event_engine::experimental::SliceBuffer* buffer) {
650 read_callback = std::move(on_read);
651 // Schedule mock_endpoint to read buffer.
652 grpc_event_engine::experimental::Slice slice(
653 grpc_slice_from_cpp_string(kBuffer));
654 buffer->Append(std::move(slice));
655 // Return false to mock EventEngine read not finish..
656 return false;
657 }));
658 auto promise = promise_endpoint_->ReadByte();
659 ASSERT_TRUE(promise().pending());
660 promise_endpoint_.reset();
661 // Mock EventEngine read succeeds, and promise resolves.
662 read_callback(absl::OkStatus());
663 auto poll = promise();
664 ASSERT_TRUE(poll.ready());
665 ASSERT_TRUE(poll.value().ok());
666 EXPECT_EQ(*poll.value(), kBuffer[0]);
667 activity.Deactivate();
668 }
669
670 class MultiplePromiseEndpointTest : public ::testing::Test {
671 public:
MultiplePromiseEndpointTest()672 MultiplePromiseEndpointTest()
673 : first_mock_endpoint_ptr_(new StrictMock<MockEndpoint>()),
674 second_mock_endpoint_ptr_(new StrictMock<MockEndpoint>()),
675 first_mock_endpoint_(*first_mock_endpoint_ptr_),
676 second_mock_endpoint_(*second_mock_endpoint_ptr_),
677 first_promise_endpoint_(
678 std::unique_ptr<
679 grpc_event_engine::experimental::EventEngine::Endpoint>(
680 first_mock_endpoint_ptr_),
681 SliceBuffer()),
682 second_promise_endpoint_(
683 std::unique_ptr<
684 grpc_event_engine::experimental::EventEngine::Endpoint>(
685 second_mock_endpoint_ptr_),
686 SliceBuffer()) {}
687
688 private:
689 MockEndpoint* first_mock_endpoint_ptr_;
690 MockEndpoint* second_mock_endpoint_ptr_;
691
692 protected:
693 MockEndpoint& first_mock_endpoint_;
694 MockEndpoint& second_mock_endpoint_;
695 PromiseEndpoint first_promise_endpoint_;
696 PromiseEndpoint second_promise_endpoint_;
697
698 const absl::Status kDummyErrorStatus =
699 absl::ErrnoToStatus(5566, "just an error");
700 static constexpr size_t kDummyRequestSize = 5566u;
701 };
702
TEST_F(MultiplePromiseEndpointTest,JoinReadsSuccessful)703 TEST_F(MultiplePromiseEndpointTest, JoinReadsSuccessful) {
704 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
705 EXPECT_CALL(first_mock_endpoint_, Read)
706 .WillOnce(WithArgs<1>(
707 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
708 // Schedule mock_endpoint to read buffer.
709 grpc_event_engine::experimental::Slice slice(
710 grpc_slice_from_cpp_string(kBuffer));
711 buffer->Append(std::move(slice));
712 return true;
713 }));
714 EXPECT_CALL(second_mock_endpoint_, Read)
715 .WillOnce(WithArgs<1>(
716 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
717 // Schedule mock_endpoint to read buffer.
718 grpc_event_engine::experimental::Slice slice(
719 grpc_slice_from_cpp_string(kBuffer));
720 buffer->Append(std::move(slice));
721 return true;
722 }));
723 StrictMock<MockFunction<void(absl::Status)>> on_done;
724 EXPECT_CALL(on_done, Call(absl::OkStatus()));
725 auto activity = MakeActivity(
726 [this, &kBuffer] {
727 return Seq(Join(this->first_promise_endpoint_.Read(kBuffer.size()),
728 this->second_promise_endpoint_.Read(kBuffer.size())),
729 [](std::tuple<absl::StatusOr<SliceBuffer>,
730 absl::StatusOr<SliceBuffer>>
731 ret) {
732 // Both reads finish with `absl::OkStatus`.
733 EXPECT_TRUE(std::get<0>(ret).ok());
734 EXPECT_TRUE(std::get<1>(ret).ok());
735 return absl::OkStatus();
736 });
737 },
738 InlineWakeupScheduler(),
739 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
740 }
741
TEST_F(MultiplePromiseEndpointTest,JoinOneReadSuccessfulOneReadFailed)742 TEST_F(MultiplePromiseEndpointTest, JoinOneReadSuccessfulOneReadFailed) {
743 const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
744 EXPECT_CALL(first_mock_endpoint_, Read)
745 .WillOnce(WithArgs<1>(
746 [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
747 // Schedule mock_endpoint to read buffer.
748 grpc_event_engine::experimental::Slice slice(
749 grpc_slice_from_cpp_string(kBuffer));
750 buffer->Append(std::move(slice));
751 return true;
752 }));
753 EXPECT_CALL(second_mock_endpoint_, Read)
754 .WillOnce(WithArgs<0>(
755 [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
756 // Mock EventEngine enpoint read fails.
757 read_callback(this->kDummyErrorStatus);
758 return false;
759 }));
760 StrictMock<MockFunction<void(absl::Status)>> on_done;
761 EXPECT_CALL(on_done, Call(kDummyErrorStatus));
762 auto activity = MakeActivity(
763 [this, &kBuffer] {
764 return Seq(
765 Join(this->first_promise_endpoint_.Read(kBuffer.size()),
766 this->second_promise_endpoint_.Read(this->kDummyRequestSize)),
767 [this](std::tuple<absl::StatusOr<SliceBuffer>,
768 absl::StatusOr<SliceBuffer>>
769 ret) {
770 // One read finishes with `absl::OkStatus` and the other read
771 // fails.
772 EXPECT_TRUE(std::get<0>(ret).ok());
773 EXPECT_FALSE(std::get<1>(ret).ok());
774 EXPECT_EQ(std::get<1>(ret).status(), this->kDummyErrorStatus);
775 return this->kDummyErrorStatus;
776 });
777 },
778 InlineWakeupScheduler(),
779 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
780 }
781
TEST_F(MultiplePromiseEndpointTest,JoinReadsFailed)782 TEST_F(MultiplePromiseEndpointTest, JoinReadsFailed) {
783 EXPECT_CALL(first_mock_endpoint_, Read)
784 .WillOnce(WithArgs<0>(
785 [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
786 // Mock EventEngine enpoint read fails.
787 read_callback(this->kDummyErrorStatus);
788 return false;
789 }));
790 EXPECT_CALL(second_mock_endpoint_, Read)
791 .WillOnce(WithArgs<0>(
792 [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
793 // Mock EventEngine enpoint read fails.
794 read_callback(this->kDummyErrorStatus);
795 return false;
796 }));
797 StrictMock<MockFunction<void(absl::Status)>> on_done;
798 EXPECT_CALL(on_done, Call(kDummyErrorStatus));
799 auto activity = MakeActivity(
800 [this] {
801 return Seq(
802 Join(this->first_promise_endpoint_.Read(this->kDummyRequestSize),
803 this->second_promise_endpoint_.Read(this->kDummyRequestSize)),
804 [this](std::tuple<absl::StatusOr<SliceBuffer>,
805 absl::StatusOr<SliceBuffer>>
806 ret) {
807 // Both reads finish with errors.
808 EXPECT_FALSE(std::get<0>(ret).ok());
809 EXPECT_FALSE(std::get<1>(ret).ok());
810 EXPECT_EQ(std::get<0>(ret).status(), this->kDummyErrorStatus);
811 EXPECT_EQ(std::get<1>(ret).status(), this->kDummyErrorStatus);
812 return this->kDummyErrorStatus;
813 });
814 },
815 InlineWakeupScheduler(),
816 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
817 }
818
TEST_F(MultiplePromiseEndpointTest,JoinWritesSuccessful)819 TEST_F(MultiplePromiseEndpointTest, JoinWritesSuccessful) {
820 EXPECT_CALL(first_mock_endpoint_, Write).WillOnce(Return(true));
821 EXPECT_CALL(second_mock_endpoint_, Write).WillOnce(Return(true));
822 StrictMock<MockFunction<void(absl::Status)>> on_done;
823 EXPECT_CALL(on_done, Call(absl::OkStatus()));
824 auto activity = MakeActivity(
825 [this] {
826 return Seq(Join(this->first_promise_endpoint_.Write(SliceBuffer(
827 Slice::FromCopiedString("hello world"))),
828 this->second_promise_endpoint_.Write(SliceBuffer(
829 Slice::FromCopiedString("hello world")))),
830 [](std::tuple<absl::Status, absl::Status> ret) {
831 // Both writes finish with `absl::OkStatus`.
832 EXPECT_TRUE(std::get<0>(ret).ok());
833 EXPECT_TRUE(std::get<1>(ret).ok());
834 return absl::OkStatus();
835 });
836 },
837 InlineWakeupScheduler(),
838 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
839 }
840
TEST_F(MultiplePromiseEndpointTest,JoinOneWriteSuccessfulOneWriteFailed)841 TEST_F(MultiplePromiseEndpointTest, JoinOneWriteSuccessfulOneWriteFailed) {
842 EXPECT_CALL(first_mock_endpoint_, Write).WillOnce(Return(true));
843 EXPECT_CALL(second_mock_endpoint_, Write)
844 .WillOnce(
845 WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
846 on_write(this->kDummyErrorStatus);
847 return false;
848 }));
849 StrictMock<MockFunction<void(absl::Status)>> on_done;
850 EXPECT_CALL(on_done, Call(kDummyErrorStatus));
851 auto activity = MakeActivity(
852 [this] {
853 return Seq(Join(this->first_promise_endpoint_.Write(SliceBuffer(
854 Slice::FromCopiedString("hello world"))),
855 this->second_promise_endpoint_.Write(SliceBuffer(
856 Slice::FromCopiedString("hello world")))),
857 [this](std::tuple<absl::Status, absl::Status> ret) {
858 // One write finish with `absl::OkStatus` and the other
859 // write fails.
860 EXPECT_TRUE(std::get<0>(ret).ok());
861 EXPECT_FALSE(std::get<1>(ret).ok());
862 EXPECT_EQ(std::get<1>(ret), this->kDummyErrorStatus);
863 return this->kDummyErrorStatus;
864 });
865 },
866 InlineWakeupScheduler(),
867 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
868 }
869
TEST_F(MultiplePromiseEndpointTest,JoinWritesFailed)870 TEST_F(MultiplePromiseEndpointTest, JoinWritesFailed) {
871 EXPECT_CALL(first_mock_endpoint_, Write)
872 .WillOnce(
873 WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
874 on_write(this->kDummyErrorStatus);
875 return false;
876 }));
877 EXPECT_CALL(second_mock_endpoint_, Write)
878 .WillOnce(
879 WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
880 on_write(this->kDummyErrorStatus);
881 return false;
882 }));
883 StrictMock<MockFunction<void(absl::Status)>> on_done;
884 EXPECT_CALL(on_done, Call(kDummyErrorStatus));
885 auto activity = MakeActivity(
886 [this] {
887 return Seq(Join(this->first_promise_endpoint_.Write(SliceBuffer(
888 Slice::FromCopiedString("hello world"))),
889 this->second_promise_endpoint_.Write(SliceBuffer(
890 Slice::FromCopiedString("hello world")))),
891 [this](std::tuple<absl::Status, absl::Status> ret) {
892 // Both writes fail with errors.
893 EXPECT_FALSE(std::get<0>(ret).ok());
894 EXPECT_FALSE(std::get<1>(ret).ok());
895 EXPECT_EQ(std::get<0>(ret), this->kDummyErrorStatus);
896 EXPECT_EQ(std::get<1>(ret), this->kDummyErrorStatus);
897 return this->kDummyErrorStatus;
898 });
899 },
900 InlineWakeupScheduler(),
901 [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
902 }
903
904 } // namespace testing
905 } // namespace grpc_core
906
main(int argc,char ** argv)907 int main(int argc, char** argv) {
908 ::testing::InitGoogleTest(&argc, argv);
909 return RUN_ALL_TESTS();
910 }
911