xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2018 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests of grpc_channelz.v1.channelz."""
15
16from concurrent import futures
17import sys
18import unittest
19
20import grpc
21from grpc_channelz.v1 import channelz
22from grpc_channelz.v1 import channelz_pb2
23from grpc_channelz.v1 import channelz_pb2_grpc
24
25from tests.unit import test_common
26from tests.unit.framework.common import test_constants
27
28_SUCCESSFUL_UNARY_UNARY = "/test/SuccessfulUnaryUnary"
29_FAILED_UNARY_UNARY = "/test/FailedUnaryUnary"
30_SUCCESSFUL_STREAM_STREAM = "/test/SuccessfulStreamStream"
31
32_REQUEST = b"\x00\x00\x00"
33_RESPONSE = b"\x01\x01\x01"
34
35_DISABLE_REUSE_PORT = (("grpc.so_reuseport", 0),)
36_ENABLE_CHANNELZ = (("grpc.enable_channelz", 1),)
37_DISABLE_CHANNELZ = (("grpc.enable_channelz", 0),)
38
39
def _successful_unary_unary(request, servicer_context):
    """Unary-unary behavior that replies with the canned response.

    Neither ``request`` nor ``servicer_context`` is inspected.
    """
    return _RESPONSE
42
43
def _failed_unary_unary(request, servicer_context):
    """Unary-unary behavior that deliberately fails the call.

    The request is ignored. The context is given status code INTERNAL and a
    fixed details string; nothing is returned explicitly.
    """
    failure_code = grpc.StatusCode.INTERNAL
    failure_details = "Channelz Test Intended Failure"
    servicer_context.set_code(failure_code)
    servicer_context.set_details(failure_details)
47
48
def _successful_stream_stream(request_iterator, servicer_context):
    """Stream-stream behavior yielding one canned response per request.

    The content of each incoming request is ignored; only the number of
    requests determines how many responses are produced.
    """
    for _request in request_iterator:
        yield _RESPONSE
52
53
class _GenericHandler(grpc.GenericRpcHandler):
    """Generic RPC handler that routes the three test methods."""

    def service(self, handler_call_details):
        """Return the method handler for the requested method, or None.

        ``handler_call_details.method`` is looked up among the known test
        method names; an unknown name yields ``None``.
        """
        # Each entry pairs the handler factory with the behavior it wraps, so
        # a handler object is only built for the method actually requested.
        routes = {
            _SUCCESSFUL_UNARY_UNARY: (
                grpc.unary_unary_rpc_method_handler,
                _successful_unary_unary,
            ),
            _FAILED_UNARY_UNARY: (
                grpc.unary_unary_rpc_method_handler,
                _failed_unary_unary,
            ),
            _SUCCESSFUL_STREAM_STREAM: (
                grpc.stream_stream_rpc_method_handler,
                _successful_stream_stream,
            ),
        }
        route = routes.get(handler_call_details.method)
        if route is None:
            return None
        make_handler, behavior = route
        return make_handler(behavior)
66
67
class _ChannelServerPair(object):
    """A started server together with a channel pointed at it.

    Both ends are created with the channelz option enabled. Instances expose
    two attributes, ``server`` and ``channel``.
    """

    def __init__(self):
        # Server will enable channelz service
        worker_pool = futures.ThreadPoolExecutor(max_workers=3)
        server_options = _DISABLE_REUSE_PORT + _ENABLE_CHANNELZ
        self.server = grpc.server(worker_pool, options=server_options)
        # Binding to port 0 returns the port that was actually picked.
        bound_port = self.server.add_insecure_port("[::]:0")
        self.server.add_generic_rpc_handlers((_GenericHandler(),))
        self.server.start()

        # Channel will enable channelz service...
        target = "localhost:%d" % bound_port
        self.channel = grpc.insecure_channel(target, _ENABLE_CHANNELZ)
83
84
def _generate_channel_server_pairs(n):
    """Create and return a list of ``n`` fresh _ChannelServerPair objects."""
    pairs = []
    for _ in range(n):
        pairs.append(_ChannelServerPair())
    return pairs
87
88
89def _close_channel_server_pairs(pairs):
90    for pair in pairs:
91        pair.server.stop(None)
92        pair.channel.close()
93
94
@unittest.skipIf(
    sys.version_info[0] < 3, "ProtoBuf descriptor has moved on from Python2"
)
class ChannelzServicerTest(unittest.TestCase):
    """Exercise the Channelz servicer against live channel/server pairs.

    ``setUp`` starts a dedicated server that hosts the Channelz service (with
    the channelz option disabled for that server and its client channel) and
    builds a stub for querying it. Each test creates the channelz-enabled
    pairs it needs in ``self._pairs`` and checks what the service reports.
    """

    # Bounds, in seconds, for the counter polling done by _poll_until.
    _POLL_TIMEOUT_SECONDS = 30
    _POLL_INTERVAL_SECONDS = 0.01

    # ------------------------------------------------------------------
    # Traffic helpers
    # ------------------------------------------------------------------

    def _send_successful_unary_unary(self, idx):
        """Issue one unary call on pair ``idx`` and assert it ends with OK."""
        multi_callable = self._pairs[idx].channel.unary_unary(
            _SUCCESSFUL_UNARY_UNARY,
            _registered_method=True,
        )
        _, call = multi_callable.with_call(_REQUEST)
        self.assertEqual(call.code(), grpc.StatusCode.OK)

    def _send_failed_unary_unary(self, idx):
        """Issue one unary call on pair ``idx`` and assert it raises RpcError."""
        multi_callable = self._pairs[idx].channel.unary_unary(
            _FAILED_UNARY_UNARY,
            _registered_method=True,
        )
        with self.assertRaises(
            grpc.RpcError, msg="This call supposed to fail"
        ):
            multi_callable.with_call(_REQUEST)

    def _send_successful_stream_stream(self, idx):
        """Run one streaming call on pair ``idx`` and count the responses.

        ``test_constants.STREAM_LENGTH`` requests are sent and the same
        number of responses is expected back.
        """
        multi_callable = self._pairs[idx].channel.stream_stream(
            _SUCCESSFUL_STREAM_STREAM,
            _registered_method=True,
        )
        requests = iter([_REQUEST] * test_constants.STREAM_LENGTH)
        response_count = sum(1 for _ in multi_callable(requests))
        self.assertEqual(response_count, test_constants.STREAM_LENGTH)

    def _send_mixed_unary_calls(self, k_success, k_failed):
        """Send the traffic pattern shared by the four-pair tests.

        Pairs 0 and 2 each receive ``k_success`` successful calls; pairs 1
        and 2 each receive ``k_failed`` failing calls. Pair 3 gets nothing.
        """
        for _ in range(k_success):
            self._send_successful_unary_unary(0)
            self._send_successful_unary_unary(2)
        for _ in range(k_failed):
            self._send_failed_unary_unary(1)
            self._send_failed_unary_unary(2)

    # ------------------------------------------------------------------
    # Channelz query helpers
    # ------------------------------------------------------------------

    def _get_top_channels(self, start_channel_id=0):
        """Return the GetTopChannels response starting at the given id."""
        return self._channelz_stub.GetTopChannels(
            channelz_pb2.GetTopChannelsRequest(
                start_channel_id=start_channel_id
            )
        )

    def _get_servers(self):
        """Return the GetServers response starting at server id 0."""
        return self._channelz_stub.GetServers(
            channelz_pb2.GetServersRequest(start_server_id=0)
        )

    def _get_channel_id(self, idx):
        """Return the id of the ``idx``-th top channel.

        Channel ids may not be consecutive, so the id is looked up from a
        fresh GetTopChannels listing instead of being derived from ``idx``.
        """
        resp = self._get_top_channels()
        self.assertGreater(len(resp.channel), idx)
        return resp.channel[idx].ref.channel_id

    def _get_channel(self, idx):
        """Return the channel message reported for the ``idx``-th top channel."""
        resp = self._channelz_stub.GetChannel(
            channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(idx))
        )
        return resp.channel

    def _get_first_subchannel(self, channel):
        """Return the subchannel message for ``channel``'s first subchannel ref."""
        resp = self._channelz_stub.GetSubchannel(
            channelz_pb2.GetSubchannelRequest(
                subchannel_id=channel.subchannel_ref[0].subchannel_id
            )
        )
        return resp.subchannel

    def _get_socket(self, socket_id):
        """Return the socket message for ``socket_id``."""
        resp = self._channelz_stub.GetSocket(
            channelz_pb2.GetSocketRequest(socket_id=socket_id)
        )
        return resp.socket

    def _active_channels_and_subchannels(self, k_channels):
        """Yield ``(channel, subchannel)`` for every channel that saw calls.

        Asserts that exactly ``k_channels`` top channels exist, that channels
        with no started calls have no subchannel refs, and that channels
        with calls have at least one.
        """
        gtc_resp = self._get_top_channels()
        self.assertEqual(len(gtc_resp.channel), k_channels)
        for channel in gtc_resp.channel:
            # If no call performed in the channel, there shouldn't be any subchannel
            if channel.data.calls_started == 0:
                self.assertEqual(len(channel.subchannel_ref), 0)
                continue

            # Otherwise, the subchannel should exist
            self.assertGreater(len(channel.subchannel_ref), 0)
            yield channel, self._get_first_subchannel(channel)

    # ------------------------------------------------------------------
    # Assertion helpers
    # ------------------------------------------------------------------

    def _assert_call_counts(self, data, started, succeeded, failed):
        """Assert the three call counters carried by ``data``."""
        self.assertEqual(data.calls_started, started)
        self.assertEqual(data.calls_succeeded, succeeded)
        self.assertEqual(data.calls_failed, failed)

    def _assert_not_found(self, rpc, request):
        """Assert that ``rpc(request)`` fails with status NOT_FOUND.

        Only grpc.RpcError is caught so that unrelated exceptions (including
        KeyboardInterrupt/SystemExit) propagate instead of being swallowed.
        """
        with self.assertRaises(
            grpc.RpcError, msg="Invalid query not detected"
        ) as caught:
            rpc(request)
        self.assertEqual(caught.exception.code(), grpc.StatusCode.NOT_FOUND)

    def _poll_until(self, fetch, is_settled, what):
        """Call ``fetch`` repeatedly until ``is_settled(result)`` holds.

        Returns the first settled result. Fails the test if nothing settles
        within ``_POLL_TIMEOUT_SECONDS`` rather than spinning forever.
        """
        # Local import: keeps this class independent of the module's
        # top-level import list.
        import time

        deadline = time.monotonic() + self._POLL_TIMEOUT_SECONDS
        while True:
            result = fetch()
            if is_settled(result):
                return result
            if time.monotonic() >= deadline:
                self.fail(
                    "%s did not settle within %s seconds"
                    % (what, self._POLL_TIMEOUT_SECONDS)
                )
            time.sleep(self._POLL_INTERVAL_SECONDS)

    # ------------------------------------------------------------------
    # Fixture
    # ------------------------------------------------------------------

    def setUp(self):
        """Start the Channelz-hosting server and build a stub for it."""
        self._pairs = []
        # This server is for Channelz info fetching only
        # It self should not enable Channelz
        self._server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=3),
            options=_DISABLE_REUSE_PORT + _DISABLE_CHANNELZ,
        )
        port = self._server.add_insecure_port("[::]:0")
        channelz.add_channelz_servicer(self._server)
        self._server.start()

        # This channel is used to fetch Channelz info only
        # Channelz should not be enabled
        self._channel = grpc.insecure_channel(
            "localhost:%d" % port, _DISABLE_CHANNELZ
        )
        self._channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel)

    def tearDown(self):
        """Stop the Channelz server and release every pair the test made."""
        try:
            self._server.stop(None)
            self._channel.close()
        finally:
            # Always release the per-test pairs, even if the above raised.
            _close_channel_server_pairs(self._pairs)

    # ------------------------------------------------------------------
    # Channel tests
    # ------------------------------------------------------------------

    def test_get_top_channels_basic(self):
        """One pair shows up as exactly one top channel."""
        self._pairs = _generate_channel_server_pairs(1)
        resp = self._get_top_channels()
        self.assertEqual(len(resp.channel), 1)
        self.assertTrue(resp.end)

    def test_get_top_channels_high_start_id(self):
        """A start id beyond every channel yields an empty, final page."""
        self._pairs = _generate_channel_server_pairs(1)
        resp = self._get_top_channels(start_channel_id=10000)
        self.assertEqual(len(resp.channel), 0)
        self.assertTrue(resp.end)

    def test_successful_request(self):
        """A single successful call is counted as started and succeeded."""
        self._pairs = _generate_channel_server_pairs(1)
        self._send_successful_unary_unary(0)
        self._assert_call_counts(self._get_channel(0).data, 1, 1, 0)

    def test_failed_request(self):
        """A single failing call is counted as started and failed."""
        self._pairs = _generate_channel_server_pairs(1)
        self._send_failed_unary_unary(0)
        self._assert_call_counts(self._get_channel(0).data, 1, 0, 1)

    def test_many_requests(self):
        """Counters add up over several successes and failures."""
        self._pairs = _generate_channel_server_pairs(1)
        k_success = 7
        k_failed = 9
        for _ in range(k_success):
            self._send_successful_unary_unary(0)
        for _ in range(k_failed):
            self._send_failed_unary_unary(0)
        self._assert_call_counts(
            self._get_channel(0).data,
            k_success + k_failed,
            k_success,
            k_failed,
        )

    def test_many_channel(self):
        """Every created pair is listed as a top channel."""
        k_channels = 4
        self._pairs = _generate_channel_server_pairs(k_channels)
        resp = self._get_top_channels()
        self.assertEqual(len(resp.channel), k_channels)

    def test_many_requests_many_channel(self):
        """Counters are tracked independently per channel."""
        k_channels = 4
        self._pairs = _generate_channel_server_pairs(k_channels)
        k_success = 11
        k_failed = 13
        self._send_mixed_unary_calls(k_success, k_failed)

        # The first channel saw only successes
        self._assert_call_counts(
            self._get_channel(0).data, k_success, k_success, 0
        )

        # The second channel saw only failures
        self._assert_call_counts(
            self._get_channel(1).data, k_failed, 0, k_failed
        )

        # The third channel saw both successes and failures
        self._assert_call_counts(
            self._get_channel(2).data,
            k_success + k_failed,
            k_success,
            k_failed,
        )

        # The fourth channel saw nothing
        self._assert_call_counts(self._get_channel(3).data, 0, 0, 0)

    def test_many_subchannels(self):
        """A channel's first subchannel reports the same call counters."""
        k_channels = 4
        self._pairs = _generate_channel_server_pairs(k_channels)
        k_success = 17
        k_failed = 19
        self._send_mixed_unary_calls(k_success, k_failed)

        for channel, subchannel in self._active_channels_and_subchannels(
            k_channels
        ):
            self.assertEqual(
                channel.data.calls_started,
                subchannel.data.calls_started,
            )
            self.assertEqual(
                channel.data.calls_succeeded,
                subchannel.data.calls_succeeded,
            )
            self.assertEqual(
                channel.data.calls_failed,
                subchannel.data.calls_failed,
            )

    # ------------------------------------------------------------------
    # Server tests
    # ------------------------------------------------------------------

    def test_server_basic(self):
        """One pair shows up as exactly one server."""
        self._pairs = _generate_channel_server_pairs(1)
        resp = self._get_servers()
        self.assertEqual(len(resp.server), 1)

    def test_get_one_server(self):
        """GetServer returns the server whose id was listed by GetServers."""
        self._pairs = _generate_channel_server_pairs(1)
        gss_resp = self._get_servers()
        self.assertEqual(len(gss_resp.server), 1)
        listed_id = gss_resp.server[0].ref.server_id
        gs_resp = self._channelz_stub.GetServer(
            channelz_pb2.GetServerRequest(server_id=listed_id)
        )
        self.assertEqual(listed_id, gs_resp.server.ref.server_id)

    def test_server_call(self):
        """Server-side counters add up over successes and failures."""
        self._pairs = _generate_channel_server_pairs(1)
        k_success = 23
        k_failed = 29
        for _ in range(k_success):
            self._send_successful_unary_unary(0)
        for _ in range(k_failed):
            self._send_failed_unary_unary(0)

        resp = self._get_servers()
        self.assertEqual(len(resp.server), 1)
        self._assert_call_counts(
            resp.server[0].data,
            k_success + k_failed,
            k_success,
            k_failed,
        )

    # ------------------------------------------------------------------
    # Socket tests
    # ------------------------------------------------------------------

    def test_many_subchannels_and_sockets(self):
        """Socket counters agree with the owning subchannel's counters."""
        k_channels = 4
        self._pairs = _generate_channel_server_pairs(k_channels)
        k_success = 3
        k_failed = 5
        self._send_mixed_unary_calls(k_success, k_failed)

        for _channel, subchannel in self._active_channels_and_subchannels(
            k_channels
        ):
            self.assertEqual(len(subchannel.socket_ref), 1)
            sock = self._get_socket(subchannel.socket_ref[0].socket_id)

            self.assertEqual(
                subchannel.data.calls_started,
                sock.data.streams_started,
            )
            self.assertEqual(
                subchannel.data.calls_started,
                sock.data.streams_succeeded,
            )
            # Calls started == messages sent, only valid for unary calls
            self.assertEqual(
                subchannel.data.calls_started,
                sock.data.messages_sent,
            )
            # Only receive responses when the RPC was successful
            self.assertEqual(
                subchannel.data.calls_succeeded,
                sock.data.messages_received,
            )

            # When a TCP/IP address is reported it must be 4 bytes (IPv4)
            # or 16 bytes (IPv6) long.
            for endpoint in (sock.remote, sock.local):
                if endpoint.HasField("tcpip_address"):
                    address = endpoint.tcpip_address.ip_address
                    self.assertIn(len(address), (4, 16), address)

    def test_streaming_rpc(self):
        """A streaming call is one call/stream but STREAM_LENGTH messages."""
        self._pairs = _generate_channel_server_pairs(1)
        # In C++, the argument for _send_successful_stream_stream is message length.
        # Here the argument is still channel idx, to be consistent with the other two.
        self._send_successful_stream_stream(0)

        channel = self._get_channel(0)
        self._assert_call_counts(channel.data, 1, 1, 0)
        # Subchannel exists
        self.assertGreater(len(channel.subchannel_ref), 0)

        # Re-query until every started call has been accounted for as either
        # succeeded or failed.
        subchannel = self._poll_until(
            lambda: self._get_first_subchannel(channel),
            lambda sub: sub.data.calls_started
            == sub.data.calls_succeeded + sub.data.calls_failed,
            "Subchannel call counters",
        )
        self._assert_call_counts(subchannel.data, 1, 1, 0)
        # Socket exists
        self.assertEqual(len(subchannel.socket_ref), 1)

        # Same idea for the socket's stream counters.
        sock = self._poll_until(
            lambda: self._get_socket(subchannel.socket_ref[0].socket_id),
            lambda s: s.data.streams_started
            == s.data.streams_succeeded + s.data.streams_failed,
            "Socket stream counters",
        )
        self.assertEqual(sock.data.streams_started, 1)
        self.assertEqual(sock.data.streams_succeeded, 1)
        self.assertEqual(sock.data.streams_failed, 0)
        self.assertEqual(
            sock.data.messages_sent, test_constants.STREAM_LENGTH
        )
        self.assertEqual(
            sock.data.messages_received, test_constants.STREAM_LENGTH
        )

    def test_server_sockets(self):
        """GetServerSockets can be queried for a server that handled calls."""
        self._pairs = _generate_channel_server_pairs(1)
        self._send_successful_unary_unary(0)
        self._send_failed_unary_unary(0)

        gs_resp = self._get_servers()
        self.assertEqual(len(gs_resp.server), 1)
        self._assert_call_counts(gs_resp.server[0].data, 2, 1, 1)

        # If the RPC call failed, it will raise a grpc.RpcError
        # So, if there is no exception raised, considered pass
        self._channelz_stub.GetServerSockets(
            channelz_pb2.GetServerSocketsRequest(
                server_id=gs_resp.server[0].ref.server_id, start_socket_id=0
            )
        )

    def test_server_listen_sockets(self):
        """The server's single listen socket can be fetched via GetSocket."""
        self._pairs = _generate_channel_server_pairs(1)

        gss_resp = self._get_servers()
        self.assertEqual(len(gss_resp.server), 1)
        self.assertEqual(len(gss_resp.server[0].listen_socket), 1)

        # If the RPC call failed, it will raise a grpc.RpcError
        # So, if there is no exception raised, considered pass
        self._get_socket(gss_resp.server[0].listen_socket[0].socket_id)

    # ------------------------------------------------------------------
    # Invalid-query tests
    # ------------------------------------------------------------------

    def test_invalid_query_get_server(self):
        """GetServer with an unknown id fails with NOT_FOUND."""
        self._assert_not_found(
            self._channelz_stub.GetServer,
            channelz_pb2.GetServerRequest(server_id=10000),
        )

    def test_invalid_query_get_channel(self):
        """GetChannel with an unknown id fails with NOT_FOUND."""
        self._assert_not_found(
            self._channelz_stub.GetChannel,
            channelz_pb2.GetChannelRequest(channel_id=10000),
        )

    def test_invalid_query_get_subchannel(self):
        """GetSubchannel with an unknown id fails with NOT_FOUND."""
        self._assert_not_found(
            self._channelz_stub.GetSubchannel,
            channelz_pb2.GetSubchannelRequest(subchannel_id=10000),
        )

    def test_invalid_query_get_socket(self):
        """GetSocket with an unknown id fails with NOT_FOUND."""
        self._assert_not_found(
            self._channelz_stub.GetSocket,
            channelz_pb2.GetSocketRequest(socket_id=10000),
        )

    def test_invalid_query_get_server_sockets(self):
        """GetServerSockets with an unknown server id fails with NOT_FOUND."""
        self._assert_not_found(
            self._channelz_stub.GetServerSockets,
            channelz_pb2.GetServerSocketsRequest(
                server_id=10000,
                start_socket_id=0,
            ),
        )
572
if __name__ == "__main__":
    # Run this module's tests directly, printing one line per test.
    unittest.main(verbosity=2)
575