# Copyright 2024, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unittests for Edit Monitor."""

import logging
import multiprocessing
import os
import pathlib
import signal
import sys
import tempfile
import time
import unittest

from atest.proto import clientanalytics_pb2
from edit_monitor import edit_monitor
from proto import edit_event_pb2

# Name of the file (under the per-test log dir) the fake client flushes to.
_LOG_OUTPUT_FILE_NAME = 'logs.output'
# Separator written after every serialized log event.
_LOG_EVENT_DELIMITER = b'\x00'
# Seconds to wait so the edit monitor can receive pending edit events.
_EVENT_WAIT_SECS = 1


class EditMonitorTest(unittest.TestCase):
  """Runs the edit monitor in a subprocess and checks the events it logs."""

  @classmethod
  def setUpClass(cls):
    super().setUpClass()
    # Configure to print logging to stdout.
    logging.basicConfig(filename=None, level=logging.DEBUG)
    console = logging.StreamHandler(sys.stdout)
    logging.getLogger('').addHandler(console)

  def setUp(self):
    super().setUp()
    self.working_dir = tempfile.TemporaryDirectory()
    working_path = pathlib.Path(self.working_dir.name)
    # Directory watched by the edit monitor under test.
    self.root_monitoring_path = working_path.joinpath('files')
    self.root_monitoring_path.mkdir()
    # Directory holding the output file of the fake clearcut client.
    self.log_event_dir = working_path.joinpath('logs')
    self.log_event_dir.mkdir()

  def tearDown(self):
    self.working_dir.cleanup()
    super().tearDown()

  def test_log_single_edit_event_success(self):
    self._create_git_marker()
    p = self._start_test_edit_monitor_process(self._create_fake_cclient())

    # Create and modify a file.
    test_file = self.root_monitoring_path.joinpath('test.txt')
    with open(test_file, 'w') as f:
      f.write('something')
    # Move the file.
    test_file_moved = self.root_monitoring_path.joinpath('new_test.txt')
    test_file.rename(test_file_moved)
    # Delete the file.
    test_file_moved.unlink()
    self._stop_test_edit_monitor_process(p)

    logged_events = self._get_logged_events()
    self.assertEqual(len(logged_events), 4)
    # Expected (file name, edit type) of each logged event, in order.
    expected_edits = [
        ('test.txt', edit_event_pb2.EditEvent.CREATE),
        ('test.txt', edit_event_pb2.EditEvent.MODIFY),
        ('test.txt', edit_event_pb2.EditEvent.MOVE),
        ('new_test.txt', edit_event_pb2.EditEvent.DELETE),
    ]
    for logged_event, (file_name, edit_type) in zip(
        logged_events, expected_edits
    ):
      self._assert_single_edit_event(logged_event, file_name, edit_type)

  def test_log_aggregated_edit_event_success(self):
    self._create_git_marker()
    p = self._start_test_edit_monitor_process(self._create_fake_cclient())

    # Create 6 test files
    for i in range(6):
      self.root_monitoring_path.joinpath('test_' + str(i)).touch()
    self._stop_test_edit_monitor_process(p)

    logged_events = self._get_logged_events()
    self.assertEqual(len(logged_events), 1)

    expected_aggregated_edit_event = (
        edit_event_pb2.EditEvent.AggregatedEditEvent(
            num_edits=6,
        )
    )
    self.assertEqual(
        expected_aggregated_edit_event,
        edit_event_pb2.EditEvent.FromString(
            logged_events[0].source_extension
        ).aggregated_edit_event,
    )

  def test_do_not_log_edit_event_for_directory_change(self):
    self._create_git_marker()
    p = self._start_test_edit_monitor_process(self._create_fake_cclient())

    # Create a sub directory
    self.root_monitoring_path.joinpath('test_dir').mkdir()
    self._stop_test_edit_monitor_process(p)

    self.assertEqual(len(self._get_logged_events()), 0)

  def test_do_not_log_edit_event_for_hidden_file(self):
    self._create_git_marker()
    p = self._start_test_edit_monitor_process(self._create_fake_cclient())

    # Create a hidden file.
    self.root_monitoring_path.joinpath('.test.txt').touch()
    # Create a hidden dir.
    hidden_dir = self.root_monitoring_path.joinpath('.test')
    hidden_dir.mkdir()
    hidden_dir.joinpath('test.txt').touch()
    self._stop_test_edit_monitor_process(p)

    self.assertEqual(len(self._get_logged_events()), 0)

  def test_do_not_log_edit_event_for_non_git_project_file(self):
    # Deliberately no .git marker in this test.
    p = self._start_test_edit_monitor_process(self._create_fake_cclient())

    # Create a file.
    self.root_monitoring_path.joinpath('test.txt').touch()
    # Create a file under a sub dir.
    # NOTE(review): the sub dir name starts with a dot, so this file may be
    # skipped as hidden rather than as non-git; confirm whether a
    # non-hidden directory name was intended.
    sub_dir = self.root_monitoring_path.joinpath('.test')
    sub_dir.mkdir()
    sub_dir.joinpath('test.txt').touch()
    self._stop_test_edit_monitor_process(p)

    self.assertEqual(len(self._get_logged_events()), 0)

  def test_log_edit_event_fail(self):
    self._create_git_marker()
    p = self._start_test_edit_monitor_process(
        self._create_fake_cclient(raise_log_exception=True)
    )

    # Create a file.
    self.root_monitoring_path.joinpath('test.txt').touch()
    self._stop_test_edit_monitor_process(p)

    self.assertEqual(len(self._get_logged_events()), 0)

  def _create_git_marker(self) -> None:
    """Creates the .git file under the monitoring dir."""
    self.root_monitoring_path.joinpath('.git').touch()

  def _create_fake_cclient(self, raise_log_exception: bool = False):
    """Returns a FakeClearcutClient writing to this test's log output file."""
    return FakeClearcutClient(
        log_output_file=self.log_event_dir.joinpath(_LOG_OUTPUT_FILE_NAME),
        raise_log_exception=raise_log_exception,
    )

  def _assert_single_edit_event(self, logged_event, file_name, edit_type):
    """Asserts that a logged event carries the expected single edit event.

    Args:
      logged_event: A parsed LogEvent whose source_extension holds a
        serialized EditEvent.
      file_name: Name of the edited file, relative to the monitoring dir.
      edit_type: The expected EditEvent edit type.
    """
    expected_event = edit_event_pb2.EditEvent.SingleEditEvent(
        file_path=str(self.root_monitoring_path.joinpath(file_name).resolve()),
        edit_type=edit_type,
    )
    self.assertEqual(
        expected_event,
        edit_event_pb2.EditEvent.FromString(
            logged_event.source_extension
        ).single_edit_event,
    )

  def _start_test_edit_monitor_process(
      self, cclient
  ) -> multiprocessing.Process:
    """Starts the edit monitor in a subprocess and waits until it is ready.

    Args:
      cclient: The clearcut client object handed to edit_monitor.start.

    Returns:
      The started daemon process running the edit monitor.
    """
    receiver, sender = multiprocessing.Pipe()
    # Start edit monitor in a subprocess.
    # NOTE(review): False, 0.5 and 5 are passed positionally; their meaning
    # is defined by edit_monitor.start and is not visible from this file.
    p = multiprocessing.Process(
        target=edit_monitor.start,
        args=(
            str(self.root_monitoring_path.resolve()),
            False,
            0.5,
            5,
            cclient,
            sender,
        ),
    )
    p.daemon = True
    p.start()
    # The subprocess owns its own copy of the sending end. Closing ours lets
    # recv() below raise EOFError instead of blocking forever if the
    # subprocess exits before reporting that the observer started.
    sender.close()

    # Wait until observer started.
    received_data = receiver.recv()
    self.assertEqual(received_data, 'Observer started.')

    receiver.close()
    return p

  def _stop_test_edit_monitor_process(self, p: multiprocessing.Process) -> None:
    """Waits for pending events, then interrupts the monitor and joins it."""
    # Give some time for the edit monitor to receive the edit event.
    time.sleep(_EVENT_WAIT_SECS)
    # Stop the edit monitor and flush all events.
    os.kill(p.pid, signal.SIGINT)
    p.join()

  def _get_logged_events(self) -> list:
    """Parses the LogEvents written to the log output file.

    Returns:
      The LogEvent messages found in the file, in the order they were
      written.
    """
    with open(self.log_event_dir.joinpath(_LOG_OUTPUT_FILE_NAME), 'rb') as f:
      data = f.read()

    # Records are split on the delimiter byte, so a serialized event that
    # itself contains that byte would be split incorrectly.
    return [
        clientanalytics_pb2.LogEvent.FromString(record)
        for record in data.split(_LOG_EVENT_DELIMITER)
        if record
    ]


class FakeClearcutClient:
  """Fake clearcut client that writes flushed log events to a local file.

  The tests pass an instance to the edit monitor subprocess, so events are
  exchanged through a file that the test process reads back afterwards.
  """

  def __init__(self, log_output_file, raise_log_exception=False):
    """Initializes the fake client.

    Args:
      log_output_file: Path of the file that flush_events() writes to.
      raise_log_exception: If True, every call to log() raises.
    """
    self.pending_log_events = []
    self.raise_log_exception = raise_log_exception
    self.log_output_file = log_output_file

  def log(self, log_event):
    """Queues a log event, or raises if configured to fail."""
    if self.raise_log_exception:
      raise Exception('unknown exception')
    self.pending_log_events.append(log_event)

  def flush_events(self):
    """Writes all pending events to the output file and clears the queue.

    The file is opened in 'wb' mode, so each flush replaces whatever an
    earlier flush wrote.
    """
    with open(self.log_output_file, 'wb') as f:
      for log_event in self.pending_log_events:
        # Use a null byte as the delimiter.
        f.write(log_event.SerializeToString() + _LOG_EVENT_DELIMITER)

    self.pending_log_events.clear()


if __name__ == '__main__':
  unittest.main()