xref: /aosp_15_r20/external/pigweed/pw_metric/py/metric_parser_test.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
1#!/usr/bin/env python3
2# Copyright 2022 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
"""Tests for retrieving and parsing metrics."""
16from unittest import TestCase, mock, main
17from pw_metric.metric_parser import parse_metrics
18
19from pw_metric_proto import metric_service_pb2
20from pw_status import Status
21from pw_tokenizer import detokenize, tokens
22
# Token database used by the tests: maps each 32-bit token that appears in
# the Metric token_paths below to its original group/metric name string.
DATABASE = tokens.Database(
    [
        tokens.TokenizedStringEntry(0x01148A48, "total_dropped"),
        tokens.TokenizedStringEntry(0x03796798, "min_queue_remaining"),
        tokens.TokenizedStringEntry(0x22198280, "total_created"),
        tokens.TokenizedStringEntry(0x534A42F4, "max_queue_used"),
        tokens.TokenizedStringEntry(0x5D087463, "pw::work_queue::WorkQueue"),
        tokens.TokenizedStringEntry(0xA7C43965, "log"),
    ]
)
33
34
class TestParseMetrics(TestCase):
    """Test parsing metrics received from RPCs."""

    def setUp(self) -> None:
        """Create a detokenizer and mock the MetricService.Get RPC."""
        self.detokenize = detokenize.Detokenizer(DATABASE)
        self.rpc_timeout_s = 1
        # mock.Mock() creates child attributes on demand, so one Mock is
        # enough to stand in for the whole
        # rpcs.pw.metric.proto.MetricService.Get chain (the original built
        # every intermediate Mock by hand).
        self.rpcs = mock.Mock()
        self.rpcs.pw.metric.proto.MetricService.Get.return_value.status = (
            Status.OK
        )
        # Tokens from DATABASE, named for better identification.
        self.log = 0xA7C43965
        self.total_created = 0x22198280
        self.total_dropped = 0x01148A48
        self.min_queue_remaining = 0x03796798
        # Baseline metrics: log/total_created = 3.0, log/total_dropped = 4.0.
        self.metric = [
            metric_service_pb2.Metric(
                token_path=[self.log, self.total_created],
                string_path='N/A',
                as_float=3.0,
            ),
            metric_service_pb2.Metric(
                token_path=[self.log, self.total_dropped],
                string_path='N/A',
                as_float=4.0,
            ),
        ]

    def _set_responses(self, *metric_lists) -> None:
        """Load one MetricResponse per metric list into the mocked stream."""
        self.rpcs.pw.metric.proto.MetricService.Get.return_value.responses = [
            metric_service_pb2.MetricResponse(metrics=metrics)
            for metrics in metric_lists
        ]

    def _parse(self):
        """Run parse_metrics against the mocked RPC service."""
        return parse_metrics(self.rpcs, self.detokenize, self.rpc_timeout_s)

    def test_invalid_detokenizer(self) -> None:
        """Test invalid detokenizer was supplied."""
        self.assertEqual(
            {},
            parse_metrics(self.rpcs, None, self.rpc_timeout_s),
            msg='Valid detokenizer.',
        )

    def test_bad_stream_status(self) -> None:
        """Test stream response has a status other than OK."""
        self.rpcs.pw.metric.proto.MetricService.Get.return_value.status = (
            Status.ABORTED
        )
        self.assertEqual(
            {},
            self._parse(),
            msg='Stream response was not aborted.',
        )

    def test_parse_metrics(self) -> None:
        """Test metrics being parsed and recorded."""
        self._set_responses(self.metric)
        self.assertEqual(
            {
                'log': {
                    'total_created': 3.0,
                    'total_dropped': 4.0,
                }
            },
            self._parse(),
            msg='Metrics are not equal.',
        )

    def test_three_metric_names(self) -> None:
        """Test creating a dictionary with three paths."""
        # Creating another leaf.
        self.metric.append(
            metric_service_pb2.Metric(
                token_path=[self.log, self.min_queue_remaining],
                string_path='N/A',
                as_float=1.0,
            )
        )
        self._set_responses(self.metric)
        self.assertEqual(
            {
                'log': {
                    'total_created': 3.0,
                    'total_dropped': 4.0,
                    'min_queue_remaining': 1.0,
                },
            },
            self._parse(),
            msg='Metrics are not equal.',
        )

    def test_inserting_unknown_token(self) -> None:
        """Test a group token missing from the database; it lands under '$'."""
        # Inserting an unknown token as a group name.
        self.metric.append(
            metric_service_pb2.Metric(
                token_path=[0x007, self.total_dropped],
                string_path='N/A',
                as_float=1.0,
            )
        )
        self._set_responses(self.metric)
        self.assertEqual(
            {
                'log': {
                    'total_created': 3.0,
                    'total_dropped': 4.0,
                },
                '$': {'total_dropped': 1.0},
            },
            self._parse(),
            msg='Metrics are not equal.',
        )

    def test_multiple_metric_response(self) -> None:
        """Tests multiple metric responses being handled."""
        # Adding more than one MetricResponse.
        metric = [
            metric_service_pb2.Metric(
                token_path=[0x007, self.total_dropped],
                string_path='N/A',
                as_float=1.0,
            )
        ]
        self._set_responses(self.metric, metric)
        self.assertEqual(
            {
                'log': {
                    'total_created': 3.0,
                    'total_dropped': 4.0,
                },
                '$': {
                    'total_dropped': 1.0,
                },
            },
            self._parse(),
            msg='Metrics are not equal.',
        )

    def test_paths_longer_than_two(self) -> None:
        """Tests metric paths longer than two."""
        # Path longer than two.
        longest_metric = [
            metric_service_pb2.Metric(
                token_path=[
                    self.log,
                    self.total_created,
                    self.min_queue_remaining,
                ],
                string_path='N/A',
                as_float=1.0,
            ),
        ]
        self._set_responses(longest_metric)
        self.assertEqual(
            {
                'log': {
                    'total_created': {'min_queue_remaining': 1.0},
                }
            },
            self._parse(),
            msg='Metrics are not equal.',
        )
        # Create a new leaf in log.
        longest_metric.append(
            metric_service_pb2.Metric(
                token_path=[self.log, self.total_dropped],
                string_path='N/A',
                as_float=3.0,
            )
        )
        metric = [
            metric_service_pb2.Metric(
                token_path=[0x007, self.total_dropped],
                string_path='N/A',
                as_float=1.0,
            ),
            metric_service_pb2.Metric(
                token_path=[0x007, self.total_created],
                string_path='N/A',
                as_float=2.0,
            ),
        ]
        self._set_responses(longest_metric, metric)
        self.assertEqual(
            {
                'log': {
                    'total_created': {
                        'min_queue_remaining': 1.0,
                    },
                    'total_dropped': 3.0,
                },
                '$': {
                    'total_dropped': 1.0,
                    'total_created': 2.0,
                },
            },
            self._parse(),
            msg='Metrics are not equal.',
        )

    def test_conflicting_keys(self) -> None:
        """Tests conflicting key and value assignment."""
        longest_metric = [
            metric_service_pb2.Metric(
                token_path=[
                    self.log,
                    self.total_created,
                    self.min_queue_remaining,
                ],
                string_path='N/A',
                as_float=1.0,
            ),
        ]
        # Creates a conflict at log/total_created: a nested dict in the
        # first response versus a float leaf in the second.
        self._set_responses(longest_metric, self.metric)
        # NOTE: the original called assertRaises(ValueError, msg=...) with
        # no callable AFTER parse_metrics returned; that form only creates
        # an unused context manager and asserts nothing. The context-manager
        # form below actually verifies that the conflict raises.
        with self.assertRaises(ValueError, msg='Expected Value Error.'):
            self._parse()

    def test_conflicting_logs(self) -> None:
        """Tests conflicting logs being streamed."""
        longest_metric = [
            metric_service_pb2.Metric(
                token_path=[self.log, self.total_created],
                string_path='N/A',
                as_float=1.0,
            ),
        ]
        # Creates a duplicate metric for log/total_created.
        self._set_responses(longest_metric, self.metric)
        with self.assertRaises(ValueError, msg='Expected Value Error.'):
            self._parse()
        # Duplicate metrics being loaded.
        self._set_responses(self.metric, self.metric)
        with self.assertRaises(ValueError, msg='Expected Value Error.'):
            self._parse()
294
295
# Script entry point: `main` is unittest.main, imported at the top of the
# file; running this module directly executes the tests above.
if __name__ == '__main__':
    main()
298