xref: /aosp_15_r20/external/grpc-grpc/src/ruby/spec/generic/active_call_spec.rb (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15require 'spec_helper'
16
17include GRPC::Core::StatusCodes
18
19describe GRPC::ActiveCall do
20  ActiveCall = GRPC::ActiveCall
21  Call = GRPC::Core::Call
22  CallOps = GRPC::Core::CallOps
23  WriteFlags = GRPC::Core::WriteFlags
24
25  def ok_status
26    Struct::Status.new(OK, 'OK')
27  end
28
29  def send_and_receive_close_and_status(client_call, server_call)
30    client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
31    server_call.run_batch(CallOps::RECV_CLOSE_ON_SERVER => nil,
32                          CallOps::SEND_STATUS_FROM_SERVER => ok_status)
33    client_call.run_batch(CallOps::RECV_STATUS_ON_CLIENT => nil)
34  end
35
36  def inner_call_of_active_call(active_call)
37    active_call.instance_variable_get(:@call)
38  end
39
40  before(:each) do
41    @pass_through = proc { |x| x }
42    host = '0.0.0.0:0'
43    @server = new_core_server_for_testing(nil)
44    server_port = @server.add_http2_port(host, :this_port_is_insecure)
45    @server.start
46    @received_rpcs_queue = Queue.new
47    @server_thread = Thread.new do
48      begin
49        received_rpc = @server.request_call
50      rescue GRPC::Core::CallError, StandardError => e
51        # enqueue the exception in this case as a way to indicate the error
52        received_rpc = e
53      end
54      @received_rpcs_queue.push(received_rpc)
55    end
56    @ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil,
57                                  :this_channel_is_insecure)
58  end
59
60  after(:each) do
61    @server.shutdown_and_notify(deadline)
62    @server.close
63    @server_thread.join
64  end
65
66  describe 'restricted view methods' do
67    before(:each) do
68      @call = make_test_call
69      ActiveCall.client_invoke(@call)
70      @client_call = ActiveCall.new(@call, @pass_through,
71                                    @pass_through, deadline)
72    end
73
74    after(:each) do
75      # terminate the RPC that was started in before(:each)
76      recvd_rpc = @received_rpcs_queue.pop
77      recvd_call = recvd_rpc.call
78      recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => nil)
79      @call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
80      send_and_receive_close_and_status(@call, recvd_call)
81    end
82
83    describe '#multi_req_view' do
84      it 'exposes a fixed subset of the ActiveCall.methods' do
85        want = %w(cancelled?, deadline, each_remote_read, metadata, \
86                  shutdown, peer, peer_cert, send_initial_metadata, \
87                  initial_metadata_sent)
88        v = @client_call.multi_req_view
89        want.each do |w|
90          expect(v.methods.include?(w))
91        end
92      end
93    end
94
95    describe '#single_req_view' do
96      it 'exposes a fixed subset of the ActiveCall.methods' do
97        want = %w(cancelled?, deadline, metadata, shutdown, \
98                  send_initial_metadata, metadata_to_send, \
99                  merge_metadata_to_send, initial_metadata_sent)
100        v = @client_call.single_req_view
101        want.each do |w|
102          expect(v.methods.include?(w))
103        end
104      end
105    end
106
107    describe '#interceptable' do
108      it 'exposes a fixed subset of the ActiveCall.methods' do
109        want = %w(deadline)
110        v = @client_call.interceptable
111        want.each do |w|
112          expect(v.methods.include?(w))
113        end
114      end
115    end
116  end
117
118  describe '#remote_send' do
119    it 'allows a client to send a payload to the server', test: true do
120      call = make_test_call
121      ActiveCall.client_invoke(call)
122      client_call = ActiveCall.new(call, @pass_through,
123                                   @pass_through, deadline)
124      msg = 'message is a string'
125      client_call.remote_send(msg)
126
127      # check that server rpc new was received
128      recvd_rpc = @received_rpcs_queue.pop
129      expect(recvd_rpc).to_not eq nil
130      recvd_call = recvd_rpc.call
131
132      # Accept the call, and verify that the server reads the response ok.
133      server_call = ActiveCall.new(recvd_call, @pass_through,
134                                   @pass_through, deadline,
135                                   metadata_received: true,
136                                   started: false)
137      expect(server_call.remote_read).to eq(msg)
138      # finish the call
139      server_call.send_initial_metadata
140      call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
141      send_and_receive_close_and_status(call, recvd_call)
142    end
143
144    it 'marshals the payload using the marshal func' do
145      call = make_test_call
146      ActiveCall.client_invoke(call)
147      marshal = proc { |x| 'marshalled:' + x }
148      client_call = ActiveCall.new(call, marshal, @pass_through, deadline)
149      msg = 'message is a string'
150      client_call.remote_send(msg)
151
152      # confirm that the message was marshalled
153      recvd_rpc =  @received_rpcs_queue.pop
154      recvd_call = recvd_rpc.call
155      server_ops = {
156        CallOps::SEND_INITIAL_METADATA => nil
157      }
158      recvd_call.run_batch(server_ops)
159      server_call = ActiveCall.new(recvd_call, @pass_through,
160                                   @pass_through, deadline,
161                                   metadata_received: true)
162      expect(server_call.remote_read).to eq('marshalled:' + msg)
163      # finish the call
164      call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
165      send_and_receive_close_and_status(call, recvd_call)
166    end
167
168    TEST_WRITE_FLAGS = [WriteFlags::BUFFER_HINT, WriteFlags::NO_COMPRESS]
169    TEST_WRITE_FLAGS.each do |f|
170      it "successfully makes calls with write_flag set to #{f}" do
171        call = make_test_call
172        ActiveCall.client_invoke(call)
173        marshal = proc { |x| 'marshalled:' + x }
174        client_call = ActiveCall.new(call, marshal,
175                                     @pass_through, deadline)
176        msg = 'message is a string'
177        client_call.write_flag = f
178        client_call.remote_send(msg)
179        # flush the message in case writes are set to buffered
180        call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) if f == 1
181
182        # confirm that the message was marshalled
183        recvd_rpc =  @received_rpcs_queue.pop
184        recvd_call = recvd_rpc.call
185        server_ops = {
186          CallOps::SEND_INITIAL_METADATA => nil
187        }
188        recvd_call.run_batch(server_ops)
189        server_call = ActiveCall.new(recvd_call, @pass_through,
190                                     @pass_through, deadline,
191                                     metadata_received: true)
192        expect(server_call.remote_read).to eq('marshalled:' + msg)
193        # finish the call
194        server_call.send_status(OK, '', true)
195        client_call.receive_and_check_status
196      end
197    end
198  end
199
200  describe 'sending initial metadata', send_initial_metadata: true do
201    it 'sends metadata before sending a message if it hasnt been sent yet' do
202      call = make_test_call
203      @client_call = ActiveCall.new(
204        call,
205        @pass_through,
206        @pass_through,
207        deadline,
208        started: false)
209
210      metadata = { key: 'phony_val', other: 'other_val' }
211      expect(@client_call.metadata_sent).to eq(false)
212      @client_call.merge_metadata_to_send(metadata)
213
214      message = 'phony message'
215
216      expect(call).to(
217        receive(:run_batch)
218          .with(
219            hash_including(
220              CallOps::SEND_INITIAL_METADATA => metadata)).once)
221
222      expect(call).to(
223        receive(:run_batch).with(hash_including(
224                                   CallOps::SEND_MESSAGE => message)).once)
225      @client_call.remote_send(message)
226
227      expect(@client_call.metadata_sent).to eq(true)
228    end
229
230    it 'doesnt send metadata if it thinks its already been sent' do
231      call = make_test_call
232
233      @client_call = ActiveCall.new(call,
234                                    @pass_through,
235                                    @pass_through,
236                                    deadline)
237      expect(@client_call.metadata_sent).to eql(true)
238      expect(call).to(
239        receive(:run_batch).with(hash_including(
240                                   CallOps::SEND_INITIAL_METADATA)).never)
241
242      @client_call.remote_send('test message')
243    end
244
245    it 'sends metadata if it is explicitly sent and ok to do so' do
246      call = make_test_call
247
248      @client_call = ActiveCall.new(call,
249                                    @pass_through,
250                                    @pass_through,
251                                    deadline,
252                                    started: false)
253
254      expect(@client_call.metadata_sent).to eql(false)
255
256      metadata = { test_key: 'val' }
257      @client_call.merge_metadata_to_send(metadata)
258      expect(@client_call.metadata_to_send).to eq(metadata)
259
260      expect(call).to(
261        receive(:run_batch).with(hash_including(
262                                   CallOps::SEND_INITIAL_METADATA =>
263                                     metadata)).once)
264      @client_call.send_initial_metadata
265    end
266
267    it 'explicit sending does nothing if metadata has already been sent' do
268      call = make_test_call
269
270      @client_call = ActiveCall.new(call,
271                                    @pass_through,
272                                    @pass_through,
273                                    deadline)
274
275      expect(@client_call.metadata_sent).to eql(true)
276
277      blk = proc do
278        @client_call.send_initial_metadata
279      end
280
281      expect { blk.call }.to_not raise_error
282    end
283  end
284
285  describe '#merge_metadata_to_send', merge_metadata_to_send: true do
286    it 'adds to existing metadata when there is existing metadata to send' do
287      call = make_test_call
288      starting_metadata = {
289        k1: 'key1_val',
290        k2: 'key2_val',
291        k3: 'key3_val'
292      }
293
294      @client_call = ActiveCall.new(
295        call,
296        @pass_through, @pass_through,
297        deadline,
298        started: false,
299        metadata_to_send: starting_metadata)
300
301      expect(@client_call.metadata_to_send).to eq(starting_metadata)
302
303      @client_call.merge_metadata_to_send(
304        k3: 'key3_new_val',
305        k4: 'key4_val')
306
307      expected_md_to_send = {
308        k1: 'key1_val',
309        k2: 'key2_val',
310        k3: 'key3_new_val',
311        k4: 'key4_val' }
312
313      expect(@client_call.metadata_to_send).to eq(expected_md_to_send)
314
315      @client_call.merge_metadata_to_send(k5: 'key5_val')
316      expected_md_to_send.merge!(k5: 'key5_val')
317      expect(@client_call.metadata_to_send).to eq(expected_md_to_send)
318    end
319
320    it 'fails when initial metadata has already been sent' do
321      call = make_test_call
322      @client_call = ActiveCall.new(
323        call,
324        @pass_through,
325        @pass_through,
326        deadline,
327        started: true)
328
329      expect(@client_call.metadata_sent).to eq(true)
330
331      blk = proc do
332        @client_call.merge_metadata_to_send(k1: 'key1_val')
333      end
334
335      expect { blk.call }.to raise_error
336    end
337  end
338
339  describe '#client_invoke' do
340    it 'sends metadata to the server when present' do
341      call = make_test_call
342      metadata = { k1: 'v1', k2: 'v2' }
343      ActiveCall.client_invoke(call, metadata)
344      recvd_rpc =  @received_rpcs_queue.pop
345      recvd_call = recvd_rpc.call
346      expect(recvd_call).to_not be_nil
347      expect(recvd_rpc.metadata).to_not be_nil
348      expect(recvd_rpc.metadata['k1']).to eq('v1')
349      expect(recvd_rpc.metadata['k2']).to eq('v2')
350      # finish the call
351      recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => {})
352      call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
353      send_and_receive_close_and_status(call, recvd_call)
354    end
355  end
356
357  describe '#send_status', send_status: true do
358    it 'works when no metadata or messages have been sent yet' do
359      call = make_test_call
360      ActiveCall.client_invoke(call)
361
362      recvd_rpc = @received_rpcs_queue.pop
363      server_call = ActiveCall.new(
364        recvd_rpc.call,
365        @pass_through,
366        @pass_through,
367        deadline,
368        started: false)
369
370      expect(server_call.metadata_sent).to eq(false)
371      blk = proc { server_call.send_status(OK) }
372      expect { blk.call }.to_not raise_error
373    end
374  end
375
376  describe '#remote_read', remote_read: true do
377    it 'reads the response sent by a server' do
378      call = make_test_call
379      ActiveCall.client_invoke(call)
380      client_call = ActiveCall.new(call, @pass_through,
381                                   @pass_through, deadline)
382      msg = 'message is a string'
383      client_call.remote_send(msg)
384      server_call = expect_server_to_receive(msg)
385      server_call.remote_send('server_response')
386      expect(client_call.remote_read).to eq('server_response')
387      send_and_receive_close_and_status(
388        call, inner_call_of_active_call(server_call))
389    end
390
391    it 'saves no metadata when the server adds no metadata' do
392      call = make_test_call
393      ActiveCall.client_invoke(call)
394      client_call = ActiveCall.new(call, @pass_through,
395                                   @pass_through, deadline)
396      msg = 'message is a string'
397      client_call.remote_send(msg)
398      server_call = expect_server_to_receive(msg)
399      server_call.remote_send('ignore me')
400      expect(client_call.metadata).to be_nil
401      client_call.remote_read
402      expect(client_call.metadata).to eq({})
403      send_and_receive_close_and_status(
404        call, inner_call_of_active_call(server_call))
405    end
406
407    it 'saves metadata add by the server' do
408      call = make_test_call
409      ActiveCall.client_invoke(call)
410      client_call = ActiveCall.new(call, @pass_through,
411                                   @pass_through, deadline)
412      msg = 'message is a string'
413      client_call.remote_send(msg)
414      server_call = expect_server_to_receive(msg, k1: 'v1', k2: 'v2')
415      server_call.remote_send('ignore me')
416      expect(client_call.metadata).to be_nil
417      client_call.remote_read
418      expected = { 'k1' => 'v1', 'k2' => 'v2' }
419      expect(client_call.metadata).to eq(expected)
420      send_and_receive_close_and_status(
421        call, inner_call_of_active_call(server_call))
422    end
423
424    it 'get a status from server when nothing else sent from server' do
425      client_call = make_test_call
426      ActiveCall.client_invoke(client_call)
427
428      recvd_rpc = @received_rpcs_queue.pop
429      recvd_call = recvd_rpc.call
430
431      server_call = ActiveCall.new(
432        recvd_call,
433        @pass_through,
434        @pass_through,
435        deadline,
436        started: false)
437
438      server_call.send_status(OK, 'OK')
439
440      # Check that we can receive initial metadata and a status
441      client_call.run_batch(
442        CallOps::RECV_INITIAL_METADATA => nil)
443      batch_result = client_call.run_batch(
444        CallOps::RECV_STATUS_ON_CLIENT => nil)
445
446      expect(batch_result.status.code).to eq(OK)
447    end
448
449    it 'get a nil msg before a status when an OK status is sent' do
450      call = make_test_call
451      ActiveCall.client_invoke(call)
452      client_call = ActiveCall.new(call, @pass_through,
453                                   @pass_through, deadline)
454      msg = 'message is a string'
455      client_call.remote_send(msg)
456      call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
457      server_call = expect_server_to_receive(msg)
458      server_call.remote_send('server_response')
459      server_call.send_status(OK, 'OK')
460      expect(client_call.remote_read).to eq('server_response')
461      res = client_call.remote_read
462      expect(res).to be_nil
463    end
464
465    it 'unmarshals the response using the unmarshal func' do
466      call = make_test_call
467      ActiveCall.client_invoke(call)
468      unmarshal = proc { |x| 'unmarshalled:' + x }
469      client_call = ActiveCall.new(call, @pass_through,
470                                   unmarshal, deadline)
471
472      # confirm the client receives the unmarshalled message
473      msg = 'message is a string'
474      client_call.remote_send(msg)
475      server_call = expect_server_to_receive(msg)
476      server_call.remote_send('server_response')
477      expect(client_call.remote_read).to eq('unmarshalled:server_response')
478      send_and_receive_close_and_status(
479        call, inner_call_of_active_call(server_call))
480    end
481  end
482
483  describe '#each_remote_read' do
484    it 'creates an Enumerator' do
485      call = make_test_call
486      client_call = ActiveCall.new(call, @pass_through,
487                                   @pass_through, deadline)
488      expect(client_call.each_remote_read).to be_a(Enumerator)
489      # finish the call
490      client_call.cancel
491    end
492
493    it 'the returned enumerator can read n responses' do
494      call = make_test_call
495      ActiveCall.client_invoke(call)
496      client_call = ActiveCall.new(call, @pass_through,
497                                   @pass_through, deadline)
498      msg = 'message is a string'
499      reply = 'server_response'
500      client_call.remote_send(msg)
501      server_call = expect_server_to_receive(msg)
502      e = client_call.each_remote_read
503      n = 3  # arbitrary value > 1
504      n.times do
505        server_call.remote_send(reply)
506        expect(e.next).to eq(reply)
507      end
508      send_and_receive_close_and_status(
509        call, inner_call_of_active_call(server_call))
510    end
511
512    it 'the returns an enumerator that stops after an OK Status' do
513      call = make_test_call
514      ActiveCall.client_invoke(call)
515      client_call = ActiveCall.new(call, @pass_through,
516                                   @pass_through, deadline)
517      msg = 'message is a string'
518      reply = 'server_response'
519      client_call.remote_send(msg)
520      call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
521      server_call = expect_server_to_receive(msg)
522      e = client_call.each_remote_read
523      n = 3 # arbitrary value > 1
524      n.times do
525        server_call.remote_send(reply)
526        expect(e.next).to eq(reply)
527      end
528      server_call.send_status(OK, 'OK', true)
529      expect { e.next }.to raise_error(StopIteration)
530    end
531  end
532
533  describe '#closing the call from the client' do
534    it 'finishes ok if the server sends a status response' do
535      call = make_test_call
536      ActiveCall.client_invoke(call)
537      client_call = ActiveCall.new(call, @pass_through,
538                                   @pass_through, deadline)
539      msg = 'message is a string'
540      client_call.remote_send(msg)
541      expect do
542        call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
543      end.to_not raise_error
544      server_call = expect_server_to_receive(msg)
545      server_call.remote_send('server_response')
546      expect(client_call.remote_read).to eq('server_response')
547      server_call.send_status(OK, 'status code is OK')
548      expect { client_call.receive_and_check_status }.to_not raise_error
549    end
550
551    it 'finishes ok if the server sends an early status response' do
552      call = make_test_call
553      ActiveCall.client_invoke(call)
554      client_call = ActiveCall.new(call, @pass_through,
555                                   @pass_through, deadline)
556      msg = 'message is a string'
557      client_call.remote_send(msg)
558      server_call = expect_server_to_receive(msg)
559      server_call.remote_send('server_response')
560      server_call.send_status(OK, 'status code is OK')
561      expect(client_call.remote_read).to eq('server_response')
562      expect do
563        call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
564      end.to_not raise_error
565      expect { client_call.receive_and_check_status }.to_not raise_error
566    end
567
568    it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do
569      call = make_test_call
570      ActiveCall.client_invoke(call)
571      client_call = ActiveCall.new(call, @pass_through,
572                                   @pass_through, deadline)
573      msg = 'message is a string'
574      client_call.remote_send(msg)
575      server_call = expect_server_to_receive(msg)
576      server_call.remote_send('server_response')
577      server_call.send_status(OK, 'status code is OK')
578      expect(client_call.remote_read).to eq('server_response')
579      expect do
580        call.run_batch(
581          CallOps::SEND_CLOSE_FROM_CLIENT => nil,
582          CallOps::RECV_STATUS_ON_CLIENT => nil)
583      end.to_not raise_error
584    end
585  end
586
587  # Test sending of the initial metadata in #run_server_bidi
588  # from the server handler both implicitly and explicitly.
589  describe '#run_server_bidi metadata sending tests', run_server_bidi: true do
590    before(:each) do
591      @requests = ['first message', 'second message']
592      @server_to_client_metadata = { 'test_key' => 'test_val' }
593      @server_status = OK
594
595      @client_call = make_test_call
596      @client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {})
597
598      recvd_rpc = @received_rpcs_queue.pop
599      recvd_call = recvd_rpc.call
600      @server_call = ActiveCall.new(
601        recvd_call,
602        @pass_through,
603        @pass_through,
604        deadline,
605        metadata_received: true,
606        started: false,
607        metadata_to_send: @server_to_client_metadata)
608    end
609
610    after(:each) do
611      # Send the requests and send a close so the server can send a status
612      @requests.each do |message|
613        @client_call.run_batch(CallOps::SEND_MESSAGE => message)
614      end
615      @client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
616
617      @server_thread.join
618
619      # Expect that initial metadata was sent,
620      # the requests were echoed, and a status was sent
621      batch_result = @client_call.run_batch(
622        CallOps::RECV_INITIAL_METADATA => nil)
623      expect(batch_result.metadata).to eq(@server_to_client_metadata)
624
625      @requests.each do |message|
626        batch_result = @client_call.run_batch(
627          CallOps::RECV_MESSAGE => nil)
628        expect(batch_result.message).to eq(message)
629      end
630
631      batch_result = @client_call.run_batch(
632        CallOps::RECV_STATUS_ON_CLIENT => nil)
633      expect(batch_result.status.code).to eq(@server_status)
634    end
635
636    it 'sends the initial metadata implicitly if not already sent' do
637      # Server handler that doesn't have access to a "call"
638      # It echoes the requests
639      fake_gen_each_reply_with_no_call_param = proc do |msgs|
640        msgs
641      end
642
643      int_ctx = GRPC::InterceptionContext.new
644
645      @server_thread = Thread.new do
646        @server_call.run_server_bidi(
647          fake_gen_each_reply_with_no_call_param, int_ctx)
648        @server_call.send_status(@server_status)
649      end
650    end
651
652    it 'sends the metadata when sent explicitly and not already sent' do
653      # Fake server handler that has access to a "call" object and
654      # uses it to explicitly update and send the initial metadata
655      fake_gen_each_reply_with_call_param = proc do |msgs, call_param|
656        call_param.merge_metadata_to_send(@server_to_client_metadata)
657        call_param.send_initial_metadata
658        msgs
659      end
660      int_ctx = GRPC::InterceptionContext.new
661
662      @server_thread = Thread.new do
663        @server_call.run_server_bidi(
664          fake_gen_each_reply_with_call_param, int_ctx)
665        @server_call.send_status(@server_status)
666      end
667    end
668  end
669
670  def expect_server_to_receive(sent_text, **kw)
671    c = expect_server_to_be_invoked(**kw)
672    expect(c.remote_read).to eq(sent_text)
673    c
674  end
675
676  def expect_server_to_be_invoked(**kw)
677    recvd_rpc =  @received_rpcs_queue.pop
678    expect(recvd_rpc).to_not eq nil
679    recvd_call = recvd_rpc.call
680    recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => kw)
681    ActiveCall.new(recvd_call, @pass_through, @pass_through, deadline,
682                   metadata_received: true, started: true)
683  end
684
685  def make_test_call
686    @ch.create_call(nil, nil, '/method', nil, deadline)
687  end
688
689  def deadline
690    Time.now + 2  # in 2 seconds; arbitrary
691  end
692end
693