xref: /aosp_15_r20/external/grpc-grpc/src/objective-c/tests/CFStreamTests/CFStreamEndpointTests.mm (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1/*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#import <XCTest/XCTest.h>
20
21#include "src/core/lib/iomgr/port.h"
22
23#ifdef GRPC_CFSTREAM
24
25#include <limits.h>
26
27#include <netinet/in.h>
28
29#include <grpc/grpc.h>
30#include <grpc/impl/codegen/sync.h>
31#include <grpc/support/sync.h>
32
33#include "src/core/lib/address_utils/parse_address.h"
34#include "src/core/lib/address_utils/sockaddr_utils.h"
35#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
36#include "src/core/lib/iomgr/endpoint.h"
37#include "src/core/lib/iomgr/resolve_address.h"
38#include "src/core/lib/iomgr/tcp_client.h"
39#include "src/core/lib/resource_quota/api.h"
40#include "test/core/util/test_config.h"
41
42#include <chrono>
43#include <future>
44
// Timeouts, in seconds, handed to -waitForEvent:timeout: for each kind of
// asynchronous endpoint operation.
static constexpr int kConnectTimeout = 5;
static constexpr int kWriteTimeout = 5;
static constexpr int kReadTimeout = 5;

// Size, in bytes, of the payload exchanged between the endpoint and the peer
// socket in the tests below.
static constexpr int kBufferSize = 10000;

// NOTE(review): not referenced anywhere in this file.
static constexpr int kRunLoopTimeout = 1;
52
// Closure callback. `arg` must point at the std::promise<grpc_error_handle>
// that was registered through init_event_closure(); the promise is fulfilled
// with the status the closure was run with.
static void set_error_handle_promise(void *arg, grpc_error_handle error) {
  auto *promise = static_cast<std::promise<grpc_error_handle> *>(arg);
  promise->set_value(error);
}
57
// Initializes `closure` so that running it invokes set_error_handle_promise()
// with `error_handle` as its argument. The promise must outlive the closure's
// execution, since only a raw pointer to it is stored.
static void init_event_closure(grpc_closure *closure,
                               std::promise<grpc_error_handle> *error_handle) {
  void *callback_arg = static_cast<void *>(error_handle);
  GRPC_CLOSURE_INIT(closure, set_error_handle_promise, callback_arg, grpc_schedule_on_exec_ctx);
}
63
// Returns true when the concatenated contents of `slices` are byte-for-byte
// equal to the `buffer_len` bytes starting at `buffer`.
//
// The total-length check up front guarantees that walking the slices never
// reads past `buffer + buffer_len`.
static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const char *buffer,
                                             size_t buffer_len) {
  if (slices->length != buffer_len) {
    return false;
  }

  // `count` is unsigned; use a matching index type to avoid a signed/unsigned
  // comparison.
  for (size_t i = 0; i < slices->count; i++) {
    grpc_slice slice = slices->slices[i];
    const size_t slice_len = GRPC_SLICE_LENGTH(slice);
    if (0 != memcmp(buffer, GRPC_SLICE_START_PTR(slice), slice_len)) {
      return false;
    }
    buffer += slice_len;
  }

  return true;
}
80
// Exercises a client grpc_endpoint connected over loopback to a plain BSD
// socket that plays the role of the server.
@interface CFStreamEndpointTests : XCTestCase
@end
84
@implementation CFStreamEndpointTests {
  // Client endpoint filled in by grpc_tcp_client_connect() in -setUp and
  // destroyed in -tearDown.
  grpc_endpoint *ep_;
  // Descriptor returned by accept() in -setUp: the peer side of the connection
  // that ep_ is attached to.
  int svr_fd_;
}
89
// Flushes the current ExecCtx, then blocks for up to `timeout` seconds on
// `event`. Returns NO only when the wait reports a timeout; any other future
// status yields YES.
- (BOOL)waitForEvent:(std::future<grpc_error_handle> *)event timeout:(int)timeout {
  grpc_core::ExecCtx::Get()->Flush();
  const std::future_status status = event->wait_for(std::chrono::seconds(timeout));
  return status != std::future_status::timeout;
}
94
// Class-level fixture: initializes the gRPC library; paired with the
// grpc_shutdown() call in +tearDown.
+ (void)setUp {
  grpc_init();
}
98
// Class-level fixture: releases the gRPC library initialized by the
// grpc_init() call in +setUp.
+ (void)tearDown {
  grpc_shutdown();
}
102
// Per-test fixture. Opens a listening socket on an ephemeral loopback port,
// connects a gRPC client endpoint (ep_) to it, accepts the connection on the
// socket side (svr_fd_), and waits for the connect callback to report success.
- (void)setUp {
  [super setUp];
  self.continueAfterFailure = NO;

  // Start from a known state so that -tearDown never acts on stale values if
  // one of the assertions below aborts this method early.
  ep_ = nullptr;
  svr_fd_ = -1;

  // Set up CFStream connection before testing the endpoint

  grpc_core::ExecCtx exec_ctx;

  std::promise<grpc_error_handle> connected_promise;
  grpc_closure done;

  gpr_log(GPR_DEBUG, "test_succeeds");

  auto resolved_addr = grpc_core::StringToSockaddr("127.0.0.1:0");
  XCTAssertTrue(resolved_addr.ok());
  struct sockaddr_in *addr = reinterpret_cast<struct sockaddr_in *>(resolved_addr->addr);

  /* create a phony server */
  int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
  XCTAssertGreaterThanOrEqual(listen_fd, 0);
  XCTAssertEqual(bind(listen_fd, (struct sockaddr *)addr, (socklen_t)resolved_addr->len), 0);
  XCTAssertEqual(listen(listen_fd, 1), 0);

  /* connect to it */
  // getsockname() rewrites the address with the port the kernel picked.
  XCTAssertEqual(getsockname(listen_fd, (struct sockaddr *)addr, (socklen_t *)&resolved_addr->len),
                 0);
  init_event_closure(&done, &connected_promise);
  auto args =
      grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs(
          nullptr);
  grpc_tcp_client_connect(&done, &ep_, nullptr,
                          grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
                          &*resolved_addr, grpc_core::Timestamp::InfFuture());

  /* await the connection */
  int r;
  do {
    // Size of the address structure itself, not of the pointer to it.
    resolved_addr->len = sizeof(*addr);
    r = accept(listen_fd, reinterpret_cast<struct sockaddr *>(addr),
               reinterpret_cast<socklen_t *>(&resolved_addr->len));
  } while (r == -1 && errno == EINTR);
  // Capture errno before close() has a chance to overwrite it.
  const int accept_errno = errno;
  // Only one connection is ever accepted; release the listening socket so it
  // is not leaked once per test.
  close(listen_fd);
  XCTAssertGreaterThanOrEqual(r, 0, @"connection failed with return code %@ and errno %@", @(r),
                              @(accept_errno));
  svr_fd_ = r;

  /* wait for the connection callback to finish */
  std::future<grpc_error_handle> connected_future = connected_promise.get_future();
  XCTAssertTrue([self waitForEvent:&connected_future timeout:kConnectTimeout]);
  XCTAssertEqual(connected_future.get(), absl::OkStatus());
}
151
// Per-test cleanup: closes the peer socket and destroys the client endpoint
// created in -setUp, then hands off to XCTestCase.
- (void)tearDown {
  {
    grpc_core::ExecCtx exec_ctx;
    // A negative descriptor means there is nothing to close.
    if (svr_fd_ >= 0) {
      close(svr_fd_);
      svr_fd_ = -1;
    }
    // ep_ stays null if the connection was never established.
    if (ep_ != nullptr) {
      grpc_endpoint_destroy(ep_);
      ep_ = nullptr;
    }
  }
  [super tearDown];
}
157
// Writes kBufferSize bytes through the endpoint, checks that the peer socket
// receives exactly those bytes, echoes them back from the socket, and checks
// that the endpoint reads the same bytes.
- (void)testReadWrite {
  grpc_core::ExecCtx exec_ctx;

  grpc_closure read_done;
  grpc_slice_buffer read_slices;
  grpc_slice_buffer read_one_slice;
  std::promise<grpc_error_handle> write_promise;
  grpc_closure write_done;
  grpc_slice_buffer write_slices;

  grpc_slice slice;
  char write_buffer[kBufferSize];
  char read_buffer[kBufferSize];
  size_t recv_size = 0;

  // Deterministic, non-uniform payload: the comparisons below then detect
  // misplaced as well as missing bytes.
  for (int i = 0; i < kBufferSize; i++) {
    write_buffer[i] = static_cast<char>(i % 251);
  }

  grpc_slice_buffer_init(&write_slices);
  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  grpc_slice_buffer_add(&write_slices, slice);
  init_event_closure(&write_done, &write_promise);
  grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX);

  std::future<grpc_error_handle> write_future = write_promise.get_future();
  XCTAssertTrue([self waitForEvent:&write_future timeout:kWriteTimeout]);
  XCTAssertEqual(write_future.get(), absl::OkStatus());

  while (recv_size < kBufferSize) {
    // Append after what has already arrived; the stream may deliver the
    // payload in several chunks.
    ssize_t size = recv(svr_fd_, read_buffer + recv_size, kBufferSize - recv_size, 0);
    // 0 means the peer closed early; treating it as failure avoids spinning.
    XCTAssertGreaterThan(size, 0);
    recv_size += size;
  }

  XCTAssertEqual(recv_size, kBufferSize);
  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
  ssize_t send_size = send(svr_fd_, read_buffer, kBufferSize, 0);
  // The read loop below waits for the full payload, so a short send would only
  // surface later as a read timeout; fail here instead.
  XCTAssertEqual(send_size, static_cast<ssize_t>(kBufferSize));

  grpc_slice_buffer_init(&read_slices);
  grpc_slice_buffer_init(&read_one_slice);
  while (read_slices.length < kBufferSize) {
    std::promise<grpc_error_handle> read_promise;
    init_event_closure(&read_done, &read_promise);
    grpc_endpoint_read(ep_, &read_one_slice, &read_done, /*urgent=*/false,
                       /*min_progress_size=*/1);
    std::future<grpc_error_handle> read_future = read_promise.get_future();
    XCTAssertTrue([self waitForEvent:&read_future timeout:kReadTimeout]);
    XCTAssertEqual(read_future.get(), absl::OkStatus());
    grpc_slice_buffer_move_into(&read_one_slice, &read_slices);
    XCTAssertLessThanOrEqual(read_slices.length, kBufferSize);
  }
  XCTAssertTrue(compare_slice_buffer_with_buffer(&read_slices, read_buffer, kBufferSize));

  grpc_endpoint_shutdown(ep_, absl::OkStatus());
  grpc_slice_buffer_reset_and_unref(&read_slices);
  grpc_slice_buffer_reset_and_unref(&write_slices);
  grpc_slice_buffer_reset_and_unref(&read_one_slice);
}
214
// Starts a read on the endpoint, then writes a payload and checks that it
// reaches the peer socket. Nothing is sent back, so the pending read must not
// complete within kReadTimeout. After the endpoint is shut down, the read
// callback must fire with a non-OK status.
- (void)testShutdownBeforeRead {
  grpc_core::ExecCtx exec_ctx;

  std::promise<grpc_error_handle> read_promise;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;
  std::promise<grpc_error_handle> write_promise;
  grpc_closure write_done;
  grpc_slice_buffer write_slices;

  grpc_slice slice;
  char write_buffer[kBufferSize];
  char read_buffer[kBufferSize];
  size_t recv_size = 0;

  // Deterministic, non-uniform payload so the memcmp below is meaningful.
  for (int i = 0; i < kBufferSize; i++) {
    write_buffer[i] = static_cast<char>(i % 251);
  }

  grpc_slice_buffer_init(&read_slices);
  init_event_closure(&read_done, &read_promise);
  grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false,
                     /*min_progress_size=*/1);

  grpc_slice_buffer_init(&write_slices);
  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  grpc_slice_buffer_add(&write_slices, slice);
  init_event_closure(&write_done, &write_promise);
  grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX);

  std::future<grpc_error_handle> write_future = write_promise.get_future();
  XCTAssertTrue([self waitForEvent:&write_future timeout:kWriteTimeout]);
  XCTAssertEqual(write_future.get(), absl::OkStatus());

  while (recv_size < kBufferSize) {
    // Append after what has already arrived; the stream may deliver the
    // payload in several chunks.
    ssize_t size = recv(svr_fd_, read_buffer + recv_size, kBufferSize - recv_size, 0);
    // 0 means the peer closed early; treating it as failure avoids spinning.
    XCTAssertGreaterThan(size, 0);
    recv_size += size;
  }

  XCTAssertEqual(recv_size, kBufferSize);
  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);

  // No data was sent to the endpoint, so the read is expected to still be
  // pending when the timeout elapses.
  std::future<grpc_error_handle> read_future = read_promise.get_future();
  XCTAssertFalse([self waitForEvent:&read_future timeout:kReadTimeout]);

  grpc_endpoint_shutdown(ep_, absl::OkStatus());

  grpc_core::ExecCtx::Get()->Flush();
  XCTAssertTrue([self waitForEvent:&read_future timeout:kReadTimeout]);
  XCTAssertNotEqual(read_future.get(), absl::OkStatus());

  grpc_slice_buffer_reset_and_unref(&read_slices);
  grpc_slice_buffer_reset_and_unref(&write_slices);
}
266
// Starts a read on the endpoint, writes a payload and checks that it reaches
// the peer socket, then closes the peer socket and expects the pending read to
// complete with a non-OK status.
- (void)testRemoteClosed {
  grpc_core::ExecCtx exec_ctx;

  std::promise<grpc_error_handle> read_promise;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;
  std::promise<grpc_error_handle> write_promise;
  grpc_closure write_done;
  grpc_slice_buffer write_slices;

  grpc_slice slice;
  char write_buffer[kBufferSize];
  char read_buffer[kBufferSize];
  size_t recv_size = 0;

  // Deterministic, non-uniform payload so the memcmp below is meaningful.
  for (int i = 0; i < kBufferSize; i++) {
    write_buffer[i] = static_cast<char>(i % 251);
  }

  init_event_closure(&read_done, &read_promise);
  grpc_slice_buffer_init(&read_slices);
  grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false,
                     /*min_progress_size=*/1);

  grpc_slice_buffer_init(&write_slices);
  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  grpc_slice_buffer_add(&write_slices, slice);

  init_event_closure(&write_done, &write_promise);
  grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX);

  std::future<grpc_error_handle> write_future = write_promise.get_future();
  XCTAssertTrue([self waitForEvent:&write_future timeout:kWriteTimeout]);
  XCTAssertEqual(write_future.get(), absl::OkStatus());

  while (recv_size < kBufferSize) {
    // Append after what has already arrived; the stream may deliver the
    // payload in several chunks.
    ssize_t size = recv(svr_fd_, read_buffer + recv_size, kBufferSize - recv_size, 0);
    // 0 means the peer closed early; treating it as failure avoids spinning.
    XCTAssertGreaterThan(size, 0);
    recv_size += size;
  }

  XCTAssertEqual(recv_size, kBufferSize);
  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);

  close(svr_fd_);
  // Forget the descriptor so that later cleanup does not close the same
  // number a second time (it may have been reused by then).
  svr_fd_ = -1;

  std::future<grpc_error_handle> read_future = read_promise.get_future();
  XCTAssertTrue([self waitForEvent:&read_future timeout:kReadTimeout]);
  XCTAssertNotEqual(read_future.get(), absl::OkStatus());

  grpc_endpoint_shutdown(ep_, absl::OkStatus());
  grpc_slice_buffer_reset_and_unref(&read_slices);
  grpc_slice_buffer_reset_and_unref(&write_slices);
}
317
// Starts a read on the endpoint, then closes the peer socket with SO_LINGER
// enabled and a zero linger time, and expects the pending read to complete
// with a non-OK status.
- (void)testRemoteReset {
  grpc_core::ExecCtx exec_ctx;

  std::promise<grpc_error_handle> read_promise;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;

  init_event_closure(&read_done, &read_promise);
  grpc_slice_buffer_init(&read_slices);
  grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false,
                     /*min_progress_size=*/1);

  struct linger so_linger;
  so_linger.l_onoff = 1;
  so_linger.l_linger = 0;
  // If the option is not applied, the close below is an ordinary close and the
  // test no longer exercises what its name says; fail loudly instead.
  XCTAssertEqual(setsockopt(svr_fd_, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)), 0);

  close(svr_fd_);
  // Forget the descriptor so that later cleanup does not close the same
  // number a second time (it may have been reused by then).
  svr_fd_ = -1;

  std::future<grpc_error_handle> read_future = read_promise.get_future();
  XCTAssertTrue([self waitForEvent:&read_future timeout:kReadTimeout]);
  XCTAssertNotEqual(read_future.get(), absl::OkStatus());

  grpc_endpoint_shutdown(ep_, absl::OkStatus());
  grpc_slice_buffer_reset_and_unref(&read_slices);
}
344
345@end
346
347#else  // GRPC_CFSTREAM
348
// Phony test suite: stands in for the real one when GRPC_CFSTREAM is not
// defined, so the class still exists but declares no test methods.
@interface CFStreamEndpointTests : XCTestCase
@end

@implementation CFStreamEndpointTests

// No fixture state of its own; defers entirely to XCTestCase.
- (void)setUp {
  [super setUp];
}

// Nothing to release; defers entirely to XCTestCase.
- (void)tearDown {
  [super tearDown];
}

@end
363
364#endif  // GRPC_CFSTREAM
365