1# Copyright 2024, The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16import errno
17import fcntl
18import getpass
19import hashlib
20import logging
21import multiprocessing
22import os
23import pathlib
24import platform
25import signal
26import subprocess
27import sys
28import tempfile
29import time
30
31from atest.metrics import clearcut_client
32from atest.proto import clientanalytics_pb2
33from edit_monitor import utils
34from proto import edit_event_pb2
35
36DEFAULT_PROCESS_TERMINATION_TIMEOUT_SECONDS = 5
37DEFAULT_MONITOR_INTERVAL_SECONDS = 5
38DEFAULT_MEMORY_USAGE_THRESHOLD = 0.02  # 2% of total memory
39DEFAULT_CPU_USAGE_THRESHOLD = 200
40DEFAULT_REBOOT_TIMEOUT_SECONDS = 60 * 60 * 24
41BLOCK_SIGN_FILE = "edit_monitor_block_sign"
42# Enum of the Clearcut log source defined under
43# /google3/wireless/android/play/playlog/proto/log_source_enum.proto
44LOG_SOURCE = 2524
45
46
def default_daemon_target():
  """Placeholder daemon entry point used when no target is supplied.

  It only prints a fixed marker line to stdout.
  """
  marker = "default daemon target"
  print(marker)
50
51
52class DaemonManager:
53  """Class to manage and monitor the daemon run as a subprocess."""
54
55  def __init__(
56      self,
57      binary_path: str,
58      daemon_target: callable = default_daemon_target,
59      daemon_args: tuple = (),
60      cclient: clearcut_client.Clearcut | None = None,
61  ):
62    self.binary_path = binary_path
63    self.daemon_target = daemon_target
64    self.daemon_args = daemon_args
65    self.cclient = cclient or clearcut_client.Clearcut(LOG_SOURCE)
66
67    self.user_name = getpass.getuser()
68    self.host_name = platform.node()
69    self.source_root = os.environ.get("ANDROID_BUILD_TOP", "")
70    self.pid = os.getpid()
71    self.daemon_process = None
72
73    self.max_memory_usage = 0
74    self.max_cpu_usage = 0
75    self.total_memory_size = os.sysconf("SC_PAGE_SIZE") * os.sysconf(
76        "SC_PHYS_PAGES"
77    )
78
79    pid_file_dir = pathlib.Path(tempfile.gettempdir()).joinpath("edit_monitor")
80    pid_file_dir.mkdir(parents=True, exist_ok=True)
81    self.pid_file_path = self._get_pid_file_path(pid_file_dir)
82    self.block_sign = pathlib.Path(tempfile.gettempdir()).joinpath(
83        BLOCK_SIGN_FILE
84    )
85
86  def start(self):
87    """Writes the pidfile and starts the daemon proces."""
88    if not utils.is_feature_enabled(
89        "edit_monitor",
90        self.user_name,
91        "ENABLE_ANDROID_EDIT_MONITOR",
92        100,
93    ):
94      logging.warning("Edit monitor is disabled, exiting...")
95      return
96
97    if self.block_sign.exists():
98      logging.warning("Block sign found, exiting...")
99      return
100
101    if self.binary_path.startswith("/google/cog/"):
102      logging.warning("Edit monitor for cog is not supported, exiting...")
103      return
104
105    setup_lock_file = pathlib.Path(tempfile.gettempdir()).joinpath(
106        self.pid_file_path.name + ".setup"
107    )
108    logging.info("setup lock file: %s", setup_lock_file)
109    with open(setup_lock_file, "w") as f:
110      try:
111        # Acquire an exclusive lock
112        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
113        self._stop_any_existing_instance()
114        self._write_pid_to_pidfile()
115        self._start_daemon_process()
116      except Exception as e:
117        if (
118            isinstance(e, IOError) and e.errno == errno.EAGAIN
119        ):  # Failed to acquire the file lock.
120          logging.warning("Another edit monitor is starting, exitinng...")
121          return
122        else:
123          logging.exception("Failed to start daemon manager with error %s", e)
124          self._send_error_event_to_clearcut(
125              edit_event_pb2.EditEvent.FAILED_TO_START_EDIT_MONITOR
126          )
127          raise e
128      finally:
129        # Release the lock
130        fcntl.flock(f, fcntl.LOCK_UN)
131
132  def monitor_daemon(
133      self,
134      interval: int = DEFAULT_MONITOR_INTERVAL_SECONDS,
135      memory_threshold: float = DEFAULT_MEMORY_USAGE_THRESHOLD,
136      cpu_threshold: float = DEFAULT_CPU_USAGE_THRESHOLD,
137      reboot_timeout: int = DEFAULT_REBOOT_TIMEOUT_SECONDS,
138  ):
139    """Monits the daemon process status.
140
141    Periodically check the CPU/Memory usage of the daemon process as long as the
142    process is still running and kill the process if the resource usage is above
143    given thresholds.
144    """
145    if not self.daemon_process:
146      return
147
148    logging.info("start monitoring daemon process %d.", self.daemon_process.pid)
149    reboot_time = time.time() + reboot_timeout
150    while self.daemon_process.is_alive():
151      if time.time() > reboot_time:
152        self.reboot()
153      try:
154        memory_usage = self._get_process_memory_percent(self.daemon_process.pid)
155        self.max_memory_usage = max(self.max_memory_usage, memory_usage)
156
157        cpu_usage = self._get_process_cpu_percent(self.daemon_process.pid)
158        self.max_cpu_usage = max(self.max_cpu_usage, cpu_usage)
159
160        time.sleep(interval)
161      except Exception as e:
162        # Logging the error and continue.
163        logging.warning("Failed to monitor daemon process with error: %s", e)
164
165      if self.max_memory_usage >= memory_threshold:
166        self._send_error_event_to_clearcut(
167            edit_event_pb2.EditEvent.KILLED_DUE_TO_EXCEEDED_MEMORY_USAGE
168        )
169        logging.error(
170            "Daemon process is consuming too much memory, rebooting..."
171        )
172        self.reboot()
173
174      if self.max_cpu_usage >= cpu_threshold:
175        self._send_error_event_to_clearcut(
176            edit_event_pb2.EditEvent.KILLED_DUE_TO_EXCEEDED_CPU_USAGE
177        )
178        logging.error("Daemon process is consuming too much cpu, killing...")
179        self._terminate_process(self.daemon_process.pid)
180
181    logging.info(
182        "Daemon process %d terminated. Max memory usage: %f, Max cpu"
183        " usage: %f.",
184        self.daemon_process.pid,
185        self.max_memory_usage,
186        self.max_cpu_usage,
187    )
188
189  def stop(self):
190    """Stops the daemon process and removes the pidfile."""
191
192    logging.info("in daemon manager cleanup.")
193    try:
194      if self.daemon_process:
195        # The daemon process might already in termination process,
196        # wait some time before kill it explicitly.
197        self._wait_for_process_terminate(self.daemon_process.pid, 1)
198        if self.daemon_process.is_alive():
199          self._terminate_process(self.daemon_process.pid)
200      self._remove_pidfile(self.pid)
201      logging.info("Successfully stopped daemon manager.")
202    except Exception as e:
203      logging.exception("Failed to stop daemon manager with error %s", e)
204      self._send_error_event_to_clearcut(
205          edit_event_pb2.EditEvent.FAILED_TO_STOP_EDIT_MONITOR
206      )
207      sys.exit(1)
208    finally:
209      self.cclient.flush_events()
210
211  def reboot(self):
212    """Reboots the current process.
213
214    Stops the current daemon manager and reboots the entire process based on
215    the binary file. Exits directly If the binary file no longer exists.
216    """
217    logging.info("Rebooting process based on binary %s.", self.binary_path)
218
219    # Stop the current daemon manager first.
220    self.stop()
221
222    # If the binary no longer exists, exit directly.
223    if not os.path.exists(self.binary_path):
224      logging.info("binary %s no longer exists, exiting.", self.binary_path)
225      sys.exit(0)
226
227    try:
228      os.execv(self.binary_path, sys.argv)
229    except OSError as e:
230      logging.exception("Failed to reboot process with error: %s.", e)
231      self._send_error_event_to_clearcut(
232          edit_event_pb2.EditEvent.FAILED_TO_REBOOT_EDIT_MONITOR
233      )
234      sys.exit(1)  # Indicate an error occurred
235
236  def cleanup(self):
237    """Wipes out all edit monitor instances in the system.
238
239    Stops all the existing edit monitor instances and place a block sign
240    to prevent any edit monitor process to start. This method is only used
241    in emergency case when there's something goes wrong with the edit monitor
242    that requires immediate cleanup to prevent damanger to the system.
243    """
244    logging.debug("Start cleaning up all existing instances.")
245    self._send_error_event_to_clearcut(edit_event_pb2.EditEvent.FORCE_CLEANUP)
246
247    try:
248      # First places a block sign to prevent any edit monitor process to start.
249      self.block_sign.touch()
250    except (FileNotFoundError, PermissionError, OSError):
251      logging.exception("Failed to place the block sign")
252
253    # Finds and kills all the existing instances of edit monitor.
254    existing_instances_pids = self._find_all_instances_pids()
255    for pid in existing_instances_pids:
256      logging.info(
257          "Found existing edit monitor instance with pid %d, killing...", pid
258      )
259      try:
260        self._terminate_process(pid)
261      except Exception:
262        logging.exception("Failed to terminate process %d", pid)
263
264  def _stop_any_existing_instance(self):
265    if not self.pid_file_path.exists():
266      logging.debug("No existing instances.")
267      return
268
269    ex_pid = self._read_pid_from_pidfile()
270
271    if ex_pid:
272      logging.info("Found another instance with pid %d.", ex_pid)
273      self._terminate_process(ex_pid)
274      self._remove_pidfile(ex_pid)
275
276  def _read_pid_from_pidfile(self) -> int | None:
277    try:
278      with open(self.pid_file_path, "r") as f:
279        return int(f.read().strip())
280    except FileNotFoundError as e:
281      logging.warning("pidfile %s does not exist.", self.pid_file_path)
282      return None
283
284  def _write_pid_to_pidfile(self):
285    """Creates a pidfile and writes the current pid to the file.
286
287    Raise FileExistsError if the pidfile already exists.
288    """
289    try:
290      # Use the 'x' mode to open the file for exclusive creation
291      with open(self.pid_file_path, "x") as f:
292        f.write(f"{self.pid}")
293    except FileExistsError as e:
294      # This could be caused due to race condition that a user is trying
295      # to start two edit monitors at the same time. Or because there is
296      # already an existing edit monitor running and we can not kill it
297      # for some reason.
298      logging.exception("pidfile %s already exists.", self.pid_file_path)
299      raise e
300
301  def _start_daemon_process(self):
302    """Starts a subprocess to run the daemon."""
303    p = multiprocessing.Process(
304        target=self.daemon_target, args=self.daemon_args
305    )
306    p.daemon = True
307    p.start()
308
309    logging.info("Start subprocess with PID %d", p.pid)
310    self.daemon_process = p
311
312  def _terminate_process(
313      self, pid: int, timeout: int = DEFAULT_PROCESS_TERMINATION_TIMEOUT_SECONDS
314  ):
315    """Terminates a process with given pid.
316
317    It first sends a SIGTERM to the process to allow it for proper
318    termination with a timeout. If the process is not terminated within
319    the timeout, kills it forcefully.
320    """
321    try:
322      os.kill(pid, signal.SIGTERM)
323      if not self._wait_for_process_terminate(pid, timeout):
324        logging.warning(
325            "Process %d not terminated within timeout, try force kill", pid
326        )
327        os.kill(pid, signal.SIGKILL)
328    except ProcessLookupError:
329      logging.info("Process with PID %d not found (already terminated)", pid)
330
331  def _wait_for_process_terminate(self, pid: int, timeout: int) -> bool:
332    start_time = time.time()
333
334    while time.time() < start_time + timeout:
335      if not self._is_process_alive(pid):
336        return True
337      time.sleep(1)
338
339    logging.error("Process %d not terminated within %d seconds.", pid, timeout)
340    return False
341
342  def _is_process_alive(self, pid: int) -> bool:
343    try:
344      output = subprocess.check_output(
345          ["ps", "-p", str(pid), "-o", "state="], text=True
346      ).strip()
347      state = output.split()[0]
348      return state != "Z"  # Check if the state is not 'Z' (zombie)
349    except subprocess.CalledProcessError:
350      # Process not found (already dead).
351      return False
352    except (FileNotFoundError, OSError, ValueError) as e:
353      logging.warning(
354          "Unable to check the status for process %d with error: %s.", pid, e
355      )
356      return True
357
358  def _remove_pidfile(self, expected_pid: int):
359    recorded_pid = self._read_pid_from_pidfile()
360
361    if recorded_pid is None:
362      logging.info("pid file %s already removed.", self.pid_file_path)
363      return
364
365    if recorded_pid != expected_pid:
366      logging.warning(
367          "pid file contains pid from a different process, expected pid: %d,"
368          " actual pid: %d.",
369          expected_pid,
370          recorded_pid,
371      )
372      return
373
374    logging.debug("removing pidfile written by process %s", expected_pid)
375    try:
376      os.remove(self.pid_file_path)
377    except FileNotFoundError:
378      logging.info("pid file %s already removed.", self.pid_file_path)
379
380  def _get_pid_file_path(self, pid_file_dir: pathlib.Path) -> pathlib.Path:
381    """Generates the path to store the pidfile.
382
383    The file path should have the format of "/tmp/edit_monitor/xxxx.lock"
384    where xxxx is a hashed value based on the binary path that starts the
385    process.
386    """
387    hash_object = hashlib.sha256()
388    hash_object.update(self.binary_path.encode("utf-8"))
389    pid_file_path = pid_file_dir.joinpath(hash_object.hexdigest() + ".lock")
390    logging.info("pid_file_path: %s", pid_file_path)
391
392    return pid_file_path
393
394  def _get_process_memory_percent(self, pid: int) -> float:
395    with open(f"/proc/{pid}/stat", "r") as f:
396      stat_data = f.readline().split()
397      # RSS is the 24th field in /proc/[pid]/stat
398      rss_pages = int(stat_data[23])
399      process_memory = rss_pages * 4 * 1024  # Convert to bytes
400
401    return (
402        process_memory / self.total_memory_size
403        if self.total_memory_size
404        else 0.0
405    )
406
407  def _get_process_cpu_percent(self, pid: int, interval: int = 1) -> float:
408    total_start_time = self._get_total_cpu_time(pid)
409    with open("/proc/uptime", "r") as f:
410      uptime_start = float(f.readline().split()[0])
411
412    time.sleep(interval)
413
414    total_end_time = self._get_total_cpu_time(pid)
415    with open("/proc/uptime", "r") as f:
416      uptime_end = float(f.readline().split()[0])
417
418    return (
419        (total_end_time - total_start_time) / (uptime_end - uptime_start) * 100
420    )
421
422  def _get_total_cpu_time(self, pid: int) -> float:
423    with open(f"/proc/{str(pid)}/stat", "r") as f:
424      stats = f.readline().split()
425      # utime is the 14th field in /proc/[pid]/stat measured in clock ticks.
426      utime = int(stats[13])
427      # stime is the 15th field in /proc/[pid]/stat measured in clock ticks.
428      stime = int(stats[14])
429      return (utime + stime) / os.sysconf(os.sysconf_names["SC_CLK_TCK"])
430
431  def _find_all_instances_pids(self) -> list[int]:
432    pids = []
433
434    try:
435      output = subprocess.check_output(["ps", "-ef", "--no-headers"], text=True)
436      for line in output.splitlines():
437        parts = line.split()
438        process_path = parts[7]
439        if pathlib.Path(process_path).name == "edit_monitor":
440          pid = int(parts[1])
441          if pid != self.pid:  # exclude the current process
442            pids.append(pid)
443    except Exception:
444      logging.exception(
445          "Failed to get pids of existing edit monitors from ps command."
446      )
447
448    return pids
449
450  def _send_error_event_to_clearcut(self, error_type):
451    edit_monitor_error_event_proto = edit_event_pb2.EditEvent(
452        user_name=self.user_name,
453        host_name=self.host_name,
454        source_root=self.source_root,
455    )
456    edit_monitor_error_event_proto.edit_monitor_error_event.CopyFrom(
457        edit_event_pb2.EditEvent.EditMonitorErrorEvent(error_type=error_type)
458    )
459    log_event = clientanalytics_pb2.LogEvent(
460        event_time_ms=int(time.time() * 1000),
461        source_extension=edit_monitor_error_event_proto.SerializeToString(),
462    )
463    self.cclient.log(log_event)
464