# Copyright 2018 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests of grpc_channelz.v1.channelz."""

from concurrent import futures
import sys
import time
import unittest

import grpc
from grpc_channelz.v1 import channelz
from grpc_channelz.v1 import channelz_pb2
from grpc_channelz.v1 import channelz_pb2_grpc

from tests.unit import test_common
from tests.unit.framework.common import test_constants

_SUCCESSFUL_UNARY_UNARY = "/test/SuccessfulUnaryUnary"
_FAILED_UNARY_UNARY = "/test/FailedUnaryUnary"
_SUCCESSFUL_STREAM_STREAM = "/test/SuccessfulStreamStream"

_REQUEST = b"\x00\x00\x00"
_RESPONSE = b"\x01\x01\x01"

_DISABLE_REUSE_PORT = (("grpc.so_reuseport", 0),)
_ENABLE_CHANNELZ = (("grpc.enable_channelz", 1),)
_DISABLE_CHANNELZ = (("grpc.enable_channelz", 0),)

# Upper bound, in seconds, on how long a test polls Channelz for counters to
# settle before failing instead of spinning forever.
_POLL_TIMEOUT_S = 30
# Pause between two consecutive Channelz polls, in seconds.
_POLL_INTERVAL_S = 0.01


def _successful_unary_unary(request, servicer_context):
    """Unary-unary behavior that always answers with _RESPONSE."""
    return _RESPONSE


def _failed_unary_unary(request, servicer_context):
    """Unary-unary behavior that marks the RPC as failed with INTERNAL."""
    servicer_context.set_code(grpc.StatusCode.INTERNAL)
    servicer_context.set_details("Channelz Test Intended Failure")


def _successful_stream_stream(request_iterator, servicer_context):
    """Stream-stream behavior that yields one _RESPONSE per request."""
    for _ in request_iterator:
        yield _RESPONSE


class _GenericHandler(grpc.GenericRpcHandler):
    """Routes the three test method names to their behaviors."""

    def service(self, handler_call_details):
        """Return the handler for the called method, or None if unknown."""
        method = handler_call_details.method
        if method == _SUCCESSFUL_UNARY_UNARY:
            return grpc.unary_unary_rpc_method_handler(_successful_unary_unary)
        if method == _FAILED_UNARY_UNARY:
            return grpc.unary_unary_rpc_method_handler(_failed_unary_unary)
        if method == _SUCCESSFUL_STREAM_STREAM:
            return grpc.stream_stream_rpc_method_handler(
                _successful_stream_stream
            )
        return None


class _ChannelServerPair(object):
    """A started server plus a channel to it, both with Channelz enabled."""

    def __init__(self):
        # Server will enable channelz service
        self.server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=3),
            options=_DISABLE_REUSE_PORT + _ENABLE_CHANNELZ,
        )
        port = self.server.add_insecure_port("[::]:0")
        self.server.add_generic_rpc_handlers((_GenericHandler(),))
        self.server.start()

        # Channel will enable channelz service...
        self.channel = grpc.insecure_channel(
            "localhost:%d" % port, _ENABLE_CHANNELZ
        )


def _generate_channel_server_pairs(n):
    """Create n independent _ChannelServerPair instances."""
    return [_ChannelServerPair() for _ in range(n)]


def _close_channel_server_pairs(pairs):
    """Stop the server and close the channel of every pair."""
    for pair in pairs:
        pair.server.stop(None)
        pair.channel.close()


