xref: /aosp_15_r20/build/make/tools/edit_monitor/edit_monitor_test.py (revision 9e94795a3d4ef5c1d47486f9a02bb378756cea8a)
1# Copyright 2024, The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Unittests for Edit Monitor."""
16
17import logging
18import multiprocessing
19import os
20import pathlib
21import signal
22import sys
23import tempfile
24import time
25import unittest
26
27from atest.proto import clientanalytics_pb2
28from edit_monitor import edit_monitor
29from proto import edit_event_pb2
30
31
class EditMonitorTest(unittest.TestCase):
  """Tests that run the edit monitor in a subprocess against a temp dir.

  Each test starts `edit_monitor.start` in a child process, performs file
  system operations under `root_monitoring_path`, interrupts the child with
  SIGINT and then checks the events found in the log output file.
  """

  # Seconds to wait for the edit monitor to receive the edit events.
  _EVENT_WAIT_SEC = 1

  @classmethod
  def setUpClass(cls):
    """Configures logging once for all tests in this class."""
    super().setUpClass()
    # Configure to print logging to stdout.
    logging.basicConfig(filename=None, level=logging.DEBUG)
    console = logging.StreamHandler(sys.stdout)
    logging.getLogger('').addHandler(console)

  def setUp(self):
    """Creates a temp working dir with 'files' and 'logs' sub directories."""
    super().setUp()
    self.working_dir = tempfile.TemporaryDirectory()
    working_path = pathlib.Path(self.working_dir.name)
    # Directory watched by the edit monitor.
    self.root_monitoring_path = working_path.joinpath('files')
    self.root_monitoring_path.mkdir()
    # Directory holding the file the fake clearcut client writes to.
    self.log_event_dir = working_path.joinpath('logs')
    self.log_event_dir.mkdir()
    self.log_output_file = self.log_event_dir.joinpath('logs.output')

  def tearDown(self):
    """Removes the temp working dir created in setUp."""
    self.working_dir.cleanup()
    super().tearDown()

  def test_log_single_edit_event_success(self):
    """Create, modify, move and delete are each logged as a single event."""
    self._mark_monitoring_path_as_git_project()
    p = self._start_test_edit_monitor_process(self._create_fake_cclient())

    # Create and modify a file.
    test_file = self.root_monitoring_path.joinpath('test.txt')
    with open(test_file, 'w') as f:
      f.write('something')
    # Move the file.
    test_file_moved = self.root_monitoring_path.joinpath('new_test.txt')
    test_file.rename(test_file_moved)
    # Delete the file.
    test_file_moved.unlink()
    self._stop_edit_monitor(p)

    logged_events = self._get_logged_events()
    self.assertEqual(len(logged_events), 4)

    original_path = str(test_file.resolve())
    moved_path = str(test_file_moved.resolve())
    # Expected (file_path, edit_type) of each logged event, in order. The
    # move event is expected to carry the path the file was moved from.
    expected_edits = [
        (original_path, edit_event_pb2.EditEvent.CREATE),
        (original_path, edit_event_pb2.EditEvent.MODIFY),
        (original_path, edit_event_pb2.EditEvent.MOVE),
        (moved_path, edit_event_pb2.EditEvent.DELETE),
    ]
    for logged_event, (file_path, edit_type) in zip(
        logged_events, expected_edits
    ):
      self.assertEqual(
          edit_event_pb2.EditEvent.SingleEditEvent(
              file_path=file_path,
              edit_type=edit_type,
          ),
          edit_event_pb2.EditEvent.FromString(
              logged_event.source_extension
          ).single_edit_event,
      )

  def test_log_aggregated_edit_event_success(self):
    """Six file creations are logged as one aggregated event."""
    self._mark_monitoring_path_as_git_project()
    p = self._start_test_edit_monitor_process(self._create_fake_cclient())

    # Create 6 test files
    for i in range(6):
      self.root_monitoring_path.joinpath(f'test_{i}').touch()

    self._stop_edit_monitor(p)

    logged_events = self._get_logged_events()
    self.assertEqual(len(logged_events), 1)

    expected_aggregated_edit_event = (
        edit_event_pb2.EditEvent.AggregatedEditEvent(
            num_edits=6,
        )
    )
    self.assertEqual(
        expected_aggregated_edit_event,
        edit_event_pb2.EditEvent.FromString(
            logged_events[0].source_extension
        ).aggregated_edit_event,
    )

  def test_do_not_log_edit_event_for_directory_change(self):
    """Creating a sub directory produces no logged event."""
    self._mark_monitoring_path_as_git_project()
    p = self._start_test_edit_monitor_process(self._create_fake_cclient())

    # Create a sub directory
    self.root_monitoring_path.joinpath('test_dir').mkdir()
    self._stop_edit_monitor(p)

    self.assertEqual(len(self._get_logged_events()), 0)

  def test_do_not_log_edit_event_for_hidden_file(self):
    """Hidden files, and files under hidden dirs, produce no logged event."""
    self._mark_monitoring_path_as_git_project()
    p = self._start_test_edit_monitor_process(self._create_fake_cclient())

    # Create a hidden file.
    self.root_monitoring_path.joinpath('.test.txt').touch()
    # Create a hidden dir.
    hidden_dir = self.root_monitoring_path.joinpath('.test')
    hidden_dir.mkdir()
    hidden_dir.joinpath('test.txt').touch()
    self._stop_edit_monitor(p)

    self.assertEqual(len(self._get_logged_events()), 0)

  def test_do_not_log_edit_event_for_non_git_project_file(self):
    """Without a .git entry in the monitored dir, nothing is logged."""
    p = self._start_test_edit_monitor_process(self._create_fake_cclient())

    # Create a file.
    self.root_monitoring_path.joinpath('test.txt').touch()
    # Create a file under a sub dir.
    # NOTE(review): '.test' is dot-prefixed, like the hidden dir used in
    # test_do_not_log_edit_event_for_hidden_file; a non-hidden name may have
    # been intended here -- confirm.
    sub_dir = self.root_monitoring_path.joinpath('.test')
    sub_dir.mkdir()
    sub_dir.joinpath('test.txt').touch()
    self._stop_edit_monitor(p)

    self.assertEqual(len(self._get_logged_events()), 0)

  def test_log_edit_event_fail(self):
    """No event is written when the clearcut client raises on log()."""
    self._mark_monitoring_path_as_git_project()
    p = self._start_test_edit_monitor_process(
        self._create_fake_cclient(raise_log_exception=True)
    )

    # Create a file.
    self.root_monitoring_path.joinpath('test.txt').touch()
    self._stop_edit_monitor(p)

    self.assertEqual(len(self._get_logged_events()), 0)

  def _mark_monitoring_path_as_git_project(self):
    """Creates the .git file under the monitoring dir."""
    self.root_monitoring_path.joinpath('.git').touch()

  def _create_fake_cclient(self, raise_log_exception=False):
    """Returns a FakeClearcutClient writing to this test's log output file."""
    return FakeClearcutClient(
        log_output_file=self.log_output_file,
        raise_log_exception=raise_log_exception,
    )

  def _stop_edit_monitor(self, p):
    """Waits for pending events, then interrupts the monitor and joins it."""
    # Give some time for the edit monitor to receive the edit event.
    time.sleep(self._EVENT_WAIT_SEC)
    # Stop the edit monitor and flush all events.
    os.kill(p.pid, signal.SIGINT)
    p.join()

  def _start_test_edit_monitor_process(
      self, cclient
  ) -> multiprocessing.Process:
    """Starts the edit monitor in a daemon subprocess.

    Blocks until the subprocess sends 'Observer started.' through the pipe.

    Args:
      cclient: The clearcut client object handed to `edit_monitor.start`.

    Returns:
      The started subprocess.
    """
    receiver, sender = multiprocessing.Pipe()
    # Start edit monitor in a subprocess.
    # NOTE(review): False, 0.5 and 5 are passed positionally; their meaning is
    # defined by edit_monitor.start (presumably dry-run flag, flush interval
    # and single-event threshold) -- confirm.
    p = multiprocessing.Process(
        target=edit_monitor.start,
        args=(
            str(self.root_monitoring_path.resolve()),
            False,
            0.5,
            5,
            cclient,
            sender,
        ),
    )
    p.daemon = True
    p.start()

    # Wait until observer started.
    received_data = receiver.recv()
    self.assertEqual(received_data, 'Observer started.')

    receiver.close()
    return p

  def _get_logged_events(self):
    """Parses the log output file into a list of LogEvent messages.

    Records in the file are separated by null bytes; empty records are
    skipped.
    """
    with open(self.log_output_file, 'rb') as f:
      data = f.read()

    return [
        clientanalytics_pb2.LogEvent.FromString(record)
        for record in data.split(b'\x00')
        if record
    ]
