/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#import <XCTest/XCTest.h>

#include "src/core/lib/iomgr/port.h"

#ifdef GRPC_CFSTREAM

#include <errno.h>
#include <limits.h>
#include <string.h>
#include <unistd.h>

#include <netinet/in.h>
#include <sys/socket.h>

#include <grpc/grpc.h>
#include <grpc/impl/codegen/sync.h>
#include <grpc/support/sync.h>

#include "src/core/lib/address_utils/parse_address.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/resource_quota/api.h"
#include "test/core/util/test_config.h"

#include <chrono>
#include <future>

// Timeouts, in seconds, used when waiting on endpoint callbacks.
static const int kConnectTimeout = 5;
static const int kWriteTimeout = 5;
static const int kReadTimeout = 5;

// Number of bytes exchanged between the endpoint and the local server socket.
static const int kBufferSize = 10000;

static const int kRunLoopTimeout = 1;

// Closure callback: fulfils the std::promise passed as `arg` with `error`.
static void set_error_handle_promise(void *arg, grpc_error_handle error) {
  std::promise<grpc_error_handle> *p = static_cast<std::promise<grpc_error_handle> *>(arg);
  p->set_value(error);
}

// Initializes `closure` so that, when run, it stores its error into `error_handle`.
static void init_event_closure(grpc_closure *closure,
                               std::promise<grpc_error_handle> *error_handle) {
  GRPC_CLOSURE_INIT(closure, set_error_handle_promise, static_cast<void *>(error_handle),
                    grpc_schedule_on_exec_ctx);
}

// Fills `buffer` with a deterministic byte pattern so that data comparisons in
// the tests operate on initialized, non-uniform content.
static void fill_with_test_pattern(char *buffer, size_t buffer_len) {
  for (size_t i = 0; i < buffer_len; i++) {
    buffer[i] = static_cast<char>(i % 251);
  }
}

// Returns true iff the concatenated contents of `slices` are byte-for-byte
// equal to the `buffer_len` bytes starting at `buffer`.
static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const char *buffer,
                                             size_t buffer_len) {
  // Equal total length guarantees the per-slice memcmp below stays in bounds.
  if (slices->length != buffer_len) {
    return false;
  }

  for (size_t i = 0; i < slices->count; i++) {
    grpc_slice slice = slices->slices[i];
    if (0 != memcmp(buffer, GRPC_SLICE_START_PTR(slice), GRPC_SLICE_LENGTH(slice))) {
      return false;
    }
    buffer += GRPC_SLICE_LENGTH(slice);
  }

  return true;
}

@interface CFStreamEndpointTests : XCTestCase

@end

@implementation CFStreamEndpointTests {
  grpc_endpoint *ep_;  // Client endpoint under test, created in -setUp.
  int svr_fd_;         // Accepted server-side socket, or -1 when closed.
}

// Flushes the current ExecCtx, then waits up to `timeout` seconds for `event`.
// Returns YES if the future became ready, NO if the wait timed out.
- (BOOL)waitForEvent:(std::future<grpc_error_handle> *)event timeout:(int)timeout {
  grpc_core::ExecCtx::Get()->Flush();
  return event->wait_for(std::chrono::seconds(timeout)) != std::future_status::timeout;
}

// Closes the accepted server-side socket if it is still open. Safe to call
// more than once; this prevents -tearDown from closing a descriptor that a
// test already closed.
- (void)closeServerSocket {
  if (svr_fd_ >= 0) {
    close(svr_fd_);
    svr_fd_ = -1;
  }
}

// Receives up to `length` bytes from the server-side socket into `buffer`,
// appending each partial read after the previous one. Stops early (after
// recording a test failure) on EOF or error, and returns the number of bytes
// actually received.
- (size_t)receiveFromServer:(char *)buffer length:(size_t)length {
  size_t received = 0;
  while (received < length) {
    ssize_t size = recv(svr_fd_, buffer + received, length - received, 0);
    if (size == -1 && errno == EINTR) {
      continue;
    }
    XCTAssertGreaterThan(size, 0);
    if (size <= 0) {
      // EOF or hard error: bail out rather than looping forever.
      break;
    }
    received += static_cast<size_t>(size);
  }
  return received;
}

+ (void)setUp {
  grpc_init();
}

+ (void)tearDown {
  grpc_shutdown();
}

