1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15require 'forwardable' 16require_relative '../grpc' 17 18# GRPC contains the General RPC module. 19module GRPC 20 # The BiDiCall class orchestrates execution of a BiDi stream on a client or 21 # server. 22 class BidiCall 23 include Core::CallOps 24 include Core::StatusCodes 25 include Core::TimeConsts 26 27 # Creates a BidiCall. 28 # 29 # BidiCall should only be created after a call is accepted. That means 30 # different things on a client and a server. On the client, the call is 31 # accepted after call.invoke. On the server, this is after call.accept. 32 # 33 # #initialize cannot determine if the call is accepted or not; so if a 34 # call that's not accepted is used here, the error won't be visible until 35 # the BidiCall#run is called. 36 # 37 # deadline is the absolute deadline for the call. 38 # 39 # @param call [Call] the call used by the ActiveCall 40 # @param marshal [Function] f(obj)->string that marshal requests 41 # @param unmarshal [Function] f(string)->obj that unmarshals responses 42 # @param metadata_received [true|false] indicates if metadata has already 43 # been received. Should always be true for server calls 44 def initialize(call, marshal, unmarshal, metadata_received: false, 45 req_view: nil) 46 fail(ArgumentError, 'not a call') unless call.is_a? Core::Call 47 @call = call 48 @marshal = marshal 49 @op_notifier = nil # signals completion on clients 50 @unmarshal = unmarshal 51 @metadata_received = metadata_received 52 @reads_complete = false 53 @writes_complete = false 54 @complete = false 55 @done_mutex = Mutex.new 56 @req_view = req_view 57 end 58 59 # Begins orchestration of the Bidi stream for a client sending requests. 60 # 61 # The method either returns an Enumerator of the responses, or accepts a 62 # block that can be invoked with each response. 63 # 64 # @param requests the Enumerable of requests to send 65 # @param set_input_stream_done [Proc] called back when we're done 66 # reading the input stream 67 # @param set_output_stream_done [Proc] called back when we're done 68 # sending data on the output stream 69 # @return an Enumerator of requests to yield 70 def run_on_client(requests, 71 set_input_stream_done, 72 set_output_stream_done, 73 &blk) 74 @enq_th = Thread.new do 75 write_loop(requests, set_output_stream_done: set_output_stream_done) 76 end 77 read_loop(set_input_stream_done, &blk) 78 end 79 80 # Begins orchestration of the Bidi stream for a server generating replies. 81 # 82 # N.B. gen_each_reply is a func(Enumerable<Requests>) 83 # 84 # It takes an enumerable of requests as an arg, in case there is a 85 # relationship between the stream of requests and the stream of replies. 86 # 87 # This does not mean that must necessarily be one. E.g, the replies 88 # produced by gen_each_reply could ignore the received_msgs 89 # 90 # @param [Proc] gen_each_reply generates the BiDi stream replies. 91 # @param [Enumerable] requests The enumerable of requests to run 92 def run_on_server(gen_each_reply, requests) 93 replies = nil 94 95 # Pass in the optional call object parameter if possible 96 if gen_each_reply.arity == 1 97 replies = gen_each_reply.call(requests) 98 elsif gen_each_reply.arity == 2 99 replies = gen_each_reply.call(requests, @req_view) 100 else 101 fail 'Illegal arity of reply generator' 102 end 103 104 write_loop(replies, is_client: false) 105 end 106 107 ## 108 # Read the next stream iteration 109 # 110 # @param [Proc] finalize_stream callback to call when the reads have been 111 # completely read through. 112 # @param [Boolean] is_client If this is a client or server request 113 # 114 def read_next_loop(finalize_stream, is_client = false) 115 read_loop(finalize_stream, is_client: is_client) 116 end 117 118 private 119 120 END_OF_READS = :end_of_reads 121 END_OF_WRITES = :end_of_writes 122 123 # performs a read using @call.run_batch, ensures metadata is set up 124 def read_using_run_batch 125 ops = { RECV_MESSAGE => nil } 126 ops[RECV_INITIAL_METADATA] = nil unless @metadata_received 127 begin 128 batch_result = @call.run_batch(ops) 129 unless @metadata_received 130 @call.metadata = batch_result.metadata 131 @metadata_received = true 132 end 133 batch_result 134 rescue GRPC::Core::CallError => e 135 GRPC.logger.warn('bidi call: read_using_run_batch failed') 136 GRPC.logger.warn(e) 137 nil 138 end 139 end 140 141 # set_output_stream_done is relevant on client-side 142 # rubocop:disable Metrics/PerceivedComplexity 143 def write_loop(requests, is_client: true, set_output_stream_done: nil) 144 GRPC::Core.fork_unsafe_begin 145 GRPC.logger.debug('bidi-write-loop: starting') 146 count = 0 147 requests.each do |req| 148 GRPC.logger.debug("bidi-write-loop: #{count}") 149 count += 1 150 payload = @marshal.call(req) 151 # Fails if status already received 152 begin 153 @req_view.send_initial_metadata unless @req_view.nil? 154 @call.run_batch(SEND_MESSAGE => payload) 155 rescue GRPC::Core::CallError => e 156 # This is almost definitely caused by a status arriving while still 157 # writing. Don't re-throw the error 158 GRPC.logger.warn('bidi-write-loop: ended with error') 159 GRPC.logger.warn(e) 160 break 161 end 162 end 163 GRPC.logger.debug("bidi-write-loop: #{count} writes done") 164 if is_client 165 GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") 166 begin 167 @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil) 168 rescue GRPC::Core::CallError => e 169 GRPC.logger.warn('bidi-write-loop: send close failed') 170 GRPC.logger.warn(e) 171 end 172 GRPC.logger.debug('bidi-write-loop: done') 173 end 174 GRPC.logger.debug('bidi-write-loop: finished') 175 rescue StandardError => e 176 GRPC.logger.warn('bidi-write-loop: failed') 177 GRPC.logger.warn(e) 178 if is_client 179 @call.cancel_with_status(GRPC::Core::StatusCodes::UNKNOWN, 180 "GRPC bidi call error: #{e.inspect}") 181 else 182 raise e 183 end 184 ensure 185 GRPC::Core.fork_unsafe_end 186 set_output_stream_done.call if is_client 187 end 188 # rubocop:enable Metrics/PerceivedComplexity 189 190 # Provides an enumerator that yields results of remote reads 191 def read_loop(set_input_stream_done, is_client: true) 192 return enum_for(:read_loop, 193 set_input_stream_done, 194 is_client: is_client) unless block_given? 195 GRPC.logger.debug('bidi-read-loop: starting') 196 begin 197 count = 0 198 # queue the initial read before beginning the loop 199 loop do 200 GRPC.logger.debug("bidi-read-loop: #{count}") 201 count += 1 202 batch_result = read_using_run_batch 203 204 # handle the next message 205 if batch_result.nil? || batch_result.message.nil? 206 GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}") 207 208 if is_client 209 batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil) 210 @call.status = batch_result.status 211 @call.trailing_metadata = @call.status.metadata if @call.status 212 GRPC.logger.debug("bidi-read-loop: done status #{@call.status}") 213 batch_result.check_status 214 end 215 216 GRPC.logger.debug('bidi-read-loop: done reading!') 217 break 218 end 219 220 res = @unmarshal.call(batch_result.message) 221 yield res 222 end 223 rescue StandardError => e 224 GRPC.logger.warn('bidi: read-loop failed') 225 GRPC.logger.warn(e) 226 raise e 227 ensure 228 set_input_stream_done.call 229 end 230 GRPC.logger.debug('bidi-read-loop: finished') 231 # Make sure that the write loop is done before finishing the call. 232 # Note that blocking is ok at this point because we've already received 233 # a status 234 @enq_th.join if is_client 235 end 236 end 237end 238