# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'spec_helper'

include GRPC::Core::StatusCodes

# Specs for GRPC::ActiveCall.
#
# Each example runs against a core server started in the top-level
# before(:each). A background thread accepts exactly one incoming call and
# hands it to the example through @received_rpcs_queue.
describe GRPC::ActiveCall do
  ActiveCall = GRPC::ActiveCall
  Call = GRPC::Core::Call
  CallOps = GRPC::Core::CallOps
  WriteFlags = GRPC::Core::WriteFlags

  # Builds the status struct that the server side sends to finish a call.
  def ok_status
    Struct::Status.new(OK, 'OK')
  end

  # Finishes an RPC at the core-call level: the client half-closes, the
  # server receives the close and sends an OK status, and the client then
  # receives that status.
  def send_and_receive_close_and_status(client_call, server_call)
    client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
    server_call.run_batch(CallOps::RECV_CLOSE_ON_SERVER => nil,
                          CallOps::SEND_STATUS_FROM_SERVER => ok_status)
    client_call.run_batch(CallOps::RECV_STATUS_ON_CLIENT => nil)
  end

  # Returns the object held in the @call instance variable of an ActiveCall,
  # so that it can be passed to send_and_receive_close_and_status.
  def inner_call_of_active_call(active_call)
    active_call.instance_variable_get(:@call)
  end

  before(:each) do
    # identity proc used as both the marshal and the unmarshal function
    @pass_through = proc { |x| x }
    host = '0.0.0.0:0'
    @server = new_core_server_for_testing(nil)
    server_port = @server.add_http2_port(host, :this_port_is_insecure)
    @server.start
    @received_rpcs_queue = Queue.new
    # Accept a single call in the background; the example pops the result
    # off the queue when it needs the server side of the RPC.
    @server_thread = Thread.new do
      begin
        received_rpc = @server.request_call
      rescue GRPC::Core::CallError, StandardError => e
        # enqueue the exception in this case as a way to indicate the error
        received_rpc = e
      end
      @received_rpcs_queue.push(received_rpc)
    end
    @ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil,
                                  :this_channel_is_insecure)
  end

  after(:each) do
    @server.shutdown_and_notify(deadline)
    @server.close
    @server_thread.join
  end

  describe 'restricted view methods' do
    before(:each) do
      @call = make_test_call
      ActiveCall.client_invoke(@call)
      @client_call = ActiveCall.new(@call, @pass_through,
                                    @pass_through, deadline)
    end

    after(:each) do
      # terminate the RPC that was started in before(:each)
      recvd_rpc = @received_rpcs_queue.pop
      recvd_call = recvd_rpc.call
      recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => nil)
      @call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
      send_and_receive_close_and_status(@call, recvd_call)
    end

    # NOTE(review): these three examples previously called
    # `expect(v.methods.include?(w))` with no matcher, so nothing was ever
    # asserted. Their `%w(...)` lists also contained commas and
    # backslash-newlines, which become part of the words inside a %w
    # literal. The expectations below are now live. The earlier lists also
    # named `shutdown` and `initial_metadata_sent`; nothing in this spec
    # shows ActiveCall responding to either, so they are left out here.
    # Confirm against the view definitions and re-add them if they really
    # are exposed.
    describe '#multi_req_view' do
      it 'exposes a fixed subset of the ActiveCall.methods' do
        want = %i(cancelled? deadline each_remote_read metadata
                  peer peer_cert send_initial_metadata)
        v = @client_call.multi_req_view
        want.each do |w|
          expect(v).to respond_to(w)
        end
      end
    end

    describe '#single_req_view' do
      it 'exposes a fixed subset of the ActiveCall.methods' do
        want = %i(cancelled? deadline metadata send_initial_metadata
                  metadata_to_send merge_metadata_to_send)
        v = @client_call.single_req_view
        want.each do |w|
          expect(v).to respond_to(w)
        end
      end
    end

    describe '#interceptable' do
      it 'exposes a fixed subset of the ActiveCall.methods' do
        want = %i(deadline)
        v = @client_call.interceptable
        want.each do |w|
          expect(v).to respond_to(w)
        end
      end
    end
  end

  describe '#remote_send' do
    it 'allows a client to send a payload to the server', test: true do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)

      # check that server rpc new was received
      recvd_rpc = @received_rpcs_queue.pop
      expect(recvd_rpc).to_not eq nil
      recvd_call = recvd_rpc.call

      # Accept the call, and verify that the server reads the response ok.
      server_call = ActiveCall.new(recvd_call, @pass_through,
                                   @pass_through, deadline,
                                   metadata_received: true,
                                   started: false)
      expect(server_call.remote_read).to eq(msg)
      # finish the call
      server_call.send_initial_metadata
      call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
      send_and_receive_close_and_status(call, recvd_call)
    end

    it 'marshals the payload using the marshal func' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      marshal = proc { |x| 'marshalled:' + x }
      client_call = ActiveCall.new(call, marshal, @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)

      # confirm that the message was marshalled
      recvd_rpc = @received_rpcs_queue.pop
      recvd_call = recvd_rpc.call
      server_ops = {
        CallOps::SEND_INITIAL_METADATA => nil
      }
      recvd_call.run_batch(server_ops)
      server_call = ActiveCall.new(recvd_call, @pass_through,
                                   @pass_through, deadline,
                                   metadata_received: true)
      expect(server_call.remote_read).to eq('marshalled:' + msg)
      # finish the call
      call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
      send_and_receive_close_and_status(call, recvd_call)
    end

    # One example is generated per write flag listed here.
    TEST_WRITE_FLAGS = [WriteFlags::BUFFER_HINT, WriteFlags::NO_COMPRESS]
    TEST_WRITE_FLAGS.each do |f|
      it "successfully makes calls with write_flag set to #{f}" do
        call = make_test_call
        ActiveCall.client_invoke(call)
        marshal = proc { |x| 'marshalled:' + x }
        client_call = ActiveCall.new(call, marshal,
                                     @pass_through, deadline)
        msg = 'message is a string'
        client_call.write_flag = f
        client_call.remote_send(msg)
        # flush the message in case writes are set to buffered
        # (compared against the named flag rather than the literal 1 that
        # was used before)
        if f == WriteFlags::BUFFER_HINT
          call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
        end

        # confirm that the message was marshalled
        recvd_rpc = @received_rpcs_queue.pop
        recvd_call = recvd_rpc.call
        server_ops = {
          CallOps::SEND_INITIAL_METADATA => nil
        }
        recvd_call.run_batch(server_ops)
        server_call = ActiveCall.new(recvd_call, @pass_through,
                                     @pass_through, deadline,
                                     metadata_received: true)
        expect(server_call.remote_read).to eq('marshalled:' + msg)
        # finish the call
        server_call.send_status(OK, '', true)
        client_call.receive_and_check_status
      end
    end
  end

  # These examples set message expectations on the core call's run_batch,
  # so run_batch is replaced by the mock and no batch reaches the server.
  describe 'sending initial metadata', send_initial_metadata: true do
    it 'sends metadata before sending a message if it hasnt been sent yet' do
      call = make_test_call
      @client_call = ActiveCall.new(
        call,
        @pass_through,
        @pass_through,
        deadline,
        started: false)

      metadata = { key: 'phony_val', other: 'other_val' }
      expect(@client_call.metadata_sent).to eq(false)
      @client_call.merge_metadata_to_send(metadata)

      message = 'phony message'

      expect(call).to(
        receive(:run_batch)
          .with(
            hash_including(
              CallOps::SEND_INITIAL_METADATA => metadata)).once)

      expect(call).to(
        receive(:run_batch).with(hash_including(
                                   CallOps::SEND_MESSAGE => message)).once)
      @client_call.remote_send(message)

      expect(@client_call.metadata_sent).to eq(true)
    end

    it 'doesnt send metadata if it thinks its already been sent' do
      call = make_test_call

      @client_call = ActiveCall.new(call,
                                    @pass_through,
                                    @pass_through,
                                    deadline)
      expect(@client_call.metadata_sent).to eql(true)
      expect(call).to(
        receive(:run_batch).with(hash_including(
                                   CallOps::SEND_INITIAL_METADATA)).never)

      @client_call.remote_send('test message')
    end

    it 'sends metadata if it is explicitly sent and ok to do so' do
      call = make_test_call

      @client_call = ActiveCall.new(call,
                                    @pass_through,
                                    @pass_through,
                                    deadline,
                                    started: false)

      expect(@client_call.metadata_sent).to eql(false)

      metadata = { test_key: 'val' }
      @client_call.merge_metadata_to_send(metadata)
      expect(@client_call.metadata_to_send).to eq(metadata)

      expect(call).to(
        receive(:run_batch).with(hash_including(
                                   CallOps::SEND_INITIAL_METADATA =>
                                     metadata)).once)
      @client_call.send_initial_metadata
    end

    it 'explicit sending does nothing if metadata has already been sent' do
      call = make_test_call

      @client_call = ActiveCall.new(call,
                                    @pass_through,
                                    @pass_through,
                                    deadline)

      expect(@client_call.metadata_sent).to eql(true)

      blk = proc do
        @client_call.send_initial_metadata
      end

      expect { blk.call }.to_not raise_error
    end
  end

  describe '#merge_metadata_to_send', merge_metadata_to_send: true do
    it 'adds to existing metadata when there is existing metadata to send' do
      call = make_test_call
      starting_metadata = {
        k1: 'key1_val',
        k2: 'key2_val',
        k3: 'key3_val'
      }

      @client_call = ActiveCall.new(
        call,
        @pass_through, @pass_through,
        deadline,
        started: false,
        metadata_to_send: starting_metadata)

      expect(@client_call.metadata_to_send).to eq(starting_metadata)

      # k3 is overwritten and k4 is added by the merge
      @client_call.merge_metadata_to_send(
        k3: 'key3_new_val',
        k4: 'key4_val')

      expected_md_to_send = {
        k1: 'key1_val',
        k2: 'key2_val',
        k3: 'key3_new_val',
        k4: 'key4_val' }

      expect(@client_call.metadata_to_send).to eq(expected_md_to_send)

      @client_call.merge_metadata_to_send(k5: 'key5_val')
      expected_md_to_send.merge!(k5: 'key5_val')
      expect(@client_call.metadata_to_send).to eq(expected_md_to_send)
    end

    it 'fails when initial metadata has already been sent' do
      call = make_test_call
      @client_call = ActiveCall.new(
        call,
        @pass_through,
        @pass_through,
        deadline,
        started: true)

      expect(@client_call.metadata_sent).to eq(true)

      blk = proc do
        @client_call.merge_metadata_to_send(k1: 'key1_val')
      end

      expect { blk.call }.to raise_error
    end
  end

  describe '#client_invoke' do
    it 'sends metadata to the server when present' do
      call = make_test_call
      metadata = { k1: 'v1', k2: 'v2' }
      ActiveCall.client_invoke(call, metadata)
      recvd_rpc = @received_rpcs_queue.pop
      recvd_call = recvd_rpc.call
      expect(recvd_call).to_not be_nil
      expect(recvd_rpc.metadata).to_not be_nil
      expect(recvd_rpc.metadata['k1']).to eq('v1')
      expect(recvd_rpc.metadata['k2']).to eq('v2')
      # finish the call
      recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => {})
      call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
      send_and_receive_close_and_status(call, recvd_call)
    end
  end

  describe '#send_status', send_status: true do
    it 'works when no metadata or messages have been sent yet' do
      call = make_test_call
      ActiveCall.client_invoke(call)

      recvd_rpc = @received_rpcs_queue.pop
      server_call = ActiveCall.new(
        recvd_rpc.call,
        @pass_through,
        @pass_through,
        deadline,
        started: false)

      expect(server_call.metadata_sent).to eq(false)
      blk = proc { server_call.send_status(OK) }
      expect { blk.call }.to_not raise_error
    end
  end

  describe '#remote_read', remote_read: true do
    it 'reads the response sent by a server' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      expect(client_call.remote_read).to eq('server_response')
      send_and_receive_close_and_status(
        call, inner_call_of_active_call(server_call))
    end

    it 'saves no metadata when the server adds no metadata' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('ignore me')
      # metadata is nil until the first read, and an empty hash afterwards
      expect(client_call.metadata).to be_nil
      client_call.remote_read
      expect(client_call.metadata).to eq({})
      send_and_receive_close_and_status(
        call, inner_call_of_active_call(server_call))
    end

    it 'saves metadata add by the server' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg, k1: 'v1', k2: 'v2')
      server_call.remote_send('ignore me')
      expect(client_call.metadata).to be_nil
      client_call.remote_read
      # sent with symbol keys above, expected back with string keys
      expected = { 'k1' => 'v1', 'k2' => 'v2' }
      expect(client_call.metadata).to eq(expected)
      send_and_receive_close_and_status(
        call, inner_call_of_active_call(server_call))
    end

    it 'get a status from server when nothing else sent from server' do
      client_call = make_test_call
      ActiveCall.client_invoke(client_call)

      recvd_rpc = @received_rpcs_queue.pop
      recvd_call = recvd_rpc.call

      server_call = ActiveCall.new(
        recvd_call,
        @pass_through,
        @pass_through,
        deadline,
        started: false)

      server_call.send_status(OK, 'OK')

      # Check that we can receive initial metadata and a status
      client_call.run_batch(
        CallOps::RECV_INITIAL_METADATA => nil)
      batch_result = client_call.run_batch(
        CallOps::RECV_STATUS_ON_CLIENT => nil)

      expect(batch_result.status.code).to eq(OK)
    end

    it 'get a nil msg before a status when an OK status is sent' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      server_call.send_status(OK, 'OK')
      expect(client_call.remote_read).to eq('server_response')
      res = client_call.remote_read
      expect(res).to be_nil
    end

    it 'unmarshals the response using the unmarshal func' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      unmarshal = proc { |x| 'unmarshalled:' + x }
      client_call = ActiveCall.new(call, @pass_through,
                                   unmarshal, deadline)

      # confirm the client receives the unmarshalled message
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      expect(client_call.remote_read).to eq('unmarshalled:server_response')
      send_and_receive_close_and_status(
        call, inner_call_of_active_call(server_call))
    end
  end

  describe '#each_remote_read' do
    it 'creates an Enumerator' do
      call = make_test_call
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      expect(client_call.each_remote_read).to be_a(Enumerator)
      # finish the call
      client_call.cancel
    end

    it 'the returned enumerator can read n responses' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      reply = 'server_response'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      e = client_call.each_remote_read
      n = 3 # arbitrary value > 1
      n.times do
        server_call.remote_send(reply)
        expect(e.next).to eq(reply)
      end
      send_and_receive_close_and_status(
        call, inner_call_of_active_call(server_call))
    end

    it 'the returns an enumerator that stops after an OK Status' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      reply = 'server_response'
      client_call.remote_send(msg)
      call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
      server_call = expect_server_to_receive(msg)
      e = client_call.each_remote_read
      n = 3 # arbitrary value > 1
      n.times do
        server_call.remote_send(reply)
        expect(e.next).to eq(reply)
      end
      server_call.send_status(OK, 'OK', true)
      expect { e.next }.to raise_error(StopIteration)
    end
  end

  describe '#closing the call from the client' do
    it 'finishes ok if the server sends a status response' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      expect do
        call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
      end.to_not raise_error
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      expect(client_call.remote_read).to eq('server_response')
      server_call.send_status(OK, 'status code is OK')
      expect { client_call.receive_and_check_status }.to_not raise_error
    end

    it 'finishes ok if the server sends an early status response' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      # the status is sent before the client has half-closed
      server_call.send_status(OK, 'status code is OK')
      expect(client_call.remote_read).to eq('server_response')
      expect do
        call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
      end.to_not raise_error
      expect { client_call.receive_and_check_status }.to_not raise_error
    end

    it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      server_call.send_status(OK, 'status code is OK')
      expect(client_call.remote_read).to eq('server_response')
      expect do
        call.run_batch(
          CallOps::SEND_CLOSE_FROM_CLIENT => nil,
          CallOps::RECV_STATUS_ON_CLIENT => nil)
      end.to_not raise_error
    end
  end

  # Test sending of the initial metadata in #run_server_bidi
  # from the server handler both implicitly and explicitly.
  #
  # Each example only starts a thread that runs the server handler; the
  # after(:each) hook below drives the client side and makes the assertions.
  describe '#run_server_bidi metadata sending tests', run_server_bidi: true do
    before(:each) do
      @requests = ['first message', 'second message']
      @server_to_client_metadata = { 'test_key' => 'test_val' }
      @server_status = OK

      @client_call = make_test_call
      @client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {})

      recvd_rpc = @received_rpcs_queue.pop
      recvd_call = recvd_rpc.call
      @server_call = ActiveCall.new(
        recvd_call,
        @pass_through,
        @pass_through,
        deadline,
        metadata_received: true,
        started: false,
        metadata_to_send: @server_to_client_metadata)
    end

    after(:each) do
      # Send the requests and send a close so the server can send a status
      @requests.each do |message|
        @client_call.run_batch(CallOps::SEND_MESSAGE => message)
      end
      @client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)

      # by now @server_thread is the handler thread started by the example
      @server_thread.join

      # Expect that initial metadata was sent,
      # the requests were echoed, and a status was sent
      batch_result = @client_call.run_batch(
        CallOps::RECV_INITIAL_METADATA => nil)
      expect(batch_result.metadata).to eq(@server_to_client_metadata)

      @requests.each do |message|
        batch_result = @client_call.run_batch(
          CallOps::RECV_MESSAGE => nil)
        expect(batch_result.message).to eq(message)
      end

      batch_result = @client_call.run_batch(
        CallOps::RECV_STATUS_ON_CLIENT => nil)
      expect(batch_result.status.code).to eq(@server_status)
    end

    it 'sends the initial metadata implicitly if not already sent' do
      # Server handler that doesn't have access to a "call"
      # It echoes the requests
      fake_gen_each_reply_with_no_call_param = proc do |msgs|
        msgs
      end

      int_ctx = GRPC::InterceptionContext.new

      @server_thread = Thread.new do
        @server_call.run_server_bidi(
          fake_gen_each_reply_with_no_call_param, int_ctx)
        @server_call.send_status(@server_status)
      end
    end

    it 'sends the metadata when sent explicitly and not already sent' do
      # Fake server handler that has access to a "call" object and
      # uses it to explicitly update and send the initial metadata
      fake_gen_each_reply_with_call_param = proc do |msgs, call_param|
        call_param.merge_metadata_to_send(@server_to_client_metadata)
        call_param.send_initial_metadata
        msgs
      end
      int_ctx = GRPC::InterceptionContext.new

      @server_thread = Thread.new do
        @server_call.run_server_bidi(
          fake_gen_each_reply_with_call_param, int_ctx)
        @server_call.send_status(@server_status)
      end
    end
  end

  # Accepts the pending call via expect_server_to_be_invoked, asserts that
  # the first message read on the server equals sent_text, and returns the
  # server-side ActiveCall. Keyword arguments are sent to the client as the
  # server's initial metadata.
  def expect_server_to_receive(sent_text, **kw)
    c = expect_server_to_be_invoked(**kw)
    expect(c.remote_read).to eq(sent_text)
    c
  end

  # Pops the call accepted by the background server thread, sends the given
  # keyword arguments as initial metadata, and wraps the core call in an
  # ActiveCall.
  def expect_server_to_be_invoked(**kw)
    recvd_rpc = @received_rpcs_queue.pop
    expect(recvd_rpc).to_not eq nil
    recvd_call = recvd_rpc.call
    recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => kw)
    ActiveCall.new(recvd_call, @pass_through, @pass_through, deadline,
                   metadata_received: true, started: true)
  end

  # Creates a new client call for '/method' on the per-example channel.
  def make_test_call
    @ch.create_call(nil, nil, '/method', nil, deadline)
  end

  # Returns a fresh deadline on every invocation.
  def deadline
    Time.now + 2 # in 2 seconds; arbitrary
  end
end