1#!/usr/bin/env python3 2# Copyright 2022 The Pigweed Authors 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may not 5# use this file except in compliance with the License. You may obtain a copy of 6# the License at 7# 8# https://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations under 14# the License. 15"""Tests for retreiving and parsing metrics.""" 16from unittest import TestCase, mock, main 17from pw_metric.metric_parser import parse_metrics 18 19from pw_metric_proto import metric_service_pb2 20from pw_status import Status 21from pw_tokenizer import detokenize, tokens 22 23DATABASE = tokens.Database( 24 [ 25 tokens.TokenizedStringEntry(0x01148A48, "total_dropped"), 26 tokens.TokenizedStringEntry(0x03796798, "min_queue_remaining"), 27 tokens.TokenizedStringEntry(0x22198280, "total_created"), 28 tokens.TokenizedStringEntry(0x534A42F4, "max_queue_used"), 29 tokens.TokenizedStringEntry(0x5D087463, "pw::work_queue::WorkQueue"), 30 tokens.TokenizedStringEntry(0xA7C43965, "log"), 31 ] 32) 33 34 35class TestParseMetrics(TestCase): 36 """Test parsing metrics received from RPCs""" 37 38 def setUp(self) -> None: 39 """Creating detokenizer and mocking RPC.""" 40 self.detokenize = detokenize.Detokenizer(DATABASE) 41 self.rpc_timeout_s = 1 42 self.rpcs = mock.Mock() 43 self.rpcs.pw = mock.Mock() 44 self.rpcs.pw.metric = mock.Mock() 45 self.rpcs.pw.metric.proto = mock.Mock() 46 self.rpcs.pw.metric.proto.MetricService = mock.Mock() 47 self.rpcs.pw.metric.proto.MetricService.Get = mock.Mock() 48 self.rpcs.pw.metric.proto.MetricService.Get.return_value = mock.Mock() 49 self.rpcs.pw.metric.proto.MetricService.Get.return_value.status = ( 50 Status.OK 51 ) 52 # Creating a group and metric name for better identification. 53 self.log = 0xA7C43965 54 self.total_created = 0x22198280 55 self.total_dropped = 0x01148A48 56 self.min_queue_remaining = 0x03796798 57 self.metric = [ 58 metric_service_pb2.Metric( 59 token_path=[self.log, self.total_created], 60 string_path='N/A', 61 as_float=3.0, 62 ), 63 metric_service_pb2.Metric( 64 token_path=[self.log, self.total_dropped], 65 string_path='N/A', 66 as_float=4.0, 67 ), 68 ] 69 70 def test_invalid_detokenizer(self) -> None: 71 """Test invalid detokenizer was supplied.""" 72 self.assertEqual( 73 {}, 74 parse_metrics(self.rpcs, None, self.rpc_timeout_s), 75 msg='Valid detokenizer.', 76 ) 77 78 def test_bad_stream_status(self) -> None: 79 """Test stream response has a status other than OK.""" 80 self.rpcs.pw.metric.proto.MetricService.Get.return_value.status = ( 81 Status.ABORTED 82 ) 83 self.assertEqual( 84 {}, 85 parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s), 86 msg='Stream response was not aborted.', 87 ) 88 89 def test_parse_metrics(self) -> None: 90 """Test metrics being parsed and recorded.""" 91 # Loading metric into RPC. 92 self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [ 93 metric_service_pb2.MetricResponse(metrics=self.metric) 94 ] 95 self.assertEqual( 96 { 97 'log': { 98 'total_created': 3.0, 99 'total_dropped': 4.0, 100 } 101 }, 102 parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s), 103 msg='Metrics are not equal.', 104 ) 105 106 def test_three_metric_names(self) -> None: 107 """Test creating a dictionary with three paths.""" 108 # Creating another leaf. 109 self.metric.append( 110 metric_service_pb2.Metric( 111 token_path=[self.log, self.min_queue_remaining], 112 string_path='N/A', 113 as_float=1.0, 114 ) 115 ) 116 self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [ 117 metric_service_pb2.MetricResponse(metrics=self.metric) 118 ] 119 self.assertEqual( 120 { 121 'log': { 122 'total_created': 3.0, 123 'total_dropped': 4.0, 124 'min_queue_remaining': 1.0, 125 }, 126 }, 127 parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s), 128 msg='Metrics are not equal.', 129 ) 130 131 def test_inserting_unknown_token(self) -> None: 132 # Inserting an unknown token as a group name. 133 self.metric.append( 134 metric_service_pb2.Metric( 135 token_path=[0x007, self.total_dropped], 136 string_path='N/A', 137 as_float=1.0, 138 ) 139 ) 140 self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [ 141 metric_service_pb2.MetricResponse(metrics=self.metric) 142 ] 143 self.assertEqual( 144 { 145 'log': { 146 'total_created': 3.0, 147 'total_dropped': 4.0, 148 }, 149 '$': {'total_dropped': 1.0}, 150 }, 151 parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s), 152 msg='Metrics are not equal.', 153 ) 154 155 def test_multiple_metric_response(self) -> None: 156 """Tests multiple metric responses being handled.""" 157 # Adding more than one MetricResponses. 158 metric = [ 159 metric_service_pb2.Metric( 160 token_path=[0x007, self.total_dropped], 161 string_path='N/A', 162 as_float=1.0, 163 ) 164 ] 165 self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [ 166 metric_service_pb2.MetricResponse(metrics=self.metric), 167 metric_service_pb2.MetricResponse(metrics=metric), 168 ] 169 self.assertEqual( 170 { 171 'log': { 172 'total_created': 3.0, 173 'total_dropped': 4.0, 174 }, 175 '$': { 176 'total_dropped': 1.0, 177 }, 178 }, 179 parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s), 180 msg='Metrics are not equal.', 181 ) 182 183 def test_paths_longer_than_two(self) -> None: 184 """Tests metric paths longer than two.""" 185 # Path longer than two. 186 longest_metric = [ 187 metric_service_pb2.Metric( 188 token_path=[ 189 self.log, 190 self.total_created, 191 self.min_queue_remaining, 192 ], 193 string_path='N/A', 194 as_float=1.0, 195 ), 196 ] 197 self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [ 198 metric_service_pb2.MetricResponse(metrics=longest_metric), 199 ] 200 self.assertEqual( 201 { 202 'log': { 203 'total_created': {'min_queue_remaining': 1.0}, 204 } 205 }, 206 parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s), 207 msg='Metrics are not equal.', 208 ) 209 # Create a new leaf in log. 210 longest_metric.append( 211 metric_service_pb2.Metric( 212 token_path=[self.log, self.total_dropped], 213 string_path='N/A', 214 as_float=3.0, 215 ) 216 ) 217 metric = [ 218 metric_service_pb2.Metric( 219 token_path=[0x007, self.total_dropped], 220 string_path='N/A', 221 as_float=1.0, 222 ), 223 metric_service_pb2.Metric( 224 token_path=[0x007, self.total_created], 225 string_path='N/A', 226 as_float=2.0, 227 ), 228 ] 229 self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [ 230 metric_service_pb2.MetricResponse(metrics=longest_metric), 231 metric_service_pb2.MetricResponse(metrics=metric), 232 ] 233 self.assertEqual( 234 { 235 'log': { 236 'total_created': { 237 'min_queue_remaining': 1.0, 238 }, 239 'total_dropped': 3.0, 240 }, 241 '$': { 242 'total_dropped': 1.0, 243 'total_created': 2.0, 244 }, 245 }, 246 parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s), 247 msg='Metrics are not equal.', 248 ) 249 250 def test_conflicting_keys(self) -> None: 251 """Tests conflicting key and value assignment.""" 252 longest_metric = [ 253 metric_service_pb2.Metric( 254 token_path=[ 255 self.log, 256 self.total_created, 257 self.min_queue_remaining, 258 ], 259 string_path='N/A', 260 as_float=1.0, 261 ), 262 ] 263 # Creates a conflict at log/total_created, should throw an error. 264 self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [ 265 metric_service_pb2.MetricResponse(metrics=longest_metric), 266 metric_service_pb2.MetricResponse(metrics=self.metric), 267 ] 268 parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s) 269 self.assertRaises(ValueError, msg='Expected Value Error.') 270 271 def test_conflicting_logs(self) -> None: 272 """Tests conflicting loga being streamed.""" 273 longest_metric = [ 274 metric_service_pb2.Metric( 275 token_path=[self.log, self.total_created], 276 string_path='N/A', 277 as_float=1.0, 278 ), 279 ] 280 # Creates a duplicate metric for log/total_created. 281 self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [ 282 metric_service_pb2.MetricResponse(metrics=longest_metric), 283 metric_service_pb2.MetricResponse(metrics=self.metric), 284 ] 285 parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s) 286 self.assertRaises(ValueError, msg='Expected Value Error.') 287 # Duplicate metrics being loaded. 288 self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [ 289 metric_service_pb2.MetricResponse(metrics=self.metric), 290 metric_service_pb2.MetricResponse(metrics=self.metric), 291 ] 292 parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s) 293 self.assertRaises(ValueError, msg='Expected Value Error.') 294 295 296if __name__ == '__main__': 297 main() 298