xref: /aosp_15_r20/external/autotest/utils/frozen_chromite/utils/outcap.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# -*- coding: utf-8 -*-
2# Copyright 2019 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Tools for capturing program output at a low level.
7
8Mostly useful for capturing stdout/stderr as directly assigning to those
9variables won't work everywhere.
10"""
11
12from __future__ import print_function
13
14import os
15import re
16import sys
17import tempfile
18
19
class _FdCapturer(object):
  """Helper class to capture output at the file descriptor level.

  This is meant to be used with sys.stdout or sys.stderr. By capturing
  file descriptors, this will also intercept subprocess output, which
  reassigning sys.stdout or sys.stderr will not do.

  Output will only be captured, it will no longer be printed while
  the capturer is active.
  """

  def __init__(self, source, output=None):
    """Construct the _FdCapturer object.

    Does not start capturing until Start() is called.

    Args:
      source: A file object to capture. Typically sys.stdout or
        sys.stderr, but will work with anything that implements flush()
        and fileno().
      output: A file name where the captured output is to be stored. If None,
        then the output will be stored to a temporary file.
    """
    self._source = source
    # Everything read back from the capture file so far.
    self._captured = ''
    # Duplicate of the source's original descriptor; None while not capturing.
    self._saved_fd = None
    # NamedTemporaryFile wrapper, only set while capturing to a temp file.
    self._tempfile = None
    # File object whose descriptor replaces the source's descriptor.
    self._capturefile = None
    # Independent read handle on the capture file, used by GetCaptured().
    self._capturefile_reader = None
    # Caller-supplied output path, or None to use a fresh temp file per Start().
    self._capturefile_name = output

  def _SafeCreateTempfile(self, tempfile_obj):
    """Ensure that the tempfile is created safely.

    (1) Stash away a reference to the tempfile.
    (2) Unlink the file from the filesystem.

    (2) ensures that if we crash, the file gets deleted. (1) ensures that while
    we are running, we hold a reference to the file so the system does not close
    the file.

    Args:
      tempfile_obj: A tempfile object.
    """
    self._tempfile = tempfile_obj
    os.unlink(tempfile_obj.name)

  def Start(self):
    """Begin capturing output.

    Calling Start() while a capture is already active is a no-op, so the
    descriptor saved for Stop() is never overwritten.
    """
    if self._saved_fd is not None:
      return

    # Anything still sitting in the source's buffer was written before the
    # capture began; push it to the original destination now so it does not
    # end up in the capture file on the next flush.
    self._source.flush()

    if self._capturefile_name is None:
      tempfile_obj = tempfile.NamedTemporaryFile(delete=False)
      self._capturefile = tempfile_obj.file
      try:
        # The reader must be opened by name before the file is unlinked.
        self._capturefile_reader = open(tempfile_obj.name)
      finally:
        # Unlink even if opening the reader failed. The temp path is
        # deliberately not stored in _capturefile_name: it no longer exists
        # after this call, and a later Start() must create a new temp file
        # rather than recreate a file at the stale path.
        self._SafeCreateTempfile(tempfile_obj)
    else:
      # Open file passed in for writing. Set buffering=1 for line level
      # buffering.
      self._capturefile = open(self._capturefile_name, 'w', buffering=1)
      self._capturefile_reader = open(self._capturefile_name)
    # Save the original fd so we can revert in Stop().
    self._saved_fd = os.dup(self._source.fileno())
    os.dup2(self._capturefile.fileno(), self._source.fileno())

  def Stop(self):
    """Stop capturing output.

    Collects any remaining captured output, then restores the source's
    original descriptor and closes the files opened by Start(). The restore
    and cleanup run even if collecting the output raises; the exception is
    still propagated to the caller.
    """
    try:
      self.GetCaptured()
    finally:
      if self._saved_fd is not None:
        os.dup2(self._saved_fd, self._source.fileno())
        os.close(self._saved_fd)
        self._saved_fd = None
      # If capturefile and capturefile_reader exist, close them as they were
      # opened in self.Start().
      if self._capturefile_reader is not None:
        self._capturefile_reader.close()
        self._capturefile_reader = None
      if self._capturefile is not None:
        self._capturefile.close()
        self._capturefile = None
      if self._tempfile is not None:
        # Close through the NamedTemporaryFile wrapper too, so the wrapper
        # itself is marked closed instead of being left for the garbage
        # collector to clean up.
        self._tempfile.close()
        self._tempfile = None

  def GetCaptured(self):
    """Return all output captured up to this point.

    Can be used while capturing or after Stop() has been called.

    Returns:
      The accumulated captured text as a string.
    """
    # Flush first so buffered writes reach the capture file before reading.
    self._source.flush()
    if self._capturefile_reader is not None:
      self._captured += self._capturefile_reader.read()
    return self._captured

  def ClearCaptured(self):
    """Erase all captured output."""
    # Drain whatever is pending in the capture file so it is discarded too.
    self.GetCaptured()
    self._captured = ''
114
115
class OutputCapturer(object):
  """Class for capturing stdout/stderr output.

  Class is designed as a 'ContextManager'.

  Examples:
    with OutputCapturer() as output:
      # Capturing of stdout/stderr automatically starts now.
      # Do stuff that sends output to stdout/stderr.
      # Capturing automatically stops at end of 'with' block.

    # stdout/stderr can be retrieved from the OutputCapturer object:
    stdout = output.GetStdoutLines() # Or other access methods

    # OutputCapturer can also be used to capture output to specified files.
    with OutputCapturer(stdout_path='/tmp/stdout.txt') as output:
      # Do stuff.
      # stdout will be captured to /tmp/stdout.txt.
  """

  # Splits captured text into "lines". A line is either a span that starts
  # with ESC[1; at the beginning of a line and runs to an ESC[0m at the end of
  # a line (DOTALL lets such a span cover embedded newlines), or an ordinary
  # run of characters containing no newline.
  OPER_MSG_SPLIT_RE = re.compile(r'^\033\[1;.*?\033\[0m$|^[^\n]*$',
                                 re.DOTALL | re.MULTILINE)

  __slots__ = ['_stdout_capturer', '_stderr_capturer', '_quiet_fail']

  def __init__(self, stdout_path=None, stderr_path=None, quiet_fail=False):
    """Initialize OutputCapturer with capture files.

    If OutputCapturer is initialized with filenames to capture stdout and stderr
    to, then those files are used. Otherwise, temporary files are created.

    Args:
      stdout_path: File to capture stdout to. If None, a temporary file is used.
      stderr_path: File to capture stderr to. If None, a temporary file is used.
      quiet_fail: If True fail quietly without printing the captured stdout and
        stderr.
    """
    self._stdout_capturer = _FdCapturer(sys.stdout, output=stdout_path)
    self._stderr_capturer = _FdCapturer(sys.stderr, output=stderr_path)
    self._quiet_fail = quiet_fail

  def __enter__(self):
    # This method is called with entering 'with' block.
    self.StartCapturing()
    return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    # This method is called when exiting 'with' block.
    self.StopCapturing()

    if exc_type and not self._quiet_fail:
      print('Exception during output capturing: %r' % (exc_val,))
      # Fetch each stream lazily, stdout first, so the report order is
      # stdout then stderr.
      for label, getter in (('stdout', self.GetStdout),
                            ('stderr', self.GetStderr)):
        text = getter()
        if text:
          print('Captured %s was:\n%s' % (label, text))
        else:
          print('No captured %s' % label)

  def StartCapturing(self):
    """Begin capturing stdout and stderr.

    If starting the stderr capture fails, the stdout capture that was already
    started is stopped again before the exception is re-raised, so stdout is
    not left redirected.
    """
    self._stdout_capturer.Start()
    try:
      self._stderr_capturer.Start()
    except BaseException:
      self._stdout_capturer.Stop()
      raise

  def StopCapturing(self):
    """Stop capturing stdout and stderr.

    The stderr capturer is stopped even if stopping the stdout capturer
    raises; the exception still propagates.
    """
    try:
      self._stdout_capturer.Stop()
    finally:
      self._stderr_capturer.Stop()

  def ClearCaptured(self):
    """Clear any captured stdout/stderr content."""
    self._stdout_capturer.ClearCaptured()
    self._stderr_capturer.ClearCaptured()

  def GetStdout(self):
    """Return captured stdout so far."""
    return self._stdout_capturer.GetCaptured()

  def GetStderr(self):
    """Return captured stderr so far."""
    return self._stderr_capturer.GetCaptured()

  def _GetOutputLines(self, output, include_empties):
    """Split |output| into lines, optionally |include_empties|.

    Args:
      output: The text to split.
      include_empties: If false, empty lines are dropped from the result.

    Returns:
      A list of the pieces matched by OPER_MSG_SPLIT_RE.
    """
    lines = self.OPER_MSG_SPLIT_RE.findall(output)
    if include_empties:
      return lines
    return [ln for ln in lines if ln]

  def GetStdoutLines(self, include_empties=True):
    """Return captured stdout so far as array of lines.

    If |include_empties| is false filter out all empty lines.
    """
    return self._GetOutputLines(self.GetStdout(), include_empties)

  def GetStderrLines(self, include_empties=True):
    """Return captured stderr so far as array of lines.

    If |include_empties| is false filter out all empty lines.
    """
    return self._GetOutputLines(self.GetStderr(), include_empties)
230