- (void)setUp {
  [super setUp];
  self.continueAfterFailure = NO;

  // Mark the socket as "not open" so -tearDown is safe even if setup fails
  // before a connection is accepted (the zero-initialized ivar would be fd 0).
  svr_fd_ = -1;

  // Set up CFStream connection before testing the endpoint

  grpc_core::ExecCtx exec_ctx;

  int svr_fd;
  int r;
  std::promise<grpc_error_handle> connected_promise;
  grpc_closure done;

  gpr_log(GPR_DEBUG, "test_succeeds");

  auto resolved_addr = grpc_core::StringToSockaddr("127.0.0.1:0");
  XCTAssertTrue(resolved_addr.ok());
  struct sockaddr_in *addr = reinterpret_cast<struct sockaddr_in *>(resolved_addr->addr);

  /* create a phony server */
  svr_fd = socket(AF_INET, SOCK_STREAM, 0);
  XCTAssertGreaterThanOrEqual(svr_fd, 0);
  XCTAssertEqual(bind(svr_fd, (struct sockaddr *)addr, (socklen_t)resolved_addr->len), 0);
  XCTAssertEqual(listen(svr_fd, 1), 0);

  /* connect to it */
  XCTAssertEqual(getsockname(svr_fd, (struct sockaddr *)addr, (socklen_t *)&resolved_addr->len), 0);
  init_event_closure(&done, &connected_promise);
  auto args =
      grpc_core::CoreConfiguration::Get().channel_args_preconditioning().PreconditionChannelArgs(
          nullptr);
  grpc_tcp_client_connect(&done, &ep_, nullptr,
                          grpc_event_engine::experimental::ChannelArgsEndpointConfig(args),
                          &*resolved_addr, grpc_core::Timestamp::InfFuture());

  /* await the connection */
  // The peer address is not needed, so none is requested; this also avoids
  // overwriting the address that was just handed to the connect call.
  do {
    r = accept(svr_fd, nullptr, nullptr);
  } while (r == -1 && errno == EINTR);
  // Preserve errno for the failure message: close() below may change it.
  int accept_errno = errno;
  // The listening socket is only needed for this single accept.
  close(svr_fd);
  XCTAssertGreaterThanOrEqual(r, 0, @"connection failed with return code %@ and errno %@", @(r),
                              @(accept_errno));
  svr_fd_ = r;

  /* wait for the connection callback to finish */
  std::future<grpc_error_handle> connected_future = connected_promise.get_future();
  XCTAssertTrue([self waitForEvent:&connected_future timeout:kConnectTimeout]);
  XCTAssertEqual(connected_future.get(), absl::OkStatus());
}

- (void)tearDown {
  grpc_core::ExecCtx exec_ctx;
  [self closeServerSocket];
  if (ep_ != nullptr) {
    grpc_endpoint_destroy(ep_);
    ep_ = nullptr;
  }
  [super tearDown];
}

// Writes kBufferSize bytes through the endpoint, verifies the server socket
// receives them, echoes them back, and verifies the endpoint reads them.
- (void)testReadWrite {
  grpc_core::ExecCtx exec_ctx;

  grpc_closure read_done;
  grpc_slice_buffer read_slices;
  grpc_slice_buffer read_one_slice;
  std::promise<grpc_error_handle> write_promise;
  grpc_closure write_done;
  grpc_slice_buffer write_slices;

  grpc_slice slice;
  char write_buffer[kBufferSize];
  char read_buffer[kBufferSize];

  fill_with_test_pattern(write_buffer, kBufferSize);

  grpc_slice_buffer_init(&write_slices);
  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  grpc_slice_buffer_add(&write_slices, slice);
  init_event_closure(&write_done, &write_promise);
  grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX);

  std::future<grpc_error_handle> write_future = write_promise.get_future();
  XCTAssertTrue([self waitForEvent:&write_future timeout:kWriteTimeout]);
  XCTAssertEqual(write_future.get(), absl::OkStatus());

  size_t recv_size = [self receiveFromServer:read_buffer length:kBufferSize];

  XCTAssertEqual(recv_size, static_cast<size_t>(kBufferSize));
  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
  // The read loop below expects the full payload, so a short send is a failure.
  ssize_t send_size = send(svr_fd_, read_buffer, kBufferSize, 0);
  XCTAssertEqual(send_size, static_cast<ssize_t>(kBufferSize));

  grpc_slice_buffer_init(&read_slices);
  grpc_slice_buffer_init(&read_one_slice);
  // Each endpoint read may deliver only part of the payload; accumulate until
  // the whole buffer has arrived.
  while (read_slices.length < kBufferSize) {
    std::promise<grpc_error_handle> read_promise;
    init_event_closure(&read_done, &read_promise);
    grpc_endpoint_read(ep_, &read_one_slice, &read_done, /*urgent=*/false,
                       /*min_progress_size=*/1);
    std::future<grpc_error_handle> read_future = read_promise.get_future();
    XCTAssertTrue([self waitForEvent:&read_future timeout:kReadTimeout]);
    XCTAssertEqual(read_future.get(), absl::OkStatus());
    grpc_slice_buffer_move_into(&read_one_slice, &read_slices);
    XCTAssertLessThanOrEqual(read_slices.length, kBufferSize);
  }
  XCTAssertTrue(compare_slice_buffer_with_buffer(&read_slices, read_buffer, kBufferSize));

  grpc_endpoint_shutdown(ep_, absl::OkStatus());
  grpc_slice_buffer_reset_and_unref(&read_slices);
  grpc_slice_buffer_reset_and_unref(&write_slices);
  grpc_slice_buffer_reset_and_unref(&read_one_slice);
}

