xref: /aosp_15_r20/external/grpc-grpc/src/ruby/lib/grpc/generic/bidi_call.rb (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15require 'forwardable'
16require_relative '../grpc'
17
18# GRPC contains the General RPC module.
19module GRPC
20  # The BiDiCall class orchestrates execution of a BiDi stream on a client or
21  # server.
22  class BidiCall
23    include Core::CallOps
24    include Core::StatusCodes
25    include Core::TimeConsts
26
27    # Creates a BidiCall.
28    #
29    # BidiCall should only be created after a call is accepted.  That means
30    # different things on a client and a server.  On the client, the call is
31    # accepted after call.invoke. On the server, this is after call.accept.
32    #
33    # #initialize cannot determine if the call is accepted or not; so if a
34    # call that's not accepted is used here, the error won't be visible until
35    # the BidiCall#run is called.
36    #
37    # deadline is the absolute deadline for the call.
38    #
39    # @param call [Call] the call used by the ActiveCall
40    # @param marshal [Function] f(obj)->string that marshal requests
41    # @param unmarshal [Function] f(string)->obj that unmarshals responses
42    # @param metadata_received [true|false] indicates if metadata has already
43    #     been received. Should always be true for server calls
44    def initialize(call, marshal, unmarshal, metadata_received: false,
45                   req_view: nil)
46      fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
47      @call = call
48      @marshal = marshal
49      @op_notifier = nil  # signals completion on clients
50      @unmarshal = unmarshal
51      @metadata_received = metadata_received
52      @reads_complete = false
53      @writes_complete = false
54      @complete = false
55      @done_mutex = Mutex.new
56      @req_view = req_view
57    end
58
59    # Begins orchestration of the Bidi stream for a client sending requests.
60    #
61    # The method either returns an Enumerator of the responses, or accepts a
62    # block that can be invoked with each response.
63    #
64    # @param requests the Enumerable of requests to send
65    # @param set_input_stream_done [Proc] called back when we're done
66    #   reading the input stream
67    # @param set_output_stream_done [Proc] called back when we're done
68    #   sending data on the output stream
69    # @return an Enumerator of requests to yield
70    def run_on_client(requests,
71                      set_input_stream_done,
72                      set_output_stream_done,
73                      &blk)
74      @enq_th = Thread.new do
75        write_loop(requests, set_output_stream_done: set_output_stream_done)
76      end
77      read_loop(set_input_stream_done, &blk)
78    end
79
80    # Begins orchestration of the Bidi stream for a server generating replies.
81    #
82    # N.B. gen_each_reply is a func(Enumerable<Requests>)
83    #
84    # It takes an enumerable of requests as an arg, in case there is a
85    # relationship between the stream of requests and the stream of replies.
86    #
87    # This does not mean that must necessarily be one.  E.g, the replies
88    # produced by gen_each_reply could ignore the received_msgs
89    #
90    # @param [Proc] gen_each_reply generates the BiDi stream replies.
91    # @param [Enumerable] requests The enumerable of requests to run
92    def run_on_server(gen_each_reply, requests)
93      replies = nil
94
95      # Pass in the optional call object parameter if possible
96      if gen_each_reply.arity == 1
97        replies = gen_each_reply.call(requests)
98      elsif gen_each_reply.arity == 2
99        replies = gen_each_reply.call(requests, @req_view)
100      else
101        fail 'Illegal arity of reply generator'
102      end
103
104      write_loop(replies, is_client: false)
105    end
106
107    ##
108    # Read the next stream iteration
109    #
110    # @param [Proc] finalize_stream callback to call when the reads have been
111    #   completely read through.
112    # @param [Boolean] is_client If this is a client or server request
113    #
114    def read_next_loop(finalize_stream, is_client = false)
115      read_loop(finalize_stream, is_client: is_client)
116    end
117
118    private
119
120    END_OF_READS = :end_of_reads
121    END_OF_WRITES = :end_of_writes
122
123    # performs a read using @call.run_batch, ensures metadata is set up
124    def read_using_run_batch
125      ops = { RECV_MESSAGE => nil }
126      ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
127      begin
128        batch_result = @call.run_batch(ops)
129        unless @metadata_received
130          @call.metadata = batch_result.metadata
131          @metadata_received = true
132        end
133        batch_result
134      rescue GRPC::Core::CallError => e
135        GRPC.logger.warn('bidi call: read_using_run_batch failed')
136        GRPC.logger.warn(e)
137        nil
138      end
139    end
140
141    # set_output_stream_done is relevant on client-side
142    # rubocop:disable Metrics/PerceivedComplexity
143    def write_loop(requests, is_client: true, set_output_stream_done: nil)
144      GRPC::Core.fork_unsafe_begin
145      GRPC.logger.debug('bidi-write-loop: starting')
146      count = 0
147      requests.each do |req|
148        GRPC.logger.debug("bidi-write-loop: #{count}")
149        count += 1
150        payload = @marshal.call(req)
151        # Fails if status already received
152        begin
153          @req_view.send_initial_metadata unless @req_view.nil?
154          @call.run_batch(SEND_MESSAGE => payload)
155        rescue GRPC::Core::CallError => e
156          # This is almost definitely caused by a status arriving while still
157          # writing. Don't re-throw the error
158          GRPC.logger.warn('bidi-write-loop: ended with error')
159          GRPC.logger.warn(e)
160          break
161        end
162      end
163      GRPC.logger.debug("bidi-write-loop: #{count} writes done")
164      if is_client
165        GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
166        begin
167          @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil)
168        rescue GRPC::Core::CallError => e
169          GRPC.logger.warn('bidi-write-loop: send close failed')
170          GRPC.logger.warn(e)
171        end
172        GRPC.logger.debug('bidi-write-loop: done')
173      end
174      GRPC.logger.debug('bidi-write-loop: finished')
175    rescue StandardError => e
176      GRPC.logger.warn('bidi-write-loop: failed')
177      GRPC.logger.warn(e)
178      if is_client
179        @call.cancel_with_status(GRPC::Core::StatusCodes::UNKNOWN,
180                                 "GRPC bidi call error: #{e.inspect}")
181      else
182        raise e
183      end
184    ensure
185      GRPC::Core.fork_unsafe_end
186      set_output_stream_done.call if is_client
187    end
188    # rubocop:enable Metrics/PerceivedComplexity
189
190    # Provides an enumerator that yields results of remote reads
191    def read_loop(set_input_stream_done, is_client: true)
192      return enum_for(:read_loop,
193                      set_input_stream_done,
194                      is_client: is_client) unless block_given?
195      GRPC.logger.debug('bidi-read-loop: starting')
196      begin
197        count = 0
198        # queue the initial read before beginning the loop
199        loop do
200          GRPC.logger.debug("bidi-read-loop: #{count}")
201          count += 1
202          batch_result = read_using_run_batch
203
204          # handle the next message
205          if batch_result.nil? || batch_result.message.nil?
206            GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
207
208            if is_client
209              batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
210              @call.status = batch_result.status
211              @call.trailing_metadata = @call.status.metadata if @call.status
212              GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
213              batch_result.check_status
214            end
215
216            GRPC.logger.debug('bidi-read-loop: done reading!')
217            break
218          end
219
220          res = @unmarshal.call(batch_result.message)
221          yield res
222        end
223      rescue StandardError => e
224        GRPC.logger.warn('bidi: read-loop failed')
225        GRPC.logger.warn(e)
226        raise e
227      ensure
228        set_input_stream_done.call
229      end
230      GRPC.logger.debug('bidi-read-loop: finished')
231      # Make sure that the write loop is done before finishing the call.
232      # Note that blocking is ok at this point because we've already received
233      # a status
234      @enq_th.join if is_client
235    end
236  end
237end
238