@unittest.skipIf(
    sys.version_info[0] < 3, "ProtoBuf descriptor has moved on from Python2"
)
class ChannelzServicerTest(unittest.TestCase):
    """Queries the Channelz servicer about channels/servers under test."""

    # ------------------------------------------------------------------
    # Traffic helpers
    # ------------------------------------------------------------------

    def _send_successful_unary_unary(self, idx):
        """Issue one unary call on pair idx and check that it returns OK."""
        multi_callable = self._pairs[idx].channel.unary_unary(
            _SUCCESSFUL_UNARY_UNARY,
            _registered_method=True,
        )
        _, call = multi_callable.with_call(_REQUEST)
        self.assertEqual(call.code(), grpc.StatusCode.OK)

    def _send_failed_unary_unary(self, idx):
        """Issue one unary call on pair idx that must raise grpc.RpcError."""
        multi_callable = self._pairs[idx].channel.unary_unary(
            _FAILED_UNARY_UNARY,
            _registered_method=True,
        )
        try:
            multi_callable.with_call(_REQUEST)
        except grpc.RpcError:
            return
        else:
            self.fail("This call supposed to fail")

    def _send_successful_stream_stream(self, idx):
        """Stream STREAM_LENGTH requests on pair idx and count the replies."""
        multi_callable = self._pairs[idx].channel.stream_stream(
            _SUCCESSFUL_STREAM_STREAM,
            _registered_method=True,
        )
        response_iterator = multi_callable(
            iter([_REQUEST] * test_constants.STREAM_LENGTH)
        )
        cnt = sum(1 for _ in response_iterator)
        self.assertEqual(cnt, test_constants.STREAM_LENGTH)

    def _send_mixed_traffic(self, k_success, k_failed):
        """Send the traffic pattern shared by the multi-channel tests.

        Channel 0 gets only successes, channel 1 only failures, channel 2
        gets both, and channel 3 is left untouched.
        """
        for _ in range(k_success):
            self._send_successful_unary_unary(0)
            self._send_successful_unary_unary(2)
        for _ in range(k_failed):
            self._send_failed_unary_unary(1)
            self._send_failed_unary_unary(2)

    # ------------------------------------------------------------------
    # Channelz query helpers
    # ------------------------------------------------------------------

    def _get_top_channels(self, start_channel_id=0):
        """Return the GetTopChannels response from start_channel_id on."""
        return self._channelz_stub.GetTopChannels(
            channelz_pb2.GetTopChannelsRequest(
                start_channel_id=start_channel_id
            )
        )

    def _get_servers(self):
        """Return the GetServers response starting at server id 0."""
        return self._channelz_stub.GetServers(
            channelz_pb2.GetServersRequest(start_server_id=0)
        )

    def _get_channel_id(self, idx):
        """Channel id may not be consecutive"""
        resp = self._get_top_channels()
        self.assertGreater(len(resp.channel), idx)
        return resp.channel[idx].ref.channel_id

    def _get_channel(self, idx):
        """Return the GetChannel response for the idx-th top channel."""
        return self._channelz_stub.GetChannel(
            channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(idx))
        )

    def _get_first_subchannel(self, channel):
        """Return the GetSubchannel response for channel's first subchannel."""
        return self._channelz_stub.GetSubchannel(
            channelz_pb2.GetSubchannelRequest(
                subchannel_id=channel.subchannel_ref[0].subchannel_id
            )
        )

    def _assert_channel_calls(self, idx, started, succeeded, failed):
        """Check the call counters of the idx-th channel; return the response."""
        resp = self._get_channel(idx)
        data = resp.channel.data
        self.assertEqual(data.calls_started, started)
        self.assertEqual(data.calls_succeeded, succeeded)
        self.assertEqual(data.calls_failed, failed)
        return resp

    def _poll_until(self, fetch, is_settled, what):
        """Call fetch() repeatedly until is_settled(response) is true.

        Fails the test once _POLL_TIMEOUT_S seconds have elapsed, so that
        counters which never settle surface as a failure rather than a hang.

        Args:
          fetch: Zero-argument callable performing the Channelz query.
          is_settled: Predicate applied to each response.
          what: Short label used in the timeout failure message.

        Returns:
          The first response for which is_settled returned a true value.
        """
        deadline = time.monotonic() + _POLL_TIMEOUT_S
        while True:
            resp = fetch()
            if is_settled(resp):
                return resp
            if time.monotonic() >= deadline:
                self.fail(
                    "Timed out waiting for %s counters to settle" % what
                )
            time.sleep(_POLL_INTERVAL_S)

    def _assert_not_found(self, rpc, request):
        """Check that rpc(request) is rejected with StatusCode.NOT_FOUND."""
        # Only grpc.RpcError is an acceptable outcome; anything else (for
        # example KeyboardInterrupt) must propagate untouched.
        with self.assertRaises(
            grpc.RpcError, msg="Invalid query not detected"
        ) as ctx:
            rpc(request)
        self.assertEqual(ctx.exception.code(), grpc.StatusCode.NOT_FOUND)

    # ------------------------------------------------------------------
    # Fixture
    # ------------------------------------------------------------------

    def setUp(self):
        self._pairs = []
        # This server is for Channelz info fetching only.
        # It should not itself enable Channelz.
        self._server = grpc.server(
            futures.ThreadPoolExecutor(max_workers=3),
            options=_DISABLE_REUSE_PORT + _DISABLE_CHANNELZ,
        )
        port = self._server.add_insecure_port("[::]:0")
        channelz.add_channelz_servicer(self._server)
        self._server.start()

        # This channel is used to fetch Channelz info only
        # Channelz should not be enabled
        self._channel = grpc.insecure_channel(
            "localhost:%d" % port, _DISABLE_CHANNELZ
        )
        self._channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel)

    def tearDown(self):
        self._server.stop(None)
        self._channel.close()
        _close_channel_server_pairs(self._pairs)

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_get_top_channels_basic(self):
        self._pairs = _generate_channel_server_pairs(1)
        resp = self._get_top_channels()
        self.assertEqual(len(resp.channel), 1)
        self.assertTrue(resp.end)

    def test_get_top_channels_high_start_id(self):
        self._pairs = _generate_channel_server_pairs(1)
        resp = self._get_top_channels(start_channel_id=10000)
        self.assertEqual(len(resp.channel), 0)
        self.assertTrue(resp.end)

    def test_successful_request(self):
        self._pairs = _generate_channel_server_pairs(1)
        self._send_successful_unary_unary(0)
        self._assert_channel_calls(0, started=1, succeeded=1, failed=0)

    def test_failed_request(self):
        self._pairs = _generate_channel_server_pairs(1)
        self._send_failed_unary_unary(0)
        self._assert_channel_calls(0, started=1, succeeded=0, failed=1)

    def test_many_requests(self):
        self._pairs = _generate_channel_server_pairs(1)
        k_success = 7
        k_failed = 9
        for _ in range(k_success):
            self._send_successful_unary_unary(0)
        for _ in range(k_failed):
            self._send_failed_unary_unary(0)
        self._assert_channel_calls(
            0,
            started=k_success + k_failed,
            succeeded=k_success,
            failed=k_failed,
        )

    def test_many_channel(self):
        k_channels = 4
        self._pairs = _generate_channel_server_pairs(k_channels)
        resp = self._get_top_channels()
        self.assertEqual(len(resp.channel), k_channels)

    def test_many_requests_many_channel(self):
        k_channels = 4
        self._pairs = _generate_channel_server_pairs(k_channels)
        k_success = 11
        k_failed = 13
        self._send_mixed_traffic(k_success, k_failed)

        # The first channel saw only successes
        self._assert_channel_calls(
            0, started=k_success, succeeded=k_success, failed=0
        )
        # The second channel saw only failures
        self._assert_channel_calls(
            1, started=k_failed, succeeded=0, failed=k_failed
        )
        # The third channel saw both successes and failures
        self._assert_channel_calls(
            2,
            started=k_success + k_failed,
            succeeded=k_success,
            failed=k_failed,
        )
        # The fourth channel saw nothing
        self._assert_channel_calls(3, started=0, succeeded=0, failed=0)

    def test_many_subchannels(self):
        k_channels = 4
        self._pairs = _generate_channel_server_pairs(k_channels)
        k_success = 17
        k_failed = 19
        self._send_mixed_traffic(k_success, k_failed)

        gtc_resp = self._get_top_channels()
        self.assertEqual(len(gtc_resp.channel), k_channels)
        for channel in gtc_resp.channel:
            # If no call performed in the channel, there shouldn't be any
            # subchannel
            if channel.data.calls_started == 0:
                self.assertEqual(len(channel.subchannel_ref), 0)
                continue

            # Otherwise, the subchannel should exist
            self.assertGreater(len(channel.subchannel_ref), 0)
            gsc_resp = self._get_first_subchannel(channel)
            subchannel_data = gsc_resp.subchannel.data
            self.assertEqual(
                channel.data.calls_started, subchannel_data.calls_started
            )
            self.assertEqual(
                channel.data.calls_succeeded, subchannel_data.calls_succeeded
            )
            self.assertEqual(
                channel.data.calls_failed, subchannel_data.calls_failed
            )

    def test_server_basic(self):
        self._pairs = _generate_channel_server_pairs(1)
        resp = self._get_servers()
        self.assertEqual(len(resp.server), 1)

    def test_get_one_server(self):
        self._pairs = _generate_channel_server_pairs(1)
        gss_resp = self._get_servers()
        self.assertEqual(len(gss_resp.server), 1)
        gs_resp = self._channelz_stub.GetServer(
            channelz_pb2.GetServerRequest(
                server_id=gss_resp.server[0].ref.server_id
            )
        )
        self.assertEqual(
            gss_resp.server[0].ref.server_id, gs_resp.server.ref.server_id
        )

    def test_server_call(self):
        self._pairs = _generate_channel_server_pairs(1)
        k_success = 23
        k_failed = 29
        for _ in range(k_success):
            self._send_successful_unary_unary(0)
        for _ in range(k_failed):
            self._send_failed_unary_unary(0)

        resp = self._get_servers()
        self.assertEqual(len(resp.server), 1)
        server_data = resp.server[0].data
        self.assertEqual(server_data.calls_started, k_success + k_failed)
        self.assertEqual(server_data.calls_succeeded, k_success)
        self.assertEqual(server_data.calls_failed, k_failed)

    def test_many_subchannels_and_sockets(self):
        k_channels = 4
        self._pairs = _generate_channel_server_pairs(k_channels)
        k_success = 3
        k_failed = 5
        self._send_mixed_traffic(k_success, k_failed)

        gtc_resp = self._get_top_channels()
        self.assertEqual(len(gtc_resp.channel), k_channels)
        for channel in gtc_resp.channel:
            # If no call performed in the channel, there shouldn't be any
            # subchannel
            if channel.data.calls_started == 0:
                self.assertEqual(len(channel.subchannel_ref), 0)
                continue

            # Otherwise, the subchannel should exist
            self.assertGreater(len(channel.subchannel_ref), 0)
            gsc_resp = self._get_first_subchannel(channel)
            self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1)

            gs_resp = self._channelz_stub.GetSocket(
                channelz_pb2.GetSocketRequest(
                    socket_id=gsc_resp.subchannel.socket_ref[0].socket_id
                )
            )
            subchannel_data = gsc_resp.subchannel.data
            socket_data = gs_resp.socket.data
            self.assertEqual(
                subchannel_data.calls_started, socket_data.streams_started
            )
            self.assertEqual(
                subchannel_data.calls_started, socket_data.streams_succeeded
            )
            # Calls started == messages sent, only valid for unary calls
            self.assertEqual(
                subchannel_data.calls_started, socket_data.messages_sent
            )
            # Only receive responses when the RPC was successful
            self.assertEqual(
                subchannel_data.calls_succeeded, socket_data.messages_received
            )

            # A reported TCP/IP address must be 4 bytes or 16 bytes long.
            for endpoint in (gs_resp.socket.remote, gs_resp.socket.local):
                if endpoint.HasField("tcpip_address"):
                    address = endpoint.tcpip_address.ip_address
                    self.assertIn(len(address), (4, 16), address)

    def test_streaming_rpc(self):
        self._pairs = _generate_channel_server_pairs(1)
        # In C++, the argument for _send_successful_stream_stream is message
        # length. Here the argument is still channel idx, to be consistent
        # with the other two.
        self._send_successful_stream_stream(0)

        gc_resp = self._assert_channel_calls(
            0, started=1, succeeded=1, failed=0
        )
        # Subchannel exists
        self.assertGreater(len(gc_resp.channel.subchannel_ref), 0)

        # Poll until every started call on the subchannel has been accounted
        # for as either succeeded or failed.
        gsc_resp = self._poll_until(
            lambda: self._get_first_subchannel(gc_resp.channel),
            lambda resp: (
                resp.subchannel.data.calls_started
                == resp.subchannel.data.calls_succeeded
                + resp.subchannel.data.calls_failed
            ),
            "subchannel",
        )
        self.assertEqual(gsc_resp.subchannel.data.calls_started, 1)
        self.assertEqual(gsc_resp.subchannel.data.calls_failed, 0)
        self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, 1)
        # Socket exists
        self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1)

        # Same settling condition, applied to the socket's stream counters.
        socket_id = gsc_resp.subchannel.socket_ref[0].socket_id
        gs_resp = self._poll_until(
            lambda: self._channelz_stub.GetSocket(
                channelz_pb2.GetSocketRequest(socket_id=socket_id)
            ),
            lambda resp: (
                resp.socket.data.streams_started
                == resp.socket.data.streams_succeeded
                + resp.socket.data.streams_failed
            ),
            "socket",
        )
        self.assertEqual(gs_resp.socket.data.streams_started, 1)
        self.assertEqual(gs_resp.socket.data.streams_succeeded, 1)
        self.assertEqual(gs_resp.socket.data.streams_failed, 0)
        self.assertEqual(
            gs_resp.socket.data.messages_sent, test_constants.STREAM_LENGTH
        )
        self.assertEqual(
            gs_resp.socket.data.messages_received, test_constants.STREAM_LENGTH
        )

    def test_server_sockets(self):
        self._pairs = _generate_channel_server_pairs(1)
        self._send_successful_unary_unary(0)
        self._send_failed_unary_unary(0)

        gs_resp = self._get_servers()
        self.assertEqual(len(gs_resp.server), 1)
        self.assertEqual(gs_resp.server[0].data.calls_started, 2)
        self.assertEqual(gs_resp.server[0].data.calls_succeeded, 1)
        self.assertEqual(gs_resp.server[0].data.calls_failed, 1)

        # If the RPC call failed, it will raise a grpc.RpcError
        # So, if there is no exception raised, considered pass
        self._channelz_stub.GetServerSockets(
            channelz_pb2.GetServerSocketsRequest(
                server_id=gs_resp.server[0].ref.server_id, start_socket_id=0
            )
        )

    def test_server_listen_sockets(self):
        self._pairs = _generate_channel_server_pairs(1)

        gss_resp = self._get_servers()
        self.assertEqual(len(gss_resp.server), 1)
        self.assertEqual(len(gss_resp.server[0].listen_socket), 1)

        # If the RPC call failed, it will raise a grpc.RpcError
        # So, if there is no exception raised, considered pass
        self._channelz_stub.GetSocket(
            channelz_pb2.GetSocketRequest(
                socket_id=gss_resp.server[0].listen_socket[0].socket_id
            )
        )

    def test_invalid_query_get_server(self):
        self._assert_not_found(
            self._channelz_stub.GetServer,
            channelz_pb2.GetServerRequest(server_id=10000),
        )

    def test_invalid_query_get_channel(self):
        self._assert_not_found(
            self._channelz_stub.GetChannel,
            channelz_pb2.GetChannelRequest(channel_id=10000),
        )

    def test_invalid_query_get_subchannel(self):
        self._assert_not_found(
            self._channelz_stub.GetSubchannel,
            channelz_pb2.GetSubchannelRequest(subchannel_id=10000),
        )

    def test_invalid_query_get_socket(self):
        self._assert_not_found(
            self._channelz_stub.GetSocket,
            channelz_pb2.GetSocketRequest(socket_id=10000),
        )

    def test_invalid_query_get_server_sockets(self):
        self._assert_not_found(
            self._channelz_stub.GetServerSockets,
            channelz_pb2.GetServerSocketsRequest(
                server_id=10000,
                start_socket_id=0,
            ),
        )


if __name__ == "__main__":
    unittest.main(verbosity=2)