# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'forwardable'
require 'weakref'
require_relative 'bidi_call'

class Struct
  # BatchResult is the struct returned by calls to call#start_batch.
  class BatchResult
    # check_status returns the status, raising an error if the status
    # is non-nil and not OK.
    def check_status
      return nil if status.nil?
      if status.code != GRPC::Core::StatusCodes::OK
        GRPC.logger.debug("Failing with status #{status}")
        # raise BadStatus, propagating the metadata if present.
        fail GRPC::BadStatus.new_status_exception(
          status.code, status.details, status.metadata,
          status.debug_error_string)
      end
      status
    end
  end
end

# GRPC contains the General RPC module.
module GRPC
  # The ActiveCall class provides simple methods for sending marshallable
  # data to a call
  class ActiveCall # rubocop:disable Metrics/ClassLength
    include Core::TimeConsts
    include Core::CallOps
    extend Forwardable
    attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert
    def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
                   :trailing_metadata, :status

    # client_invoke begins a client invocation.
    #
    # Flow Control note: this blocks until flow control accepts that the
    # client request can go ahead.
    #
    # deadline is the absolute deadline for the call.
    #
    # == Keyword Arguments ==
    # any keyword arguments are treated as metadata to be sent to the server;
    # if a keyword value is a list, multiple metadata entries are sent for
    # its key
    #
    # @param call [Call] a call on which to start an invocation
    # @param metadata [Hash] the metadata
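    # @example Illustrative sketch (not part of this file): a list-valued
    #   entry expands to multiple metadata items for the same key; `core_call`
    #   is assumed to be a previously created GRPC::Core::Call:
    #
    #     ActiveCall.client_invoke(core_call,
    #                              'x-request-id' => 'abc-123',
    #                              'x-tag' => %w[a b])  # 'x-tag' is sent twice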
    def self.client_invoke(call, metadata = {})
      fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
      call.run_batch(SEND_INITIAL_METADATA => metadata)
    end

    # Creates an ActiveCall.
    #
    # ActiveCall should only be created after a call is accepted.  That
    # means different things on a client and a server.  On the client, the
    # call is accepted after calling call.invoke.  On the server, this is
    # after call.accept.
    #
    # #initialize cannot determine if the call is accepted or not; so if a
    # call that's not accepted is used here, the error won't be visible until
    # the ActiveCall methods are called.
    #
    # deadline is the absolute deadline for the call.
    #
    # @param call [Call] the call used by the ActiveCall
    # @param marshal [Function] f(obj)->string that marshals requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param deadline [Fixnum] the deadline for the call to complete
    # @param started [true|false] indicates that metadata was sent
    # @param metadata_received [true|false] indicates if metadata has already
    #     been received. Should always be true for server calls
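    # @example Rough construction sketch; EchoMsg, core_call and deadline are
    #   assumptions standing in for the client/server plumbing that normally
    #   builds an ActiveCall:
    #
    #     marshal   = proc { |obj| EchoMsg.encode(obj) }
    #     unmarshal = proc { |bytes| EchoMsg.decode(bytes) }
    #     active_call = ActiveCall.new(core_call, marshal, unmarshal, deadline,
    #                                  started: false,
    #                                  metadata_to_send: { 'k' => 'v' })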
    def initialize(call, marshal, unmarshal, deadline, started: true,
                   metadata_received: false, metadata_to_send: nil)
      fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
      @call = call
      @deadline = deadline
      @marshal = marshal
      @unmarshal = unmarshal
      @metadata_received = metadata_received
      @metadata_sent = started
      @op_notifier = nil

      fail(ArgumentError, 'Already sent md') if started && metadata_to_send
      @metadata_to_send = metadata_to_send || {} unless started
      @send_initial_md_mutex = Mutex.new

      @output_stream_done = false
      @input_stream_done = false
      @call_finished = false
      @call_finished_mu = Mutex.new

      @client_call_executed = false
      @client_call_executed_mu = Mutex.new

      # set the peer now so that the accessor can still function
      # after the server closes the call
      @peer = call.peer
    end

    # Sends the initial metadata that has yet to be sent.
    # Does nothing if metadata has already been sent for this call.
    def send_initial_metadata(new_metadata = {})
      @send_initial_md_mutex.synchronize do
        return if @metadata_sent
        @metadata_to_send.merge!(new_metadata)
        ActiveCall.client_invoke(@call, @metadata_to_send)
        @metadata_sent = true
      end
    end

    # output_metadata provides access to a hash that can be used to
    # save metadata to be sent as trailers
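    # @example Sketch of a server handler stashing a trailer value (`call` is
    #   assumed to be the view passed to the handler):
    #
    #     call.output_metadata['elapsed-ms'] = elapsed_ms.to_s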
    def output_metadata
      @output_metadata ||= {}
    end

    # cancelled indicates if the call was cancelled
    def cancelled?
      !@call.status.nil? && @call.status.code == Core::StatusCodes::CANCELLED
    end

    # multi_req_view provides a restricted view of this ActiveCall for use
    # in a server client-streaming handler.
    def multi_req_view
      MultiReqView.new(self)
    end

    # single_req_view provides a restricted view of this ActiveCall for use in
    # a server request-response handler.
    def single_req_view
      SingleReqView.new(self)
    end

    # operation provides a restricted view of this ActiveCall for use as
    # an Operation.
    def operation
      @op_notifier = Notifier.new
      Operation.new(self)
    end

    ##
    # Returns a restricted view of this ActiveCall for use in interceptors
    #
    # @return [InterceptableView]
    #
    def interceptable
      InterceptableView.new(self)
    end

    # Receives the status (and the initial metadata, if still outstanding)
    # from the remote endpoint, records it on the call and checks it.
    def receive_and_check_status
      ops = { RECV_STATUS_ON_CLIENT => nil }
      ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
      batch_result = @call.run_batch(ops)
      unless @metadata_received
        @call.metadata = batch_result.metadata
      end
      set_input_stream_done
      attach_status_results_and_complete_call(batch_result)
    ensure
      # Ensure we don't attempt to request the initial metadata again
      # in case an exception occurs.
      @metadata_received = true
    end

    def attach_status_results_and_complete_call(recv_status_batch_result)
      unless recv_status_batch_result.status.nil?
        @call.trailing_metadata = recv_status_batch_result.status.metadata
      end
      @call.status = recv_status_batch_result.status

      # The RECV_STATUS in run_batch always succeeds
      # Check the status for a bad status or failed run batch
      recv_status_batch_result.check_status
    end

    # remote_send sends a request to the remote endpoint.
    #
    # It blocks until the remote endpoint accepts the message.
    #
    # @param req [Object, String] the object to send or its marshalled form.
    # @param marshalled [false, true] indicates if the object is already
    # marshalled.
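    # @example Minimal sketch, assuming `active_call` and `req` already exist:
    #
    #     active_call.remote_send(req)               # marshals req first
    #     active_call.remote_send(raw_bytes, true)   # raw_bytes sent as-is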
    def remote_send(req, marshalled = false)
      send_initial_metadata
      GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
      payload = marshalled ? req : @marshal.call(req)
      @call.run_batch(SEND_MESSAGE => payload)
    end

    # send_status sends a status to the remote endpoint.
    #
    # @param code [int] the status code to send
    # @param details [String] details
    # @param assert_finished [true, false] when true, also waits for
    # FINISHED.
    # @param metadata [Hash] metadata to send to the client. If a value is a
    # list, multiple metadata for its key are sent
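    # @example Sketch of finishing a server-side call with a non-OK status:
    #
    #     active_call.send_status(GRPC::Core::StatusCodes::NOT_FOUND,
    #                             'no such record',
    #                             false,
    #                             metadata: { 'hint' => 'check the id' })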
    def send_status(code = OK, details = '', assert_finished = false,
                    metadata: {})
      send_initial_metadata
      ops = {
        SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, metadata)
      }
      ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
      @call.run_batch(ops)
      set_output_stream_done

      nil
    end

    # Intended for use on server-side calls when a single request from
    # the client is expected (i.e., unary and server-streaming RPC types).
    def read_unary_request
      req = remote_read
      set_input_stream_done
      req
    end

    # Sends a single response and the trailing status to the client in one
    # batch, including the initial metadata if it has not been sent yet.
    def server_unary_response(req, trailing_metadata: {},
                              code: Core::StatusCodes::OK, details: 'OK')
      ops = {}
      ops[SEND_MESSAGE] = @marshal.call(req)
      ops[SEND_STATUS_FROM_SERVER] = Struct::Status.new(
        code, details, trailing_metadata)
      ops[RECV_CLOSE_ON_SERVER] = nil

      @send_initial_md_mutex.synchronize do
        ops[SEND_INITIAL_METADATA] = @metadata_to_send unless @metadata_sent
        @metadata_sent = true
      end

      @call.run_batch(ops)
      set_output_stream_done
    end

    # remote_read reads a response from the remote endpoint.
    #
    # It blocks until the remote endpoint replies with a message or status.
    # On receiving a message, it returns the response after unmarshalling it.
    # On receiving a status, it returns nil if the status is OK, otherwise
    # it raises BadStatus
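    # @example Illustrative client-side sketch; `active_call` is assumed to
    #   have already sent its request:
    #
    #     resp = active_call.remote_read
    #     # resp is the unmarshalled message, or nil once the stream has ended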
    def remote_read
      ops = { RECV_MESSAGE => nil }
      ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
      batch_result = @call.run_batch(ops)
      unless @metadata_received
        @call.metadata = batch_result.metadata
      end
      get_message_from_batch_result(batch_result)
    rescue GRPC::Core::CallError => e
      GRPC.logger.info("remote_read: #{e}")
      nil
    ensure
      # Ensure we don't attempt to request the initial metadata again
      # in case an exception occurs.
      @metadata_received = true
    end

    def get_message_from_batch_result(recv_message_batch_result)
      unless recv_message_batch_result.nil? ||
             recv_message_batch_result.message.nil?
        return @unmarshal.call(recv_message_batch_result.message)
      end
      GRPC.logger.debug('found nil; the final response has been sent')
      nil
    end

    # each_remote_read passes each response to the given block or returns an
    # enumerator of the responses if no block is given.
    # Used to generate the request enumerable for
    # server-side client-streaming RPC's.
    #
    # == Enumerator ==
    #
    # * #next blocks until the remote endpoint sends a READ or FINISHED
    # * for each read, enumerator#next yields the response
    # * on status
    #    * if it is OK, enumerator#next raises StopIteration
    #    * if it is not OK, enumerator#next raises the corresponding BadStatus
    #      error
    #
    # == Block ==
    #
    # * if provided it is executed for each response
    # * the call blocks until no more responses are provided
    #
    # @return [Enumerator] if no block was given
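    # @example Sketch of a server client-streaming handler; the handler and
    #   the SumResult message are hypothetical:
    #
    #     def sum(call)
    #       total = 0
    #       call.each_remote_read { |req| total += req.value }
    #       SumResult.new(value: total)
    #     end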
    def each_remote_read
      return enum_for(:each_remote_read) unless block_given?
      begin
        loop do
          resp = remote_read
          break if resp.nil?  # the last response was received
          yield resp
        end
      ensure
        set_input_stream_done
      end
    end

    # each_remote_read_then_finish passes each response to the given block or
    # returns an enumerator of the responses if no block is given.
    #
    # It is like each_remote_read, but on detecting the final message it also
    # blocks to receive and check the call's status.
    #
    # == Enumerator ==
    #
    # * #next blocks until the remote endpoint sends a READ or FINISHED
    # * for each read, enumerator#next yields the response
    # * on status
    #    * if it is OK, enumerator#next raises StopIteration
    #    * if it is not OK, enumerator#next raises the corresponding BadStatus
    #      error
    #
    # == Block ==
    #
    # * if provided it is executed for each response
    # * the call blocks until no more responses are provided
    #
    # @return [Enumerator] if no block was given
    def each_remote_read_then_finish
      return enum_for(:each_remote_read_then_finish) unless block_given?
      loop do
        resp = remote_read
        break if resp.nil?  # the last response was received
        yield resp
      end

      receive_and_check_status
    ensure
      set_input_stream_done
    end

    # request_response sends a request to a GRPC server, and returns the
    # response.
    #
    # @param req [Object] the request sent to the server
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @return [Object] the response received from the server
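    # @example Rough unary sketch; EchoReq is hypothetical and the ActiveCall
    #   is normally created for you by ClientStub:
    #
    #     resp = active_call.request_response(EchoReq.new(msg: 'hi'),
    #                                         metadata: { 'x-id' => '42' })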
    def request_response(req, metadata: {})
      raise_error_if_already_executed
      ops = {
        SEND_MESSAGE => @marshal.call(req),
        SEND_CLOSE_FROM_CLIENT => nil,
        RECV_INITIAL_METADATA => nil,
        RECV_MESSAGE => nil,
        RECV_STATUS_ON_CLIENT => nil
      }
      @send_initial_md_mutex.synchronize do
        # Metadata might have already been sent if this is an operation view
        unless @metadata_sent
          ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
        end
        @metadata_sent = true
      end

      begin
        batch_result = @call.run_batch(ops)
        # no need to check for cancellation after a CallError because this
        # batch contains a RECV_STATUS op
      ensure
        set_input_stream_done
        set_output_stream_done
      end

      @call.metadata = batch_result.metadata
      attach_status_results_and_complete_call(batch_result)
      get_message_from_batch_result(batch_result)
    end

    # client_streamer sends a stream of requests to a GRPC server, and
    # returns a single response.
    #
    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
    # #each enumeration protocol. In the simplest case, requests will be an
    # array of marshallable objects; in the typical case it will be an
    # Enumerable that allows dynamic construction of the marshallable objects.
    #
    # @param requests [Object] an Enumerable of requests to send
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @return [Object] the response received from the server
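    # @example Sketch only; ChunkMsg and the requests array are assumptions:
    #
    #     reqs = %w[a b c].map { |s| ChunkMsg.new(data: s) }
    #     summary = active_call.client_streamer(reqs, metadata: { 'k' => 'v' })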
    def client_streamer(requests, metadata: {})
      raise_error_if_already_executed
      begin
        send_initial_metadata(metadata)
        requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
      rescue GRPC::Core::CallError => e
        receive_and_check_status # check for Cancelled
        raise e
      rescue => e
        set_input_stream_done
        raise e
      ensure
        set_output_stream_done
      end

      batch_result = @call.run_batch(
        SEND_CLOSE_FROM_CLIENT => nil,
        RECV_INITIAL_METADATA => nil,
        RECV_MESSAGE => nil,
        RECV_STATUS_ON_CLIENT => nil
      )

      set_input_stream_done

      @call.metadata = batch_result.metadata
      attach_status_results_and_complete_call(batch_result)
      get_message_from_batch_result(batch_result)
    end

    # server_streamer sends one request to the GRPC server, which yields a
    # stream of responses.
    #
    # responses provides an enumerator over the streamed responses, i.e. it
    # follows Ruby's #each iteration protocol.  The enumerator blocks while
    # waiting for each response, and stops when the server signals that no
    # further responses will be supplied.  If the implicit block is provided,
    # it is executed with each response as the argument and no result is
    # returned.
    #
    # @param req [Object] the request sent to the server
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @return [Enumerator|nil] a response Enumerator
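    # @example Illustrative sketch; ListReq is a hypothetical request type:
    #
    #     active_call.server_streamer(ListReq.new) { |resp| puts resp.inspect }
    #     # or, without a block:
    #     replies = active_call.server_streamer(ListReq.new)
    #     first = replies.next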
    def server_streamer(req, metadata: {})
      raise_error_if_already_executed
      ops = {
        SEND_MESSAGE => @marshal.call(req),
        SEND_CLOSE_FROM_CLIENT => nil
      }
      @send_initial_md_mutex.synchronize do
        # Metadata might have already been sent if this is an operation view
        unless @metadata_sent
          ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
        end
        @metadata_sent = true
      end

      begin
        @call.run_batch(ops)
      rescue GRPC::Core::CallError => e
        receive_and_check_status # checks for Cancelled
        raise e
      rescue => e
        set_input_stream_done
        raise e
      ensure
        set_output_stream_done
      end

      replies = enum_for(:each_remote_read_then_finish)
      return replies unless block_given?
      replies.each { |r| yield r }
    end

    # bidi_streamer sends a stream of requests to the GRPC server, and yields
    # a stream of responses.
    #
    # This method takes an Enumerable of requests, and returns an enumerable
    # of responses.
    #
    # == requests ==
    #
    # requests provides an 'iterable' of Requests. I.e. it follows Ruby's
    # #each enumeration protocol. In the simplest case, requests will be an
    # array of marshallable objects; in the typical case it will be an
    # Enumerable that allows dynamic construction of the marshallable
    # objects.
    #
    # == responses ==
    #
    # This is an enumerator of responses.  I.e., its #next method blocks
    # waiting for the next response.  Also, if at any point the block needs
    # to consume all the remaining responses, this can be done using #each or
    # #collect.  Calling #each or #collect should only be done if
    # the_call#writes_done has been called, otherwise the block will loop
    # forever.
    #
    # @param requests [Object] an Enumerable of requests to send
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    # @return [Enumerator, nil] a response Enumerator
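    # @example Rough sketch; ChatMsg and the requests enumerator are
    #   assumptions, not part of this API:
    #
    #     requests = Enumerator.new do |y|
    #       y << ChatMsg.new(text: 'hello')
    #       y << ChatMsg.new(text: 'bye')
    #     end
    #     active_call.bidi_streamer(requests) { |resp| puts resp.text }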
    def bidi_streamer(requests, metadata: {}, &blk)
      raise_error_if_already_executed
      # Metadata might have already been sent if this is an operation view
      begin
        send_initial_metadata(metadata)
      rescue GRPC::Core::CallError => e
        batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
        set_input_stream_done
        set_output_stream_done
        attach_status_results_and_complete_call(batch_result)
        raise e
      rescue => e
        set_input_stream_done
        set_output_stream_done
        raise e
      end

      bd = BidiCall.new(@call,
                        @marshal,
                        @unmarshal,
                        metadata_received: @metadata_received)

      bd.run_on_client(requests,
                       proc { set_input_stream_done },
                       proc { set_output_stream_done },
                       &blk)
    end

    # run_server_bidi orchestrates BiDi stream processing on a server.
    #
    # N.B. gen_each_reply is a func(Enumerable<Requests>)
    #
    # It takes an enumerable of requests as an arg, in case there is a
    # relationship between the stream of requests and the stream of replies.
    #
    # This does not mean that there must necessarily be one.  E.g., the
    # replies produced by gen_each_reply could ignore the received_msgs
    #
    # @param mth [Proc] generates the BiDi stream replies
    # @param interception_ctx [InterceptionContext]
    #
    def run_server_bidi(mth, interception_ctx)
      view = multi_req_view
      bidi_call = BidiCall.new(
        @call,
        @marshal,
        @unmarshal,
        metadata_received: @metadata_received,
        req_view: view
      )
      requests = bidi_call.read_next_loop(proc { set_input_stream_done }, false)
      interception_ctx.intercept!(
        :bidi_streamer,
        call: view,
        method: mth,
        requests: requests
      ) do
        bidi_call.run_on_server(mth, requests)
      end
    end

    # Waits until an operation completes
    def wait
      return if @op_notifier.nil?
      GRPC.logger.debug("active_call.wait: on #{@op_notifier}")
      @op_notifier.wait
    end

    # Signals that an operation is done.
    # Only relevant on the client-side (this is a no-op on the server-side)
    def op_is_done
      return if @op_notifier.nil?
      @op_notifier.notify(self)
    end

    # Add to the metadata that will be sent from the server.
    # Fails if metadata has already been sent.
    # Unused by client calls.
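    # @example Sketch of a server handler adding initial metadata before
    #   anything has been written (`call` is the handler's view):
    #
    #     call.merge_metadata_to_send('cache' => 'miss')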
    def merge_metadata_to_send(new_metadata = {})
      @send_initial_md_mutex.synchronize do
        fail('cant change metadata after already sent') if @metadata_sent
        @metadata_to_send.merge!(new_metadata)
      end
    end

    def attach_peer_cert(peer_cert)
      @peer_cert = peer_cert
    end

    private

    # To be called once the "input stream" has been completely
    # read through (i.e., done reading from the client or received status);
    # note this is idempotent
    def set_input_stream_done
      @call_finished_mu.synchronize do
        @input_stream_done = true
        maybe_finish_and_close_call_locked
      end
    end

    # To be called once the "output stream" has been completely
    # sent through (i.e., done sending from the client or sent status);
    # note this is idempotent
    def set_output_stream_done
      @call_finished_mu.synchronize do
        @output_stream_done = true
        maybe_finish_and_close_call_locked
      end
    end

    def maybe_finish_and_close_call_locked
      return unless @output_stream_done && @input_stream_done
      return if @call_finished
      @call_finished = true
      op_is_done
      @call.close
    end

    # Starts the call if not already started
    # @param metadata [Hash] metadata to be sent to the server. If a value is
    # a list, multiple metadata for its key are sent
    def start_call(metadata = {})
      merge_metadata_to_send(metadata) && send_initial_metadata
    end

    def raise_error_if_already_executed
      @client_call_executed_mu.synchronize do
        if @client_call_executed
          fail GRPC::Core::CallError, 'attempting to re-run a call'
        end
        @client_call_executed = true
      end
    end

    def self.view_class(*visible_methods)
      Class.new do
        extend ::Forwardable
        def_delegators :@wrapped, *visible_methods

        # @param wrapped [ActiveCall] the call whose methods are shielded
        def initialize(wrapped)
          @wrapped = wrapped
        end
      end
    end

    # SingleReqView limits access to an ActiveCall's methods for use in server
    # handlers that receive just one request.
    SingleReqView = view_class(:cancelled?, :deadline, :metadata,
                               :output_metadata, :peer, :peer_cert,
                               :send_initial_metadata,
                               :metadata_to_send,
                               :merge_metadata_to_send,
                               :metadata_sent)

    # MultiReqView limits access to an ActiveCall's methods for use in
    # server client_streamer handlers.
    MultiReqView = view_class(:cancelled?, :deadline,
                              :each_remote_read, :metadata, :output_metadata,
                              :peer, :peer_cert,
                              :send_initial_metadata,
                              :metadata_to_send,
                              :merge_metadata_to_send,
                              :metadata_sent)

    # Operation limits access to an ActiveCall's methods for use as
    # an Operation on the client.
    Operation = view_class(:cancel, :cancelled?, :deadline, :execute,
                           :metadata, :status, :start_call, :wait, :write_flag,
                           :write_flag=, :trailing_metadata)

    # InterceptableView further limits access to an ActiveCall's methods
    # for use in interceptors on the client, exposing only the deadline
    InterceptableView = view_class(:deadline)
  end
end