xref: /aosp_15_r20/external/grpc-grpc/src/python/grpcio_tests/tests/admin/admin_test.py (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1# Copyright 2021 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""A test to ensure that admin services are registered correctly."""
15
16from concurrent.futures import ThreadPoolExecutor
17import logging
18import sys
19import unittest
20
21import grpc
22import grpc_admin
23from grpc_channelz.v1 import channelz_pb2
24from grpc_channelz.v1 import channelz_pb2_grpc
25from grpc_csds import csds_pb2
26from grpc_csds import csds_pb2_grpc
27
28
@unittest.skipIf(
    sys.version_info[0] < 3,
    "ProtoBuf descriptor has moved on from Python2",
)
class TestAdmin(unittest.TestCase):
    """Verify the services added by ``grpc_admin.add_admin_servicers``.

    Each test talks to a freshly started local server through an insecure
    channel and checks that the corresponding admin RPC answers.
    """

    def setUp(self):
        """Start a local server with the admin servicers and connect to it."""
        server = grpc.server(ThreadPoolExecutor())
        self._server = server

        # Binding to port 0 and using the returned value as the real port
        # is what lets the channel below reach the server.
        bound_port = server.add_insecure_port("localhost:0")
        grpc_admin.add_admin_servicers(server)
        server.start()

        target = "localhost:%s" % bound_port
        self._channel = grpc.insecure_channel(target)

    def tearDown(self):
        """Close the client channel first, then stop the server (grace 0)."""
        self._channel.close()
        self._server.stop(0)

    def test_has_csds(self):
        """FetchClientStatus must succeed and report zero config entries."""
        request = csds_pb2.ClientStatusRequest()
        csds_stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(
            self._channel
        )
        # The call raising would already fail the test; the assertion then
        # checks the shape of the returned message.
        response = csds_stub.FetchClientStatus(request)
        self.assertEqual(len(response.config), 0)

    def test_has_channelz(self):
        """GetTopChannels must succeed and list at least one channel."""
        request = channelz_pb2.GetTopChannelsRequest()
        channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel)
        # The call raising would already fail the test; the assertion then
        # checks that the response is non-empty.
        response = channelz_stub.GetTopChannels(request)
        self.assertGreater(len(response.channel), 0)
56
57
if __name__ == "__main__":
    # Direct execution: configure root logging at DEBUG level, then hand
    # control to the unittest runner with verbose per-test output.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)
61