xref: /aosp_15_r20/external/grpc-grpc/test/core/transport/promise_endpoint_test.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/lib/transport/promise_endpoint.h"
16 
17 // IWYU pragma: no_include <sys/socket.h>
18 
19 #include <cstring>
20 #include <memory>
21 #include <string>
22 #include <tuple>
23 
24 #include "absl/functional/any_invocable.h"
25 #include "gmock/gmock.h"
26 #include "gtest/gtest.h"
27 
28 #include <grpc/event_engine/event_engine.h>
29 #include <grpc/event_engine/port.h>  // IWYU pragma: keep
30 #include <grpc/event_engine/slice_buffer.h>
31 
32 #include "src/core/lib/promise/activity.h"
33 #include "src/core/lib/promise/join.h"
34 #include "src/core/lib/promise/seq.h"
35 #include "src/core/lib/slice/slice.h"
36 #include "src/core/lib/slice/slice_buffer.h"
37 #include "src/core/lib/slice/slice_internal.h"
38 #include "test/core/promise/test_wakeup_schedulers.h"
39 
40 using testing::AtMost;
41 using testing::MockFunction;
42 using testing::Return;
43 using testing::ReturnRef;
44 using testing::Sequence;
45 using testing::StrictMock;
46 using testing::WithArg;
47 using testing::WithArgs;
48 
49 namespace grpc_core {
50 namespace testing {
51 
52 class MockEndpoint
53     : public grpc_event_engine::experimental::EventEngine::Endpoint {
54  public:
55   MOCK_METHOD(
56       bool, Read,
57       (absl::AnyInvocable<void(absl::Status)> on_read,
58        grpc_event_engine::experimental::SliceBuffer* buffer,
59        const grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs*
60            args),
61       (override));
62 
63   MOCK_METHOD(
64       bool, Write,
65       (absl::AnyInvocable<void(absl::Status)> on_writable,
66        grpc_event_engine::experimental::SliceBuffer* data,
67        const grpc_event_engine::experimental::EventEngine::Endpoint::WriteArgs*
68            args),
69       (override));
70 
71   MOCK_METHOD(
72       const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
73       GetPeerAddress, (), (const, override));
74   MOCK_METHOD(
75       const grpc_event_engine::experimental::EventEngine::ResolvedAddress&,
76       GetLocalAddress, (), (const, override));
77 };
78 
79 class MockActivity : public Activity, public Wakeable {
80  public:
81   MOCK_METHOD(void, WakeupRequested, ());
82 
ForceImmediateRepoll(WakeupMask)83   void ForceImmediateRepoll(WakeupMask /*mask*/) override { WakeupRequested(); }
Orphan()84   void Orphan() override {}
MakeOwningWaker()85   Waker MakeOwningWaker() override { return Waker(this, 0); }
MakeNonOwningWaker()86   Waker MakeNonOwningWaker() override { return Waker(this, 0); }
Wakeup(WakeupMask)87   void Wakeup(WakeupMask /*mask*/) override { WakeupRequested(); }
WakeupAsync(WakeupMask)88   void WakeupAsync(WakeupMask /*mask*/) override { WakeupRequested(); }
Drop(WakeupMask)89   void Drop(WakeupMask /*mask*/) override {}
DebugTag() const90   std::string DebugTag() const override { return "MockActivity"; }
ActivityDebugTag(WakeupMask) const91   std::string ActivityDebugTag(WakeupMask /*mask*/) const override {
92     return DebugTag();
93   }
94 
Activate()95   void Activate() {
96     if (scoped_activity_ == nullptr) {
97       scoped_activity_ = std::make_unique<ScopedActivity>(this);
98     }
99   }
100 
Deactivate()101   void Deactivate() { scoped_activity_.reset(); }
102 
103  private:
104   std::unique_ptr<ScopedActivity> scoped_activity_;
105 };
106 
107 class PromiseEndpointTest : public ::testing::Test {
108  public:
PromiseEndpointTest()109   PromiseEndpointTest()
110       : mock_endpoint_ptr_(new StrictMock<MockEndpoint>()),
111         mock_endpoint_(*mock_endpoint_ptr_),
112         promise_endpoint_(std::make_unique<PromiseEndpoint>(
113             std::unique_ptr<
114                 grpc_event_engine::experimental::EventEngine::Endpoint>(
115                 mock_endpoint_ptr_),
116             SliceBuffer())) {}
117 
118  private:
119   MockEndpoint* mock_endpoint_ptr_;
120 
121  protected:
122   MockEndpoint& mock_endpoint_;
123   std::unique_ptr<PromiseEndpoint> promise_endpoint_;
124 
125   const absl::Status kDummyErrorStatus =
126       absl::ErrnoToStatus(5566, "just an error");
127   static constexpr size_t kDummyRequestSize = 5566u;
128 };
129 
TEST_F(PromiseEndpointTest,OneReadSuccessful)130 TEST_F(PromiseEndpointTest, OneReadSuccessful) {
131   MockActivity activity;
132   const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
133   activity.Activate();
134   EXPECT_CALL(activity, WakeupRequested).Times(0);
135   EXPECT_CALL(mock_endpoint_, Read)
136       .WillOnce(WithArgs<1>(
137           [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
138             // Schedule mock_endpoint to read buffer.
139             grpc_event_engine::experimental::Slice slice(
140                 grpc_slice_from_cpp_string(kBuffer));
141             buffer->Append(std::move(slice));
142             return true;
143           }));
144   auto promise = promise_endpoint_->Read(kBuffer.size());
145   auto poll = promise();
146   ASSERT_TRUE(poll.ready());
147   ASSERT_TRUE(poll.value().ok());
148   EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer);
149   activity.Deactivate();
150 }
151 
TEST_F(PromiseEndpointTest,OneReadFailed)152 TEST_F(PromiseEndpointTest, OneReadFailed) {
153   MockActivity activity;
154   activity.Activate();
155   EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1));
156   EXPECT_CALL(mock_endpoint_, Read)
157       .WillOnce(WithArgs<0>(
158           [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
159             // Mock EventEngine enpoint read fails.
160             read_callback(this->kDummyErrorStatus);
161             return false;
162           }));
163   auto promise = promise_endpoint_->Read(kDummyRequestSize);
164   auto poll = promise();
165   ASSERT_TRUE(poll.ready());
166   ASSERT_FALSE(poll.value().ok());
167   EXPECT_EQ(kDummyErrorStatus, poll.value().status());
168   activity.Deactivate();
169 }
170 
TEST_F(PromiseEndpointTest,MutipleReadsSuccessful)171 TEST_F(PromiseEndpointTest, MutipleReadsSuccessful) {
172   MockActivity activity;
173   const std::string kBuffer = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
174   activity.Activate();
175   EXPECT_CALL(activity, WakeupRequested).Times(0);
176   Sequence s;
177   EXPECT_CALL(mock_endpoint_, Read)
178       .InSequence(s)
179       .WillOnce(WithArg<1>(
180           [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
181             // Schedule mock_endpoint to read buffer.
182             grpc_event_engine::experimental::Slice slice(
183                 grpc_slice_from_cpp_string(kBuffer.substr(0, 4)));
184             buffer->Append(std::move(slice));
185             return true;
186           }));
187   EXPECT_CALL(mock_endpoint_, Read)
188       .InSequence(s)
189       .WillOnce(WithArg<1>(
190           [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
191             // Schedule mock_endpoint to read buffer.
192             grpc_event_engine::experimental::Slice slice(
193                 grpc_slice_from_cpp_string(kBuffer.substr(4)));
194             buffer->Append(std::move(slice));
195             return true;
196           }));
197   {
198     auto promise = promise_endpoint_->Read(4u);
199     auto poll = promise();
200     ASSERT_TRUE(poll.ready());
201     ASSERT_TRUE(poll.value().ok());
202     EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer.substr(0, 4));
203   }
204   {
205     auto promise = promise_endpoint_->Read(4u);
206     auto poll = promise();
207     ASSERT_TRUE(poll.ready());
208     ASSERT_TRUE(poll.value().ok());
209     EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer.substr(4));
210   }
211   activity.Deactivate();
212 }
213 
TEST_F(PromiseEndpointTest,OnePendingReadSuccessful)214 TEST_F(PromiseEndpointTest, OnePendingReadSuccessful) {
215   MockActivity activity;
216   const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
217   absl::AnyInvocable<void(absl::Status)> read_callback;
218   activity.Activate();
219   EXPECT_CALL(activity, WakeupRequested).Times(1);
220   EXPECT_CALL(mock_endpoint_, Read)
221       .WillOnce(WithArgs<0, 1>(
222           [&read_callback, &kBuffer](
223               absl::AnyInvocable<void(absl::Status)> on_read,
224               grpc_event_engine::experimental::SliceBuffer* buffer) {
225             read_callback = std::move(on_read);
226             // Schedule mock_endpoint to read buffer.
227             grpc_event_engine::experimental::Slice slice(
228                 grpc_slice_from_cpp_string(kBuffer));
229             buffer->Append(std::move(slice));
230             // Return false to mock EventEngine read not finish..
231             return false;
232           }));
233   auto promise = promise_endpoint_->Read(kBuffer.size());
234   EXPECT_TRUE(promise().pending());
235   // Mock EventEngine read succeeds, and promise resolves.
236   read_callback(absl::OkStatus());
237   auto poll = promise();
238   ASSERT_TRUE(poll.ready());
239   ASSERT_TRUE(poll.value().ok());
240   EXPECT_EQ(poll.value()->JoinIntoString(), kBuffer);
241   activity.Deactivate();
242 }
243 
TEST_F(PromiseEndpointTest,OnePendingReadFailed)244 TEST_F(PromiseEndpointTest, OnePendingReadFailed) {
245   MockActivity activity;
246   absl::AnyInvocable<void(absl::Status)> read_callback;
247   activity.Activate();
248   EXPECT_CALL(activity, WakeupRequested).Times(1);
249   EXPECT_CALL(mock_endpoint_, Read)
250       .WillOnce(WithArgs<0>(
251           [&read_callback](absl::AnyInvocable<void(absl::Status)> on_read) {
252             read_callback = std::move(on_read);
253             // Return false to mock EventEngine read not finish.
254             return false;
255           }));
256   auto promise = promise_endpoint_->Read(kDummyRequestSize);
257   EXPECT_TRUE(promise().pending());
258   // Mock EventEngine read fails, and promise returns error.
259   read_callback(kDummyErrorStatus);
260   auto poll = promise();
261   ASSERT_TRUE(poll.ready());
262   ASSERT_FALSE(poll.value().ok());
263   EXPECT_EQ(kDummyErrorStatus, poll.value().status());
264   activity.Deactivate();
265 }
266 
TEST_F(PromiseEndpointTest,OneReadSliceSuccessful)267 TEST_F(PromiseEndpointTest, OneReadSliceSuccessful) {
268   MockActivity activity;
269   const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
270   activity.Activate();
271   EXPECT_CALL(activity, WakeupRequested).Times(0);
272   EXPECT_CALL(mock_endpoint_, Read)
273       .WillOnce(WithArgs<1>(
274           [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
275             // Schedule mock_endpoint to read buffer.
276             grpc_event_engine::experimental::Slice slice(
277                 grpc_slice_from_cpp_string(kBuffer));
278             buffer->Append(std::move(slice));
279             return true;
280           }));
281   auto promise = promise_endpoint_->ReadSlice(kBuffer.size());
282   auto poll = promise();
283   ASSERT_TRUE(poll.ready());
284   ASSERT_TRUE(poll.value().ok());
285   EXPECT_EQ(poll.value()->as_string_view(), kBuffer);
286   activity.Deactivate();
287 }
288 
TEST_F(PromiseEndpointTest,OneReadSliceFailed)289 TEST_F(PromiseEndpointTest, OneReadSliceFailed) {
290   MockActivity activity;
291   activity.Activate();
292   EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1));
293   EXPECT_CALL(mock_endpoint_, Read)
294       .WillOnce(WithArgs<0>(
295           [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
296             // Mock EventEngine enpoint read fails.
297             read_callback(this->kDummyErrorStatus);
298             return false;
299           }));
300   auto promise = promise_endpoint_->ReadSlice(kDummyRequestSize);
301   auto poll = promise();
302   ASSERT_TRUE(poll.ready());
303   ASSERT_FALSE(poll.value().ok());
304   EXPECT_EQ(kDummyErrorStatus, poll.value().status());
305   activity.Deactivate();
306 }
307 
TEST_F(PromiseEndpointTest,MutipleReadSlicesSuccessful)308 TEST_F(PromiseEndpointTest, MutipleReadSlicesSuccessful) {
309   MockActivity activity;
310   const std::string kBuffer = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
311   activity.Activate();
312   EXPECT_CALL(activity, WakeupRequested).Times(0);
313   Sequence s;
314   EXPECT_CALL(mock_endpoint_, Read)
315       .InSequence(s)
316       .WillOnce(WithArg<1>(
317           [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
318             // Schedule mock_endpoint to read buffer.
319             grpc_event_engine::experimental::Slice slice(
320                 grpc_slice_from_cpp_string(kBuffer.substr(0, 4)));
321             buffer->Append(std::move(slice));
322             return true;
323           }));
324   EXPECT_CALL(mock_endpoint_, Read)
325       .InSequence(s)
326       .WillOnce(WithArg<1>(
327           [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
328             // Schedule mock_endpoint to read buffer.
329             grpc_event_engine::experimental::Slice slice(
330                 grpc_slice_from_cpp_string(kBuffer.substr(4)));
331             buffer->Append(std::move(slice));
332             return true;
333           }));
334   {
335     auto promise = promise_endpoint_->ReadSlice(4u);
336     auto poll = promise();
337     ASSERT_TRUE(poll.ready());
338     ASSERT_TRUE(poll.value().ok());
339     EXPECT_EQ(poll.value()->as_string_view(), kBuffer.substr(0, 4));
340   }
341   {
342     auto promise = promise_endpoint_->ReadSlice(4u);
343     auto poll = promise();
344     ASSERT_TRUE(poll.ready());
345     ASSERT_TRUE(poll.value().ok());
346     EXPECT_EQ(poll.value()->as_string_view(), kBuffer.substr(4));
347   }
348   activity.Deactivate();
349 }
350 
TEST_F(PromiseEndpointTest,OnePendingReadSliceSuccessful)351 TEST_F(PromiseEndpointTest, OnePendingReadSliceSuccessful) {
352   MockActivity activity;
353   const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
354   absl::AnyInvocable<void(absl::Status)> read_callback;
355   activity.Activate();
356   EXPECT_CALL(activity, WakeupRequested).Times(1);
357   EXPECT_CALL(mock_endpoint_, Read)
358       .WillOnce(WithArgs<0, 1>(
359           [&read_callback, &kBuffer](
360               absl::AnyInvocable<void(absl::Status)> on_read,
361               grpc_event_engine::experimental::SliceBuffer* buffer) {
362             read_callback = std::move(on_read);
363             // Schedule mock_endpoint to read buffer.
364             grpc_event_engine::experimental::Slice slice(
365                 grpc_slice_from_cpp_string(kBuffer));
366             buffer->Append(std::move(slice));
367             // Return false to mock EventEngine read not finish..
368             return false;
369           }));
370   auto promise = promise_endpoint_->ReadSlice(kBuffer.size());
371   EXPECT_TRUE(promise().pending());
372   // Mock EventEngine read succeeds, and promise resolves.
373   read_callback(absl::OkStatus());
374   auto poll = promise();
375   ASSERT_TRUE(poll.ready());
376   ASSERT_TRUE(poll.value().ok());
377   EXPECT_EQ(poll.value()->as_string_view(), kBuffer);
378   activity.Deactivate();
379 }
380 
TEST_F(PromiseEndpointTest,OnePendingReadSliceFailed)381 TEST_F(PromiseEndpointTest, OnePendingReadSliceFailed) {
382   MockActivity activity;
383   absl::AnyInvocable<void(absl::Status)> read_callback;
384   activity.Activate();
385   EXPECT_CALL(activity, WakeupRequested).Times(1);
386   EXPECT_CALL(mock_endpoint_, Read)
387       .WillOnce(WithArgs<0>(
388           [&read_callback](absl::AnyInvocable<void(absl::Status)> on_read) {
389             read_callback = std::move(on_read);
390             // Return false to mock EventEngine read not finish.
391             return false;
392           }));
393   auto promise = promise_endpoint_->ReadSlice(kDummyRequestSize);
394   EXPECT_TRUE(promise().pending());
395   // Mock EventEngine read fails, and promise returns error.
396   read_callback(kDummyErrorStatus);
397   auto poll = promise();
398   ASSERT_TRUE(poll.ready());
399   ASSERT_FALSE(poll.value().ok());
400   EXPECT_EQ(kDummyErrorStatus, poll.value().status());
401   activity.Deactivate();
402 }
403 
TEST_F(PromiseEndpointTest,OneReadByteSuccessful)404 TEST_F(PromiseEndpointTest, OneReadByteSuccessful) {
405   MockActivity activity;
406   const std::string kBuffer = {0x01};
407   activity.Activate();
408   EXPECT_CALL(activity, WakeupRequested).Times(0);
409   EXPECT_CALL(mock_endpoint_, Read)
410       .WillOnce(WithArgs<1>(
411           [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
412             // Schedule mock_endpoint to read buffer.
413             grpc_event_engine::experimental::Slice slice(
414                 grpc_slice_from_cpp_string(kBuffer));
415             buffer->Append(std::move(slice));
416             return true;
417           }));
418   auto promise = promise_endpoint_->ReadByte();
419   auto poll = promise();
420   ASSERT_TRUE(poll.ready());
421   ASSERT_TRUE(poll.value().ok());
422   EXPECT_EQ(*poll.value(), kBuffer[0]);
423   activity.Deactivate();
424 }
425 
TEST_F(PromiseEndpointTest,OneReadByteFailed)426 TEST_F(PromiseEndpointTest, OneReadByteFailed) {
427   MockActivity activity;
428   activity.Activate();
429   EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1));
430   EXPECT_CALL(mock_endpoint_, Read)
431       .WillOnce(WithArgs<0>(
432           [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
433             // Mock EventEngine enpoint read fails.
434             read_callback(this->kDummyErrorStatus);
435             return false;
436           }));
437   auto promise = promise_endpoint_->ReadByte();
438   auto poll = promise();
439   ASSERT_TRUE(poll.ready());
440   ASSERT_FALSE(poll.value().ok());
441   EXPECT_EQ(kDummyErrorStatus, poll.value().status());
442   activity.Deactivate();
443 }
444 
TEST_F(PromiseEndpointTest,MutipleReadBytesSuccessful)445 TEST_F(PromiseEndpointTest, MutipleReadBytesSuccessful) {
446   MockActivity activity;
447   const std::string kBuffer = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
448   activity.Activate();
449   EXPECT_CALL(activity, WakeupRequested).Times(0);
450   EXPECT_CALL(mock_endpoint_, Read)
451       .WillOnce(WithArg<1>(
452           [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
453             // Schedule mock_endpoint to read buffer.
454             grpc_event_engine::experimental::Slice slice(
455                 grpc_slice_from_cpp_string(kBuffer));
456             buffer->Append(std::move(slice));
457             return true;
458           }));
459   for (size_t i = 0; i < kBuffer.size(); ++i) {
460     auto promise = promise_endpoint_->ReadByte();
461     auto poll = promise();
462     ASSERT_TRUE(poll.ready());
463     ASSERT_TRUE(poll.value().ok());
464     EXPECT_EQ(*poll.value(), kBuffer[i]);
465   }
466   activity.Deactivate();
467 }
468 
TEST_F(PromiseEndpointTest,OnePendingReadByteSuccessful)469 TEST_F(PromiseEndpointTest, OnePendingReadByteSuccessful) {
470   MockActivity activity;
471   const std::string kBuffer = {0x01};
472   absl::AnyInvocable<void(absl::Status)> read_callback;
473   activity.Activate();
474   EXPECT_CALL(activity, WakeupRequested).Times(1);
475   EXPECT_CALL(mock_endpoint_, Read)
476       .WillOnce(WithArgs<0, 1>(
477           [&read_callback, &kBuffer](
478               absl::AnyInvocable<void(absl::Status)> on_read,
479               grpc_event_engine::experimental::SliceBuffer* buffer) {
480             read_callback = std::move(on_read);
481             // Schedule mock_endpoint to read buffer.
482             grpc_event_engine::experimental::Slice slice(
483                 grpc_slice_from_cpp_string(kBuffer));
484             buffer->Append(std::move(slice));
485             // Return false to mock EventEngine read not finish..
486             return false;
487           }));
488   auto promise = promise_endpoint_->ReadByte();
489   ASSERT_TRUE(promise().pending());
490   // Mock EventEngine read succeeds, and promise resolves.
491   read_callback(absl::OkStatus());
492   auto poll = promise();
493   ASSERT_TRUE(poll.ready());
494   ASSERT_TRUE(poll.value().ok());
495   EXPECT_EQ(*poll.value(), kBuffer[0]);
496   activity.Deactivate();
497 }
498 
TEST_F(PromiseEndpointTest,OnePendingReadByteFailed)499 TEST_F(PromiseEndpointTest, OnePendingReadByteFailed) {
500   MockActivity activity;
501   absl::AnyInvocable<void(absl::Status)> read_callback;
502   activity.Activate();
503   EXPECT_CALL(activity, WakeupRequested).Times(1);
504   EXPECT_CALL(mock_endpoint_, Read)
505       .WillOnce(WithArgs<0>(
506           [&read_callback](absl::AnyInvocable<void(absl::Status)> on_read) {
507             read_callback = std::move(on_read);
508             // Return false to mock EventEngine read not finish.
509             return false;
510           }));
511   auto promise = promise_endpoint_->ReadByte();
512   ASSERT_TRUE(promise().pending());
513   // Mock EventEngine read fails, and promise returns error.
514   read_callback(kDummyErrorStatus);
515   auto poll = promise();
516   ASSERT_TRUE(poll.ready());
517   ASSERT_FALSE(poll.value().ok());
518   EXPECT_EQ(kDummyErrorStatus, poll.value().status());
519   activity.Deactivate();
520 }
521 
TEST_F(PromiseEndpointTest,OneWriteSuccessful)522 TEST_F(PromiseEndpointTest, OneWriteSuccessful) {
523   MockActivity activity;
524   activity.Activate();
525   EXPECT_CALL(activity, WakeupRequested).Times(0);
526   EXPECT_CALL(mock_endpoint_, Write).WillOnce(Return(true));
527   auto promise = promise_endpoint_->Write(
528       SliceBuffer(Slice::FromCopiedString("hello world")));
529   auto poll = promise();
530   ASSERT_TRUE(poll.ready());
531   EXPECT_EQ(absl::OkStatus(), poll.value());
532   activity.Deactivate();
533 }
534 
TEST_F(PromiseEndpointTest,EmptyWriteIsNoOp)535 TEST_F(PromiseEndpointTest, EmptyWriteIsNoOp) {
536   MockActivity activity;
537   activity.Activate();
538   EXPECT_CALL(activity, WakeupRequested).Times(0);
539   EXPECT_CALL(mock_endpoint_, Write).Times(0);
540   auto promise = promise_endpoint_->Write(SliceBuffer());
541   auto poll = promise();
542   ASSERT_TRUE(poll.ready());
543   EXPECT_EQ(absl::OkStatus(), poll.value());
544   activity.Deactivate();
545 }
546 
TEST_F(PromiseEndpointTest,OneWriteFailed)547 TEST_F(PromiseEndpointTest, OneWriteFailed) {
548   MockActivity activity;
549   activity.Activate();
550   EXPECT_CALL(activity, WakeupRequested).Times(AtMost(1));
551   EXPECT_CALL(mock_endpoint_, Write)
552       .WillOnce(
553           WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
554             on_write(this->kDummyErrorStatus);
555             return false;
556           }));
557   auto promise = promise_endpoint_->Write(
558       SliceBuffer(Slice::FromCopiedString("hello world")));
559   auto poll = promise();
560   ASSERT_TRUE(poll.ready());
561   EXPECT_EQ(kDummyErrorStatus, poll.value());
562   activity.Deactivate();
563 }
564 
TEST_F(PromiseEndpointTest,OnePendingWriteSuccessful)565 TEST_F(PromiseEndpointTest, OnePendingWriteSuccessful) {
566   MockActivity activity;
567   absl::AnyInvocable<void(absl::Status)> write_callback;
568   activity.Activate();
569   EXPECT_CALL(activity, WakeupRequested).Times(1);
570   EXPECT_CALL(mock_endpoint_, Write)
571       .WillOnce(WithArgs<0, 1>(
572           [&write_callback](
573               absl::AnyInvocable<void(absl::Status)> on_write,
574               grpc_event_engine::experimental::SliceBuffer* buffer) {
575             write_callback = std::move(on_write);
576             // Schedule mock_endpoint to write buffer.
577             buffer->Append(grpc_event_engine::experimental::Slice());
578             // Return false to mock EventEngine write pending..
579             return false;
580           }));
581   auto promise = promise_endpoint_->Write(
582       SliceBuffer(Slice::FromCopiedString("hello world")));
583   EXPECT_TRUE(promise().pending());
584   // Mock EventEngine write succeeds, and promise resolves.
585   write_callback(absl::OkStatus());
586   auto poll = promise();
587   ASSERT_TRUE(poll.ready());
588   EXPECT_EQ(absl::OkStatus(), poll.value());
589   activity.Deactivate();
590 }
591 
TEST_F(PromiseEndpointTest,OnePendingWriteFailed)592 TEST_F(PromiseEndpointTest, OnePendingWriteFailed) {
593   MockActivity activity;
594   absl::AnyInvocable<void(absl::Status)> write_callback;
595   activity.Activate();
596   EXPECT_CALL(activity, WakeupRequested).Times(1);
597   EXPECT_CALL(mock_endpoint_, Write)
598       .WillOnce(WithArg<0>(
599           [&write_callback](absl::AnyInvocable<void(absl::Status)> on_write) {
600             write_callback = std::move(on_write);
601             // Return false to mock EventEngine write pending..
602             return false;
603           }));
604   auto promise = promise_endpoint_->Write(
605       SliceBuffer(Slice::FromCopiedString("hello world")));
606   EXPECT_TRUE(promise().pending());
607   write_callback(kDummyErrorStatus);
608   auto poll = promise();
609   ASSERT_TRUE(poll.ready());
610   EXPECT_EQ(kDummyErrorStatus, poll.value());
611   activity.Deactivate();
612 }
613 
TEST_F(PromiseEndpointTest,GetPeerAddress)614 TEST_F(PromiseEndpointTest, GetPeerAddress) {
615   const char raw_test_address[] = {0x55, 0x66, 0x01, 0x55, 0x66, 0x01};
616   grpc_event_engine::experimental::EventEngine::ResolvedAddress test_address(
617       reinterpret_cast<const sockaddr*>(raw_test_address),
618       sizeof(raw_test_address));
619   EXPECT_CALL(mock_endpoint_, GetPeerAddress).WillOnce(ReturnRef(test_address));
620   auto peer_address = promise_endpoint_->GetPeerAddress();
621   EXPECT_EQ(0, std::memcmp(test_address.address(), test_address.address(),
622                            test_address.size()));
623   EXPECT_EQ(test_address.size(), peer_address.size());
624 }
625 
TEST_F(PromiseEndpointTest,GetLocalAddress)626 TEST_F(PromiseEndpointTest, GetLocalAddress) {
627   const char raw_test_address[] = {0x52, 0x55, 0x66, 0x52, 0x55, 0x66};
628   grpc_event_engine::experimental::EventEngine::ResolvedAddress test_address(
629       reinterpret_cast<const sockaddr*>(raw_test_address),
630       sizeof(raw_test_address));
631   EXPECT_CALL(mock_endpoint_, GetLocalAddress)
632       .WillOnce(ReturnRef(test_address));
633   auto local_address = promise_endpoint_->GetLocalAddress();
634   EXPECT_EQ(0, std::memcmp(test_address.address(), local_address.address(),
635                            test_address.size()));
636   EXPECT_EQ(test_address.size(), local_address.size());
637 }
638 
TEST_F(PromiseEndpointTest,DestroyedBeforeReadCompletes)639 TEST_F(PromiseEndpointTest, DestroyedBeforeReadCompletes) {
640   MockActivity activity;
641   const std::string kBuffer = {0x01};
642   absl::AnyInvocable<void(absl::Status)> read_callback;
643   activity.Activate();
644   EXPECT_CALL(activity, WakeupRequested).Times(1);
645   EXPECT_CALL(mock_endpoint_, Read)
646       .WillOnce(WithArgs<0, 1>(
647           [&read_callback, &kBuffer](
648               absl::AnyInvocable<void(absl::Status)> on_read,
649               grpc_event_engine::experimental::SliceBuffer* buffer) {
650             read_callback = std::move(on_read);
651             // Schedule mock_endpoint to read buffer.
652             grpc_event_engine::experimental::Slice slice(
653                 grpc_slice_from_cpp_string(kBuffer));
654             buffer->Append(std::move(slice));
655             // Return false to mock EventEngine read not finish..
656             return false;
657           }));
658   auto promise = promise_endpoint_->ReadByte();
659   ASSERT_TRUE(promise().pending());
660   promise_endpoint_.reset();
661   // Mock EventEngine read succeeds, and promise resolves.
662   read_callback(absl::OkStatus());
663   auto poll = promise();
664   ASSERT_TRUE(poll.ready());
665   ASSERT_TRUE(poll.value().ok());
666   EXPECT_EQ(*poll.value(), kBuffer[0]);
667   activity.Deactivate();
668 }
669 
670 class MultiplePromiseEndpointTest : public ::testing::Test {
671  public:
MultiplePromiseEndpointTest()672   MultiplePromiseEndpointTest()
673       : first_mock_endpoint_ptr_(new StrictMock<MockEndpoint>()),
674         second_mock_endpoint_ptr_(new StrictMock<MockEndpoint>()),
675         first_mock_endpoint_(*first_mock_endpoint_ptr_),
676         second_mock_endpoint_(*second_mock_endpoint_ptr_),
677         first_promise_endpoint_(
678             std::unique_ptr<
679                 grpc_event_engine::experimental::EventEngine::Endpoint>(
680                 first_mock_endpoint_ptr_),
681             SliceBuffer()),
682         second_promise_endpoint_(
683             std::unique_ptr<
684                 grpc_event_engine::experimental::EventEngine::Endpoint>(
685                 second_mock_endpoint_ptr_),
686             SliceBuffer()) {}
687 
688  private:
689   MockEndpoint* first_mock_endpoint_ptr_;
690   MockEndpoint* second_mock_endpoint_ptr_;
691 
692  protected:
693   MockEndpoint& first_mock_endpoint_;
694   MockEndpoint& second_mock_endpoint_;
695   PromiseEndpoint first_promise_endpoint_;
696   PromiseEndpoint second_promise_endpoint_;
697 
698   const absl::Status kDummyErrorStatus =
699       absl::ErrnoToStatus(5566, "just an error");
700   static constexpr size_t kDummyRequestSize = 5566u;
701 };
702 
TEST_F(MultiplePromiseEndpointTest,JoinReadsSuccessful)703 TEST_F(MultiplePromiseEndpointTest, JoinReadsSuccessful) {
704   const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
705   EXPECT_CALL(first_mock_endpoint_, Read)
706       .WillOnce(WithArgs<1>(
707           [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
708             // Schedule mock_endpoint to read buffer.
709             grpc_event_engine::experimental::Slice slice(
710                 grpc_slice_from_cpp_string(kBuffer));
711             buffer->Append(std::move(slice));
712             return true;
713           }));
714   EXPECT_CALL(second_mock_endpoint_, Read)
715       .WillOnce(WithArgs<1>(
716           [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
717             // Schedule mock_endpoint to read buffer.
718             grpc_event_engine::experimental::Slice slice(
719                 grpc_slice_from_cpp_string(kBuffer));
720             buffer->Append(std::move(slice));
721             return true;
722           }));
723   StrictMock<MockFunction<void(absl::Status)>> on_done;
724   EXPECT_CALL(on_done, Call(absl::OkStatus()));
725   auto activity = MakeActivity(
726       [this, &kBuffer] {
727         return Seq(Join(this->first_promise_endpoint_.Read(kBuffer.size()),
728                         this->second_promise_endpoint_.Read(kBuffer.size())),
729                    [](std::tuple<absl::StatusOr<SliceBuffer>,
730                                  absl::StatusOr<SliceBuffer>>
731                           ret) {
732                      // Both reads finish with `absl::OkStatus`.
733                      EXPECT_TRUE(std::get<0>(ret).ok());
734                      EXPECT_TRUE(std::get<1>(ret).ok());
735                      return absl::OkStatus();
736                    });
737       },
738       InlineWakeupScheduler(),
739       [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
740 }
741 
TEST_F(MultiplePromiseEndpointTest,JoinOneReadSuccessfulOneReadFailed)742 TEST_F(MultiplePromiseEndpointTest, JoinOneReadSuccessfulOneReadFailed) {
743   const std::string kBuffer = {0x01, 0x02, 0x03, 0x04};
744   EXPECT_CALL(first_mock_endpoint_, Read)
745       .WillOnce(WithArgs<1>(
746           [&kBuffer](grpc_event_engine::experimental::SliceBuffer* buffer) {
747             // Schedule mock_endpoint to read buffer.
748             grpc_event_engine::experimental::Slice slice(
749                 grpc_slice_from_cpp_string(kBuffer));
750             buffer->Append(std::move(slice));
751             return true;
752           }));
753   EXPECT_CALL(second_mock_endpoint_, Read)
754       .WillOnce(WithArgs<0>(
755           [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
756             // Mock EventEngine enpoint read fails.
757             read_callback(this->kDummyErrorStatus);
758             return false;
759           }));
760   StrictMock<MockFunction<void(absl::Status)>> on_done;
761   EXPECT_CALL(on_done, Call(kDummyErrorStatus));
762   auto activity = MakeActivity(
763       [this, &kBuffer] {
764         return Seq(
765             Join(this->first_promise_endpoint_.Read(kBuffer.size()),
766                  this->second_promise_endpoint_.Read(this->kDummyRequestSize)),
767             [this](std::tuple<absl::StatusOr<SliceBuffer>,
768                               absl::StatusOr<SliceBuffer>>
769                        ret) {
770               // One read finishes with `absl::OkStatus` and the other read
771               // fails.
772               EXPECT_TRUE(std::get<0>(ret).ok());
773               EXPECT_FALSE(std::get<1>(ret).ok());
774               EXPECT_EQ(std::get<1>(ret).status(), this->kDummyErrorStatus);
775               return this->kDummyErrorStatus;
776             });
777       },
778       InlineWakeupScheduler(),
779       [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
780 }
781 
TEST_F(MultiplePromiseEndpointTest,JoinReadsFailed)782 TEST_F(MultiplePromiseEndpointTest, JoinReadsFailed) {
783   EXPECT_CALL(first_mock_endpoint_, Read)
784       .WillOnce(WithArgs<0>(
785           [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
786             // Mock EventEngine enpoint read fails.
787             read_callback(this->kDummyErrorStatus);
788             return false;
789           }));
790   EXPECT_CALL(second_mock_endpoint_, Read)
791       .WillOnce(WithArgs<0>(
792           [this](absl::AnyInvocable<void(absl::Status)> read_callback) {
793             // Mock EventEngine enpoint read fails.
794             read_callback(this->kDummyErrorStatus);
795             return false;
796           }));
797   StrictMock<MockFunction<void(absl::Status)>> on_done;
798   EXPECT_CALL(on_done, Call(kDummyErrorStatus));
799   auto activity = MakeActivity(
800       [this] {
801         return Seq(
802             Join(this->first_promise_endpoint_.Read(this->kDummyRequestSize),
803                  this->second_promise_endpoint_.Read(this->kDummyRequestSize)),
804             [this](std::tuple<absl::StatusOr<SliceBuffer>,
805                               absl::StatusOr<SliceBuffer>>
806                        ret) {
807               // Both reads finish with errors.
808               EXPECT_FALSE(std::get<0>(ret).ok());
809               EXPECT_FALSE(std::get<1>(ret).ok());
810               EXPECT_EQ(std::get<0>(ret).status(), this->kDummyErrorStatus);
811               EXPECT_EQ(std::get<1>(ret).status(), this->kDummyErrorStatus);
812               return this->kDummyErrorStatus;
813             });
814       },
815       InlineWakeupScheduler(),
816       [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
817 }
818 
TEST_F(MultiplePromiseEndpointTest,JoinWritesSuccessful)819 TEST_F(MultiplePromiseEndpointTest, JoinWritesSuccessful) {
820   EXPECT_CALL(first_mock_endpoint_, Write).WillOnce(Return(true));
821   EXPECT_CALL(second_mock_endpoint_, Write).WillOnce(Return(true));
822   StrictMock<MockFunction<void(absl::Status)>> on_done;
823   EXPECT_CALL(on_done, Call(absl::OkStatus()));
824   auto activity = MakeActivity(
825       [this] {
826         return Seq(Join(this->first_promise_endpoint_.Write(SliceBuffer(
827                             Slice::FromCopiedString("hello world"))),
828                         this->second_promise_endpoint_.Write(SliceBuffer(
829                             Slice::FromCopiedString("hello world")))),
830                    [](std::tuple<absl::Status, absl::Status> ret) {
831                      // Both writes finish with `absl::OkStatus`.
832                      EXPECT_TRUE(std::get<0>(ret).ok());
833                      EXPECT_TRUE(std::get<1>(ret).ok());
834                      return absl::OkStatus();
835                    });
836       },
837       InlineWakeupScheduler(),
838       [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
839 }
840 
TEST_F(MultiplePromiseEndpointTest,JoinOneWriteSuccessfulOneWriteFailed)841 TEST_F(MultiplePromiseEndpointTest, JoinOneWriteSuccessfulOneWriteFailed) {
842   EXPECT_CALL(first_mock_endpoint_, Write).WillOnce(Return(true));
843   EXPECT_CALL(second_mock_endpoint_, Write)
844       .WillOnce(
845           WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
846             on_write(this->kDummyErrorStatus);
847             return false;
848           }));
849   StrictMock<MockFunction<void(absl::Status)>> on_done;
850   EXPECT_CALL(on_done, Call(kDummyErrorStatus));
851   auto activity = MakeActivity(
852       [this] {
853         return Seq(Join(this->first_promise_endpoint_.Write(SliceBuffer(
854                             Slice::FromCopiedString("hello world"))),
855                         this->second_promise_endpoint_.Write(SliceBuffer(
856                             Slice::FromCopiedString("hello world")))),
857                    [this](std::tuple<absl::Status, absl::Status> ret) {
858                      // One write finish with `absl::OkStatus` and the other
859                      // write fails.
860                      EXPECT_TRUE(std::get<0>(ret).ok());
861                      EXPECT_FALSE(std::get<1>(ret).ok());
862                      EXPECT_EQ(std::get<1>(ret), this->kDummyErrorStatus);
863                      return this->kDummyErrorStatus;
864                    });
865       },
866       InlineWakeupScheduler(),
867       [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
868 }
869 
TEST_F(MultiplePromiseEndpointTest,JoinWritesFailed)870 TEST_F(MultiplePromiseEndpointTest, JoinWritesFailed) {
871   EXPECT_CALL(first_mock_endpoint_, Write)
872       .WillOnce(
873           WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
874             on_write(this->kDummyErrorStatus);
875             return false;
876           }));
877   EXPECT_CALL(second_mock_endpoint_, Write)
878       .WillOnce(
879           WithArgs<0>([this](absl::AnyInvocable<void(absl::Status)> on_write) {
880             on_write(this->kDummyErrorStatus);
881             return false;
882           }));
883   StrictMock<MockFunction<void(absl::Status)>> on_done;
884   EXPECT_CALL(on_done, Call(kDummyErrorStatus));
885   auto activity = MakeActivity(
886       [this] {
887         return Seq(Join(this->first_promise_endpoint_.Write(SliceBuffer(
888                             Slice::FromCopiedString("hello world"))),
889                         this->second_promise_endpoint_.Write(SliceBuffer(
890                             Slice::FromCopiedString("hello world")))),
891                    [this](std::tuple<absl::Status, absl::Status> ret) {
892                      // Both writes fail with errors.
893                      EXPECT_FALSE(std::get<0>(ret).ok());
894                      EXPECT_FALSE(std::get<1>(ret).ok());
895                      EXPECT_EQ(std::get<0>(ret), this->kDummyErrorStatus);
896                      EXPECT_EQ(std::get<1>(ret), this->kDummyErrorStatus);
897                      return this->kDummyErrorStatus;
898                    });
899       },
900       InlineWakeupScheduler(),
901       [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
902 }
903 
904 }  // namespace testing
905 }  // namespace grpc_core
906 
main(int argc,char ** argv)907 int main(int argc, char** argv) {
908   ::testing::InitGoogleTest(&argc, argv);
909   return RUN_ALL_TESTS();
910 }
911