277
278
class FakeClearcutClient:
  """Fake clearcut client that buffers events and writes them to a file.

  Events passed to `log` are kept in `pending_log_events` until
  `flush_events` serializes them to `log_output_file`, each followed by a
  null byte.
  """

  def __init__(self, log_output_file, raise_log_exception=False):
    """Initializes the fake client.

    Args:
      log_output_file: Path of the file flushed events are written to.
      raise_log_exception: If True, every call to `log` raises an Exception.
    """
    self.pending_log_events = []
    self.raise_log_exception = raise_log_exception
    self.log_output_file = log_output_file

  def log(self, log_event):
    """Buffers `log_event`, or raises if configured to fail.

    Raises:
      Exception: If `raise_log_exception` is set.
    """
    if self.raise_log_exception:
      raise Exception('unknown exception')
    self.pending_log_events.append(log_event)

  def flush_events(self):
    """Appends all pending events to the output file and clears the buffer.

    The file is created if it does not exist, even when nothing is pending.
    """
    delimiter = b'\x00'  # Use a null byte as the delimiter
    # Open in append mode so that a later flush does not truncate the file
    # and drop the events written by an earlier flush.
    with open(self.log_output_file, 'ab') as f:
      f.writelines(
          log_event.SerializeToString() + delimiter
          for log_event in self.pending_log_events
      )

    self.pending_log_events.clear()
298
299
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()
302