# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'forwardable'
require 'weakref'
require_relative 'bidi_call'

class Struct
  # BatchResult is the struct returned by calls to call#start_batch.
  class BatchResult
    # check_status returns the status, raising an error if the status
    # is non-nil and not OK.
    #
    # @return [Object, nil] the status, or nil when no status is present
    # @raise the exception built by GRPC::BadStatus.new_status_exception
    #   when the status code is not OK
    def check_status
      return nil if status.nil?
      if status.code != GRPC::Core::StatusCodes::OK
        GRPC.logger.debug("Failing with status #{status}")
        # raise BadStatus, propagating the metadata if present.
        fail GRPC::BadStatus.new_status_exception(
          status.code, status.details, status.metadata,
          status.debug_error_string)
      end
      status
    end
  end
end

# GRPC contains the General RPC module.
module GRPC
  # The ActiveCall class provides simple methods for sending marshallable
  # data to a call
  class ActiveCall # rubocop:disable Metrics/ClassLength
    include Core::TimeConsts
    include Core::CallOps
    extend Forwardable
    attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert
    def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
                   :trailing_metadata, :status

    # client_invoke begins a client invocation by running a batch that
    # sends the initial metadata.
    #
    # Flow Control note: this blocks until flow control accepts that client
    # request can go ahead.
    #
    # == Metadata ==
    # the metadata hash is sent to the server; if a value is a list,
    # multiple metadata for its key are sent
    #
    # @param call [Call] a call on which to start an invocation
    # @param metadata [Hash] the metadata
    # @raise [TypeError] if call is not a Core::Call
    def self.client_invoke(call, metadata = {})
      fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
      call.run_batch(SEND_INITIAL_METADATA => metadata)
    end

    # Creates an ActiveCall.
    #
    # ActiveCall should only be created after a call is accepted.  That
    # means different things on a client and a server.  On the client, the
    # call is accepted after calling call.invoke.  On the server, this is
    # after call.accept.
    #
    # #initialize cannot determine if the call is accepted or not; so if a
    # call that's not accepted is used here, the error won't be visible until
    # the ActiveCall methods are called.
    #
    # deadline is the absolute deadline for the call.
    #
    # @param call [Call] the call used by the ActiveCall
    # @param marshal [Function] f(obj)->string that marshal requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Fixnum] the deadline for the call to complete
    # @param started [true|false] indicates that metadata was sent
    # @param metadata_received [true|false] indicates if metadata has already
    #     been received. Should always be true for server calls
    # @param metadata_to_send [Hash, nil] initial metadata queued for sending;
    #     only allowed when started is false
    # @raise [TypeError] if call is not a Core::Call
    # @raise [ArgumentError] if started is true and metadata_to_send is given
    def initialize(call, marshal, unmarshal, deadline, started: true,
                   metadata_received: false, metadata_to_send: nil)
      fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
      @call = call
      @deadline = deadline
      @marshal = marshal
      @unmarshal = unmarshal
      @metadata_received = metadata_received
      @metadata_sent = started
      @op_notifier = nil

      fail(ArgumentError, 'Already sent md') if started && metadata_to_send
      # Left unset (nil) when the call has already been started.
      @metadata_to_send = metadata_to_send || {} unless started
      @send_initial_md_mutex = Mutex.new

      @output_stream_done = false
      @input_stream_done = false
      @call_finished = false
      @call_finished_mu = Mutex.new

      @client_call_executed = false
      @client_call_executed_mu = Mutex.new

      # set the peer now so that the accessor can still function
      # after the server closes the call
      @peer = call.peer
    end

    # Sends the initial metadata that has yet to be sent.
    # Does nothing if metadata has already been sent for this call.
    #
    # @param new_metadata [Hash] extra metadata merged into the pending
    #   metadata before it is sent
    def send_initial_metadata(new_metadata = {})
      @send_initial_md_mutex.synchronize do
        return if @metadata_sent
        @metadata_to_send.merge!(new_metadata)
        ActiveCall.client_invoke(@call, @metadata_to_send)
        @metadata_sent = true
      end
    end

    # output_metadata provides access to a hash that can be used to
    # save metadata to be sent as trailer
    def output_metadata
      @output_metadata ||= {}
    end

    # cancelled? indicates if the call was cancelled, i.e. the call has a
    # status whose code is CANCELLED
    def cancelled?
      !@call.status.nil? && @call.status.code == Core::StatusCodes::CANCELLED
    end

    # multi_req_view provides a restricted view of this ActiveCall for use
    # in a server client-streaming handler.
    def multi_req_view
      MultiReqView.new(self)
    end

    # single_req_view provides a restricted view of this ActiveCall for use in
    # a server request-response handler.
    def single_req_view
      SingleReqView.new(self)
    end

    # operation provides a restricted view of this ActiveCall for use as
    # an Operation. It also installs the notifier that #wait and #op_is_done
    # rely on.
    def operation
      @op_notifier = Notifier.new
      Operation.new(self)
    end

    ##
    # Returns a restricted view of this ActiveCall for use in interceptors
    #
    # @return [InterceptableView]
    #
    def interceptable
      InterceptableView.new(self)
    end

    # Runs a batch that receives the call status (and the initial metadata
    # if it has not been received yet), marks the input stream as done and
    # then records and checks the status.
    #
    # @return the status, as returned by BatchResult#check_status
    def receive_and_check_status
      ops = { RECV_STATUS_ON_CLIENT => nil }
      ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
      batch_result = @call.run_batch(ops)
      unless @metadata_received
        @call.metadata = batch_result.metadata
      end
      set_input_stream_done
      attach_status_results_and_complete_call(batch_result)
    ensure
      # Ensure we don't attempt to request the initial metadata again
      # in case an exception occurs.
      @metadata_received = true
    end

    # Copies the status (and, when a status is present, its trailing
    # metadata) from the batch result onto the call, then checks the status.
    #
    # @param recv_status_batch_result [Struct::BatchResult] result of a batch
    #   that included a receive-status op
    # @return the status, as returned by BatchResult#check_status
    def attach_status_results_and_complete_call(recv_status_batch_result)
      unless recv_status_batch_result.status.nil?
        @call.trailing_metadata = recv_status_batch_result.status.metadata
      end
      @call.status = recv_status_batch_result.status

      # The RECV_STATUS in run_batch always succeeds
      # Check the status for a bad status or failed run batch
      recv_status_batch_result.check_status
    end

    # remote_send sends a request to the remote endpoint.
    #
    # It blocks until the remote endpoint accepts the message.
    # Pending initial metadata is sent first.
    #
    # @param req [Object, String] the object to send or its marshalled form.
    # @param marshalled [false, true] indicates if the object is already
    # marshalled.
    def remote_send(req, marshalled = false)
      send_initial_metadata
      GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
      payload = marshalled ? req : @marshal.call(req)
      @call.run_batch(SEND_MESSAGE => payload)
    end

    # send_status sends a status to the remote endpoint.
    # Pending initial metadata is sent first; afterwards the output stream
    # is marked as done.
    #
    # @param code [int] the status code to send
    # @param details [String] details
    # @param assert_finished [true, false] when true, RECV_CLOSE_ON_SERVER is
    # added to the batch so that it also waits for the call to finish.
    # Defaults to false.
    # @param metadata [Hash] metadata to send with the status. If a value is
    # a list, multiple metadata for its key are sent
    # @return [nil]
    def send_status(code = OK, details = '', assert_finished = false,
                    metadata: {})
      send_initial_metadata
      ops = {
        SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, metadata)
      }
      ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
      @call.run_batch(ops)
      set_output_stream_done

      nil
    end

    # Intended for use on server-side calls when a single request from
    # the client is expected (i.e., unary and server-streaming RPC types).
    # Reads one request and then marks the input stream as done.
    def read_unary_request
      req = remote_read
      set_input_stream_done
      req
    end

    # Sends the single response of a call together with its status in one
    # batch. Initial metadata is included in the same batch when it has not
    # been sent yet. The output stream is marked as done afterwards.
    #
    # @param req [Object] the response object; it is passed through @marshal
    # @param trailing_metadata [Hash] metadata attached to the status
    # @param code [int] the status code to send
    # @param details [String] the status details
    def server_unary_response(req, trailing_metadata: {},
                              code: Core::StatusCodes::OK, details: 'OK')
      ops = {}
      ops[SEND_MESSAGE] = @marshal.call(req)
      ops[SEND_STATUS_FROM_SERVER] = Struct::Status.new(
        code, details, trailing_metadata)
      ops[RECV_CLOSE_ON_SERVER] = nil

      @send_initial_md_mutex.synchronize do
        ops[SEND_INITIAL_METADATA] = @metadata_to_send unless @metadata_sent
        @metadata_sent = true
      end

      @call.run_batch(ops)
      set_output_stream_done
    end

    # remote_read reads a response from the remote endpoint.
    #
    # It blocks until the batch that receives a message completes.
    # On receiving a message, it returns the response after unmarshalling it.
    # It returns nil when no message was received, or when the batch fails
    # with Core::CallError (the error is logged at info level).
    def remote_read
      ops = { RECV_MESSAGE => nil }
      ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
      batch_result = @call.run_batch(ops)
      unless @metadata_received
        @call.metadata = batch_result.metadata
      end
      get_message_from_batch_result(batch_result)
    rescue GRPC::Core::CallError => e
      GRPC.logger.info("remote_read: #{e}")
      nil
    ensure
      # Ensure we don't attempt to request the initial metadata again
      # in case an exception occurs.
      @metadata_received = true
    end

    # Extracts and unmarshals the message held by a batch result.
    #
    # @param recv_message_batch_result [Struct::BatchResult, nil] result of
    #   a batch that included a receive-message op
    # @return [Object, nil] the unmarshalled message, or nil when the batch
    #   result or its message is nil
    def get_message_from_batch_result(recv_message_batch_result)
      unless recv_message_batch_result.nil? ||
             recv_message_batch_result.message.nil?
        return @unmarshal.call(recv_message_batch_result.message)
      end
      GRPC.logger.debug('found nil; the final response has been sent')
      nil
    end

    # each_remote_read passes each response to the given block or returns an
    # enumerator of the responses if no block is given.
    # Used to generate the request enumerable for
    # server-side client-streaming RPC's.
    #
    # == Enumerator ==
    #
    # * #next blocks until remote_read returns
    # * for each read, enumerator#next yields the response
    # * the enumeration ends once remote_read returns nil
    #
    # == Block ==
    #
    # * if provided it is executed for each response
    # * the call blocks until no more responses are provided
    # * the input stream is marked as done when the loop exits, including
    #   when it exits because of an exception
    #
    # @return [Enumerator] if no block was given
    def each_remote_read
      return enum_for(:each_remote_read) unless block_given?
      begin
        loop do
          resp = remote_read
          break if resp.nil? # the last response was received
          yield resp
        end
      ensure
        set_input_stream_done
      end
    end

    # each_remote_read_then_finish passes each response to the given block or
    # returns an enumerator of the responses if no block is given.
    #
    # It is like each_remote_read, but after the final message it also
    # receives and checks the call status.
    #
    # == Enumerator ==
    #
    # * #next blocks until remote_read returns
    # * for each read, enumerator#next yields the response
    # * once remote_read returns nil the status is received and checked;
    #   a non-OK status is raised by receive_and_check_status
    #
    # == Block ==
    #
    # * if provided it is executed for each response
    # * the call blocks until no more responses are provided
    #
    # @return [Enumerator] if no block was given
    def each_remote_read_then_finish
      return enum_for(:each_remote_read_then_finish) unless block_given?
      # The ensure is scoped to the read loop (as in each_remote_read) so
      # that merely requesting an enumerator does not mark the input stream
      # as done before anything has been read.
      begin
        loop do
          resp = remote_read
          break if resp.nil? # the last response was received
          yield resp
        end

        receive_and_check_status
      ensure
        set_input_stream_done
      end
    end

    # request_response sends a request to a GRPC server, and returns the
    # response.
    #
    # @param req [Object] the request sent to the server
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @return [Object] the response received from the server
    # @raise [GRPC::Core::CallError] if a client call was already executed
    #   on this ActiveCall
    def request_response(req, metadata: {})
      raise_error_if_already_executed
      ops = {
        SEND_MESSAGE => @marshal.call(req),
        SEND_CLOSE_FROM_CLIENT => nil,
        RECV_INITIAL_METADATA => nil,
        RECV_MESSAGE => nil,
        RECV_STATUS_ON_CLIENT => nil
      }
      @send_initial_md_mutex.synchronize do
        # Metadata might have already been sent if this is an operation view
        unless @metadata_sent
          ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
        end
        @metadata_sent = true
      end

      begin
        batch_result = @call.run_batch(ops)
        # no need to check for cancellation after a CallError because this
        # batch contains a RECV_STATUS op
      ensure
        set_input_stream_done
        set_output_stream_done
      end

      @call.metadata = batch_result.metadata
      attach_status_results_and_complete_call(batch_result)
      get_message_from_batch_result(batch_result)
    end

    # client_streamer sends a stream of requests to a GRPC server, and
    # returns a single response.
    #
    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
    # #each enumeration protocol. In the simplest case, requests will be an
    # array of marshallable objects; in typical case it will be an Enumerable
    # that allows dynamic construction of the marshallable objects.
    #
    # @param requests [Object] an Enumerable of requests to send
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @return [Object] the response received from the server
    # @raise [GRPC::Core::CallError] if a client call was already executed
    #   on this ActiveCall
    def client_streamer(requests, metadata: {})
      raise_error_if_already_executed
      begin
        send_initial_metadata(metadata)
        requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
      rescue GRPC::Core::CallError => e
        receive_and_check_status # check for Cancelled
        raise e
      rescue => e
        set_input_stream_done
        raise e
      ensure
        set_output_stream_done
      end

      batch_result = @call.run_batch(
        SEND_CLOSE_FROM_CLIENT => nil,
        RECV_INITIAL_METADATA => nil,
        RECV_MESSAGE => nil,
        RECV_STATUS_ON_CLIENT => nil
      )

      set_input_stream_done

      @call.metadata = batch_result.metadata
      attach_status_results_and_complete_call(batch_result)
      get_message_from_batch_result(batch_result)
    end

    # server_streamer sends one request to the GRPC server, which yields a
    # stream of responses.
    #
    # responses provides an enumerator over the streamed responses, i.e. it
    # follows Ruby's #each iteration protocol.  The enumerator blocks while
    # waiting for each response, stops when the server signals that no
    # further responses will be supplied.  If the implicit block is provided,
    # it is executed with each response as the argument and no result is
    # returned.
    #
    # @param req [Object] the request sent to the server
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @return [Enumerator|nil] a response Enumerator
    # @raise [GRPC::Core::CallError] if a client call was already executed
    #   on this ActiveCall
    def server_streamer(req, metadata: {})
      raise_error_if_already_executed
      ops = {
        SEND_MESSAGE => @marshal.call(req),
        SEND_CLOSE_FROM_CLIENT => nil
      }
      @send_initial_md_mutex.synchronize do
        # Metadata might have already been sent if this is an operation view
        unless @metadata_sent
          ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
        end
        @metadata_sent = true
      end

      begin
        @call.run_batch(ops)
      rescue GRPC::Core::CallError => e
        receive_and_check_status # checks for Cancelled
        raise e
      rescue => e
        set_input_stream_done
        raise e
      ensure
        set_output_stream_done
      end

      replies = enum_for(:each_remote_read_then_finish)
      return replies unless block_given?
      replies.each { |r| yield r }
    end

    # bidi_streamer sends a stream of requests to the GRPC server, and yields
    # a stream of responses.
    #
    # This method takes an Enumerable of requests, and returns an enumerable
    # of responses.
    #
    # == requests ==
    #
    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
    # #each enumeration protocol. In the simplest case, requests will be an
    # array of marshallable objects; in typical case it will be an
    # Enumerable that allows dynamic construction of the marshallable
    # objects.
    #
    # == responses ==
    #
    # This is an enumerator of responses.  I.e, its #next method blocks
    # waiting for the next response.  Also, if at any point the block needs
    # to consume all the remaining responses, this can be done using #each or
    # #collect.  Calling #each or #collect should only be done if
    # the_call#writes_done has been called, otherwise the block will loop
    # forever.
    #
    # @param requests [Object] an Enumerable of requests to send
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @return [Enumerator, nil] a response Enumerator
    # @raise [GRPC::Core::CallError] if a client call was already executed
    #   on this ActiveCall
    def bidi_streamer(requests, metadata: {}, &blk)
      raise_error_if_already_executed
      # Metadata might have already been sent if this is an operation view
      begin
        send_initial_metadata(metadata)
      rescue GRPC::Core::CallError => e
        batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
        set_input_stream_done
        set_output_stream_done
        attach_status_results_and_complete_call(batch_result)
        raise e
      rescue => e
        set_input_stream_done
        set_output_stream_done
        raise e
      end

      bd = BidiCall.new(@call,
                        @marshal,
                        @unmarshal,
                        metadata_received: @metadata_received)

      bd.run_on_client(requests,
                       proc { set_input_stream_done },
                       proc { set_output_stream_done },
                       &blk)
    end

    # run_server_bidi orchestrates a BiDi stream processing on a server.
    #
    # N.B. gen_each_reply is a func(Enumerable<Requests>)
    #
    # It takes an enumerable of requests as an arg, in case there is a
    # relationship between the stream of requests and the stream of replies.
    #
    # This does not mean that there must necessarily be one.  E.g, the replies
    # produced by gen_each_reply could ignore the received_msgs
    #
    # @param mth [Proc] generates the BiDi stream replies
    # @param interception_ctx [InterceptionContext]
    #
    def run_server_bidi(mth, interception_ctx)
      view = multi_req_view
      bidi_call = BidiCall.new(
        @call,
        @marshal,
        @unmarshal,
        metadata_received: @metadata_received,
        req_view: view
      )
      requests = bidi_call.read_next_loop(proc { set_input_stream_done }, false)
      interception_ctx.intercept!(
        :bidi_streamer,
        call: view,
        method: mth,
        requests: requests
      ) do
        bidi_call.run_on_server(mth, requests)
      end
    end

    # Waits till an operation completes.
    # Does nothing when no notifier has been installed (see #operation).
    def wait
      return if @op_notifier.nil?
      GRPC.logger.debug("active_call.wait: on #{@op_notifier}")
      @op_notifier.wait
    end

    # Signals that an operation is done.
    # Only relevant on the client-side (this is a no-op on the server-side)
    def op_is_done
      return if @op_notifier.nil?
      @op_notifier.notify(self)
    end

    # Add to the metadata that will be sent from the server.
    # Fails if metadata has already been sent.
    # Unused by client calls.
    #
    # @param new_metadata [Hash] metadata merged into the pending metadata
    def merge_metadata_to_send(new_metadata = {})
      @send_initial_md_mutex.synchronize do
        fail('cant change metadata after already sent') if @metadata_sent
        @metadata_to_send.merge!(new_metadata)
      end
    end

    # Stores the peer certificate so that it is available through the
    # peer_cert reader.
    def attach_peer_cert(peer_cert)
      @peer_cert = peer_cert
    end

    private

    # To be called once the "input stream" has been completely
    # read through (i.e, done reading from client or received status)
    # note this is idempotent
    def set_input_stream_done
      @call_finished_mu.synchronize do
        @input_stream_done = true
        maybe_finish_and_close_call_locked
      end
    end

    # To be called once the "output stream" has been completely
    # sent through (i.e, done sending from client or sent status)
    # note this is idempotent
    def set_output_stream_done
      @call_finished_mu.synchronize do
        @output_stream_done = true
        maybe_finish_and_close_call_locked
      end
    end

    # Finishes the call once both the input and the output stream are done:
    # signals the operation notifier and closes the underlying call.
    # Runs at most once. Both callers in this file invoke it while holding
    # @call_finished_mu.
    def maybe_finish_and_close_call_locked
      return unless @output_stream_done && @input_stream_done
      return if @call_finished
      @call_finished = true
      op_is_done
      @call.close
    end

    # Starts the call if not already started
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    def start_call(metadata = {})
      merge_metadata_to_send(metadata) && send_initial_metadata
    end

    # Guards the client-side entry points so that each ActiveCall executes
    # at most one client call.
    #
    # @raise [GRPC::Core::CallError] on the second and later invocations
    def raise_error_if_already_executed
      @client_call_executed_mu.synchronize do
        if @client_call_executed
          fail GRPC::Core::CallError, 'attempting to re-run a call'
        end
        @client_call_executed = true
      end
    end

    # Builds an anonymous class that wraps an ActiveCall and forwards only
    # the listed methods to it.
    #
    # @param visible_methods [Array<Symbol>] the methods to forward
    # @return [Class] the generated view class
    def self.view_class(*visible_methods)
      Class.new do
        extend ::Forwardable
        def_delegators :@wrapped, *visible_methods

        # @param wrapped [ActiveCall] the call whose methods are shielded
        def initialize(wrapped)
          @wrapped = wrapped
        end
      end
    end

    # SingleReqView limits access to an ActiveCall's methods for use in server
    # handlers that receive just one request.
    SingleReqView = view_class(:cancelled?, :deadline, :metadata,
                               :output_metadata, :peer, :peer_cert,
                               :send_initial_metadata,
                               :metadata_to_send,
                               :merge_metadata_to_send,
                               :metadata_sent)

    # MultiReqView limits access to an ActiveCall's methods for use in
    # server client_streamer handlers.
    MultiReqView = view_class(:cancelled?, :deadline,
                              :each_remote_read, :metadata, :output_metadata,
                              :peer, :peer_cert,
                              :send_initial_metadata,
                              :metadata_to_send,
                              :merge_metadata_to_send,
                              :metadata_sent)

    # Operation limits access to an ActiveCall's methods for use as
    # an Operation on the client.
    # NOTE(review): :execute is forwarded but no #execute is defined in this
    # file; presumably it is attached elsewhere - confirm.
    Operation = view_class(:cancel, :cancelled?, :deadline, :execute,
                           :metadata, :status, :start_call, :wait, :write_flag,
                           :write_flag=, :trailing_metadata)

    # InterceptableView further limits access to an ActiveCall's methods
    # for use in interceptors on the client, exposing only the deadline
    InterceptableView = view_class(:deadline)
  end
end