1# Copyright 2024, The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Integration tests for Edit Monitor.""" 16 17import glob 18from importlib import resources 19import logging 20import os 21import pathlib 22import shutil 23import signal 24import subprocess 25import sys 26import tempfile 27import time 28import unittest 29from unittest import mock 30 31 32class EditMonitorIntegrationTest(unittest.TestCase): 33 34 @classmethod 35 def setUpClass(cls): 36 super().setUpClass() 37 # Configure to print logging to stdout. 38 logging.basicConfig(filename=None, level=logging.DEBUG) 39 console = logging.StreamHandler(sys.stdout) 40 logging.getLogger("").addHandler(console) 41 42 def setUp(self): 43 super().setUp() 44 self.working_dir = tempfile.TemporaryDirectory() 45 self.root_monitoring_path = pathlib.Path(self.working_dir.name).joinpath( 46 "files" 47 ) 48 self.root_monitoring_path.mkdir() 49 self.edit_monitor_binary_path = self._import_executable("edit_monitor") 50 self.patch = mock.patch.dict( 51 os.environ, {"ENABLE_ANDROID_EDIT_MONITOR": "true"} 52 ) 53 self.patch.start() 54 55 def tearDown(self): 56 self.patch.stop() 57 self.working_dir.cleanup() 58 super().tearDown() 59 60 def test_log_single_edit_event_success(self): 61 p = self._start_edit_monitor_process() 62 63 # Create the .git file under the monitoring dir. 64 self.root_monitoring_path.joinpath(".git").touch() 65 66 # Create and modify a file. 67 test_file = self.root_monitoring_path.joinpath("test.txt") 68 with open(test_file, "w") as f: 69 f.write("something") 70 71 # Move the file. 72 test_file_moved = self.root_monitoring_path.joinpath("new_test.txt") 73 test_file.rename(test_file_moved) 74 75 # Delete the file. 76 test_file_moved.unlink() 77 78 # Give some time for the edit monitor to receive the edit event. 79 time.sleep(1) 80 # Stop the edit monitor and flush all events. 81 os.kill(p.pid, signal.SIGINT) 82 p.communicate() 83 84 self.assertEqual(self._get_logged_events_num(), 4) 85 86 def test_start_multiple_edit_monitor_only_one_started(self): 87 p1 = self._start_edit_monitor_process(wait_for_observer_start=False) 88 p2 = self._start_edit_monitor_process(wait_for_observer_start=False) 89 p3 = self._start_edit_monitor_process(wait_for_observer_start=False) 90 91 live_processes = self._get_live_processes([p1, p2, p3]) 92 93 # Cleanup all live processes. 94 for p in live_processes: 95 os.kill(p.pid, signal.SIGINT) 96 p.communicate() 97 98 self.assertEqual(len(live_processes), 1) 99 100 def _start_edit_monitor_process(self, wait_for_observer_start=True): 101 command = f""" 102 export TMPDIR="{self.working_dir.name}" 103 {self.edit_monitor_binary_path} --path={self.root_monitoring_path} --dry_run""" 104 p = subprocess.Popen( 105 command, 106 shell=True, 107 text=True, 108 start_new_session=True, 109 executable="/bin/bash", 110 ) 111 if wait_for_observer_start: 112 self._wait_for_observer_start(time_out=5) 113 114 return p 115 116 def _wait_for_observer_start(self, time_out): 117 start_time = time.time() 118 119 while time.time() < start_time + time_out: 120 log_files = glob.glob(self.working_dir.name + "/edit_monitor_*/*.log") 121 if log_files: 122 with open(log_files[0], "r") as f: 123 for line in f: 124 logging.debug("initial log: %s", line) 125 if line.rstrip("\n").endswith("Observer started."): 126 return 127 else: 128 time.sleep(1) 129 130 self.fail(f"Observer not started in {time_out} seconds.") 131 132 def _get_logged_events_num(self): 133 log_files = glob.glob(self.working_dir.name + "/edit_monitor_*/*.log") 134 self.assertEqual(len(log_files), 1) 135 136 with open(log_files[0], "r") as f: 137 for line in f: 138 logging.debug("complete log: %s", line) 139 if line.rstrip("\n").endswith("in dry run."): 140 return int(line.split(":")[-1].split(" ")[2]) 141 142 return 0 143 144 def _get_live_processes(self, processes): 145 live_processes = [] 146 for p in processes: 147 try: 148 p.wait(timeout=5) 149 except subprocess.TimeoutExpired as e: 150 live_processes.append(p) 151 logging.info("process: %d still alive.", p.pid) 152 else: 153 logging.info("process: %d stopped.", p.pid) 154 return live_processes 155 156 def _import_executable(self, executable_name: str) -> pathlib.Path: 157 binary_dir = pathlib.Path(self.working_dir.name).joinpath("binary") 158 binary_dir.mkdir() 159 executable_path = binary_dir.joinpath(executable_name) 160 with resources.as_file( 161 resources.files("testdata").joinpath(executable_name) 162 ) as binary: 163 shutil.copy(binary, executable_path) 164 executable_path.chmod(0o755) 165 return executable_path 166 167 168if __name__ == "__main__": 169 unittest.main() 170