// Starts a read that never receives data, then shuts the endpoint down and
// verifies the pending read completes with a non-OK status.
- (void)testShutdownBeforeRead {
  grpc_core::ExecCtx exec_ctx;

  std::promise<grpc_error_handle> read_promise;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;
  std::promise<grpc_error_handle> write_promise;
  grpc_closure write_done;
  grpc_slice_buffer write_slices;

  grpc_slice slice;
  char write_buffer[kBufferSize];
  char read_buffer[kBufferSize];

  fill_with_test_pattern(write_buffer, kBufferSize);

  grpc_slice_buffer_init(&read_slices);
  init_event_closure(&read_done, &read_promise);
  grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false,
                     /*min_progress_size=*/1);

  grpc_slice_buffer_init(&write_slices);
  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  grpc_slice_buffer_add(&write_slices, slice);
  init_event_closure(&write_done, &write_promise);
  grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX);

  std::future<grpc_error_handle> write_future = write_promise.get_future();
  XCTAssertTrue([self waitForEvent:&write_future timeout:kWriteTimeout]);
  XCTAssertEqual(write_future.get(), absl::OkStatus());

  size_t recv_size = [self receiveFromServer:read_buffer length:kBufferSize];

  XCTAssertEqual(recv_size, static_cast<size_t>(kBufferSize));
  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);

  // The server never sends anything, so the read must still be pending.
  std::future<grpc_error_handle> read_future = read_promise.get_future();
  XCTAssertFalse([self waitForEvent:&read_future timeout:kReadTimeout]);

  grpc_endpoint_shutdown(ep_, absl::OkStatus());

  // -waitForEvent:timeout: flushes the ExecCtx before waiting.
  XCTAssertTrue([self waitForEvent:&read_future timeout:kReadTimeout]);
  XCTAssertNotEqual(read_future.get(), absl::OkStatus());

  grpc_slice_buffer_reset_and_unref(&read_slices);
  grpc_slice_buffer_reset_and_unref(&write_slices);
}

// Closes the server-side socket while a read is pending and verifies the read
// completes with a non-OK status.
- (void)testRemoteClosed {
  grpc_core::ExecCtx exec_ctx;

  std::promise<grpc_error_handle> read_promise;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;
  std::promise<grpc_error_handle> write_promise;
  grpc_closure write_done;
  grpc_slice_buffer write_slices;

  grpc_slice slice;
  char write_buffer[kBufferSize];
  char read_buffer[kBufferSize];

  fill_with_test_pattern(write_buffer, kBufferSize);

  init_event_closure(&read_done, &read_promise);
  grpc_slice_buffer_init(&read_slices);
  grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false,
                     /*min_progress_size=*/1);

  grpc_slice_buffer_init(&write_slices);
  slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
  grpc_slice_buffer_add(&write_slices, slice);

  init_event_closure(&write_done, &write_promise);
  grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr, /*max_frame_size=*/INT_MAX);

  std::future<grpc_error_handle> write_future = write_promise.get_future();
  XCTAssertTrue([self waitForEvent:&write_future timeout:kWriteTimeout]);
  XCTAssertEqual(write_future.get(), absl::OkStatus());

  size_t recv_size = [self receiveFromServer:read_buffer length:kBufferSize];

  XCTAssertEqual(recv_size, static_cast<size_t>(kBufferSize));
  XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);

  [self closeServerSocket];

  std::future<grpc_error_handle> read_future = read_promise.get_future();
  XCTAssertTrue([self waitForEvent:&read_future timeout:kReadTimeout]);
  XCTAssertNotEqual(read_future.get(), absl::OkStatus());

  grpc_endpoint_shutdown(ep_, absl::OkStatus());
  grpc_slice_buffer_reset_and_unref(&read_slices);
  grpc_slice_buffer_reset_and_unref(&write_slices);
}

// Closes the server-side socket with SO_LINGER enabled and a zero linger
// interval while a read is pending, and verifies the read completes with a
// non-OK status.
- (void)testRemoteReset {
  grpc_core::ExecCtx exec_ctx;

  std::promise<grpc_error_handle> read_promise;
  grpc_closure read_done;
  grpc_slice_buffer read_slices;

  init_event_closure(&read_done, &read_promise);
  grpc_slice_buffer_init(&read_slices);
  grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false,
                     /*min_progress_size=*/1);

  struct linger so_linger;
  so_linger.l_onoff = 1;
  so_linger.l_linger = 0;
  XCTAssertEqual(setsockopt(svr_fd_, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)), 0);

  [self closeServerSocket];

  std::future<grpc_error_handle> read_future = read_promise.get_future();
  XCTAssertTrue([self waitForEvent:&read_future timeout:kReadTimeout]);
  XCTAssertNotEqual(read_future.get(), absl::OkStatus());

  grpc_endpoint_shutdown(ep_, absl::OkStatus());
  grpc_slice_buffer_reset_and_unref(&read_slices);
}

@end

#else  // GRPC_CFSTREAM

// Phony test suite
@interface CFStreamEndpointTests : XCTestCase
@end

@implementation CFStreamEndpointTests
- (void)setUp {
  [super setUp];
}

- (void)tearDown {
  [super tearDown];
}

@end

#endif  // GRPC_CFSTREAM