1# Copyright 2017 The Abseil Authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Helper script for logging_functional_test."""
16
17import logging as std_logging
18import logging.config as std_logging_config
19import os
20import sys
21import threading
22import time
23import timeit
24from unittest import mock
25
26from absl import app
27from absl import flags
28from absl import logging
29
# Shared absl flags object. Within this file, `log_dir` and `stderrthreshold`
# are read from it, and `stderrthreshold` is also reassigned.
FLAGS = flags.FLAGS
31
32
class VerboseDel:
  """Object that announces its own finalization on stderr.

  Used to observe whether `__del__` got a chance to run.
  """

  def __init__(self, msg):
    # Text written verbatim to stderr when the instance is finalized.
    self._msg = msg

  def __del__(self):
    stream = sys.stderr
    stream.write(self._msg)
    stream.flush()
42
43
def _test_do_logging():
  """Do some log operations.

  Emits messages through the verbose-level, numeric-level and level-named
  logging calls, the rate-limited helpers (every N seconds, first N, every N)
  and the exception-logging calls, then flushes.

  NOTE(review): the statements are presumably order-sensitive because the
  calling functional test inspects the emitted output -- confirm before
  reordering or merging any of the calls below.
  """
  # Verbose levels 3 and 2, plus a message guarded by vlog_is_on(2).
  logging.vlog(3, 'This line is VLOG level 3')
  logging.vlog(2, 'This line is VLOG level 2')
  logging.log(2, 'This line is log level 2')
  if logging.vlog_is_on(2):
    logging.log(1, 'VLOG level 1, but only if VLOG level 2 is active')

  logging.vlog(1, 'This line is VLOG level 1')
  logging.log(1, 'This line is log level 1')
  logging.debug('This line is DEBUG')

  logging.vlog(0, 'This line is VLOG level 0')
  logging.log(0, 'This line is log level 0')
  # Message ending in an embedded NUL character.
  logging.info('Interesting Stuff\0')
  # Positional %-style argument, then a dict feeding named %-style fields.
  logging.info('Interesting Stuff with Arguments: %d', 42)
  logging.info('%(a)s Stuff with %(b)s',
               {'a': 'Interesting', 'b': 'Dictionary'})

  # Replace timeit.default_timer with a mock so the loop steps a fake clock
  # from 0 towards 9 in 0.2 increments instead of waiting in real time.
  # Presumably log_every_n_seconds reads this timer; the message text states
  # the expected outcome (5 emissions with the interval argument of 2).
  with mock.patch.object(timeit, 'default_timer') as mock_timer:
    mock_timer.return_value = 0
    while timeit.default_timer() < 9:
      logging.log_every_n_seconds(logging.INFO, 'This should appear 5 times.',
                                  2)
      mock_timer.return_value = mock_timer() + .2

  # i takes the values 1..4. Presumably the first number after the format
  # string is the count/period and the remaining values fill the %d fields --
  # confirm against the absl logging API.
  for i in range(1, 5):
    logging.log_first_n(logging.INFO, 'Info first %d of %d', 2, i, 2)
    logging.log_every_n(logging.INFO, 'Info %d (every %d)', 3, i, 3)

  logging.vlog(-1, 'This line is VLOG level -1')
  logging.log(-1, 'This line is log level -1')
  logging.warning('Worrying Stuff')
  for i in range(1, 5):
    logging.log_first_n(logging.WARNING, 'Warn first %d of %d', 2, i, 2)
    logging.log_every_n(logging.WARNING, 'Warn %d (every %d)', 3, i, 3)

  logging.vlog(-2, 'This line is VLOG level -2')
  logging.log(-2, 'This line is log level -2')
  try:
    raise OSError('Fake Error')
  except OSError:
    # Keep the active exception triple so it can be logged again after this
    # except block has ended.
    saved_exc_info = sys.exc_info()
    # Note: this format string contains %s but no argument is supplied.
    logging.exception('An Exception %s')
    logging.exception('Once more, %(reason)s', {'reason': 'just because'})
    # Same situation as above: %s in the message, no argument.
    logging.error('Exception 2 %s', exc_info=True)
    logging.error('Non-exception', exc_info=False)

  try:
    sys.exc_clear()
  except AttributeError:
    # Python 3 has no sys.exc_clear(), so the call above raises
    # AttributeError there; handling that error clears sys.exc_info() too.
    pass

  # Log with an explicitly supplied exc_info triple instead of the currently
  # handled exception.
  logging.error('Exception %s', '3', exc_info=saved_exc_info)
  # Same exception type and value, but with the traceback slot set to None.
  logging.error('No traceback', exc_info=saved_exc_info[:2] + (None,))

  logging.error('Alarming Stuff')
  for i in range(1, 5):
    logging.log_first_n(logging.ERROR, 'Error first %d of %d', 2, i, 2)
    logging.log_every_n(logging.ERROR, 'Error %d (every %d)', 3, i, 3)
  logging.flush()
106
107
def _test_fatal_main_thread_only():
  """Call logging.fatal on the main thread, no other threads running."""
  canary = VerboseDel('fatal_main_thread_only main del called\n')
  try:
    logging.fatal('fatal_main_thread_only message')
  finally:
    # Only reached if control actually leaves the try block.
    del canary
115
116
def _test_fatal_with_other_threads():
  """Call logging.fatal on the main thread while a second thread is running."""

  started = threading.Lock()
  started.acquire()

  def sleep_forever(lock=started):
    canary = VerboseDel('fatal_with_other_threads non-main del called\n')
    try:
      # Tell the main thread that this thread is up, then idle indefinitely.
      lock.release()
      while True:
        time.sleep(10000)
    finally:
      del canary

  main_canary = VerboseDel('fatal_with_other_threads main del called\n')
  try:
    background = threading.Thread(target=sleep_forever)
    background.start()

    # Block until the background thread has released the lock.
    with started:
      pass

    logging.fatal('fatal_with_other_threads message')
    # Idle here in case the call above returns.
    while True:
      time.sleep(10000)
  finally:
    del main_canary
149
def _test_fatal_non_main_thread():
  """Call logging.fatal from a thread other than the main thread."""

  gate = threading.Lock()
  gate.acquire()

  def die_soon(lock=gate):
    canary = VerboseDel('fatal_non_main_thread non-main del called\n')
    try:
      # Block until the main thread opens the gate.
      with lock:
        pass
      logging.fatal('fatal_non_main_thread message')
      while True:
        time.sleep(10000)
    finally:
      del canary

  main_canary = VerboseDel('fatal_non_main_thread main del called\n')
  try:
    worker = threading.Thread(target=die_soon)
    worker.start()

    # Open the gate so the worker proceeds to its fatal call.
    gate.release()

    # Idle while the worker thread runs.
    while True:
      time.sleep(10000)
  finally:
    del main_canary
182
183
def _test_critical_from_non_absl_logger():
  """Emit a CRITICAL message through the standard library logging module."""
  message = 'A critical message'
  std_logging.critical(message)
188
189
def _test_register_frame_to_skip():
  """Test skipping frames for line number reporting.

  Asks the absl logger's findCaller() for a line number through two nested
  helpers, both before and after registering the outer helper with
  register_frame_to_skip(), and asserts that the reported line is stable
  before registration and different after it.
  """

  def _getline():
    # This helper's name must match the function name string passed to
    # register_frame_to_skip() below.

    def _getline_inner():
      # Element 1 of findCaller()'s result is treated as the line number.
      return logging.get_absl_logger().findCaller()[1]

    return _getline_inner()

  # Check register_frame_to_skip function to see if log frame skipping works.
  line1 = _getline()
  line2 = _getline()
  logging.get_absl_logger().register_frame_to_skip(__file__, '_getline')
  line3 = _getline()
  # Both should be the line number of the _getline_inner() call.
  assert (line1 == line2), (line1, line2)
  # line3 should be a line number in this function.
  assert (line2 != line3), (line2, line3)
209
210
def _test_flush():
  """Exercise logging.flush() when the handler's stream is already closed."""
  path = os.path.join(FLAGS.log_dir, 'a_thread_with_logfile.txt')
  with open(path, 'w') as soon_closed:
    python_handler = logging.get_absl_handler().python_handler
    python_handler.stream = soon_closed
  # Leaving the with block closed the file the handler still points at.
  logging.flush()
218
219
def _test_stderrthreshold():
  """Reassign --stderrthreshold after flag parsing and log at each setting."""

  def log_things():
    # One message per level, each reporting the current flag value.
    emitters = (
        (logging.debug, 'FLAGS.stderrthreshold=%s, debug log'),
        (logging.info, 'FLAGS.stderrthreshold=%s, info log'),
        (logging.warning, 'FLAGS.stderrthreshold=%s, warning log'),
        (logging.error, 'FLAGS.stderrthreshold=%s, error log'),
    )
    for emit, template in emitters:
      emit(template, FLAGS.stderrthreshold)

  for threshold in ('debug', 'info', 'warning', 'error'):
    FLAGS.stderrthreshold = threshold
    log_things()
238
239
def _test_std_logging():
  """Emit one message per level through the standard library logging module."""
  calls = (
      (std_logging.debug, 'std debug log'),
      (std_logging.info, 'std info log'),
      (std_logging.warning, 'std warning log'),
      (std_logging.error, 'std error log'),
  )
  for emit, text in calls:
    emit(text)
246
247
def _test_bad_exc_info():
  """Tests when a bad exc_info value (a 2-tuple of Nones) is provided."""
  malformed_exc_info = (None, None)
  logging.info('Bad exc_info', exc_info=malformed_exc_info)
251
252
def _test_none_exc_info():
  """Log with exc_info=True while no exception information is available."""
  # Clear any recorded exception before logging.
  try:
    clear_exc = sys.exc_clear
  except AttributeError:
    # Python 3 has no sys.exc_clear(); handling this AttributeError clears
    # sys.exc_info() as well.
    pass
  else:
    clear_exc()
  logging.info('None exc_info', exc_info=True)
262
263
def _test_unicode():
  """Log text/bytes combinations, each wrapped in stderr marker lines."""

  seen_names = []

  def log(name, msg, *args):
    """Logs msg under a unique name, bracketed by begin/end marker lines."""
    assert name not in seen_names, ('test_unicode expects unique names to work,'
                                    ' found existing name {}').format(name)
    seen_names.append(name)

    # The marker lines let the test pick out the output of each message.
    sys.stderr.write('-- begin {} --\n'.format(name))
    logging.info(msg, *args)
    sys.stderr.write('-- end {} --\n'.format(name))

  template = u'G\u00eete: %s'
  town = u'Ch\u00e2tonnaye'
  # Each entry is (unique name, message, *format arguments).
  cases = (
      ('unicode', u'G\u00eete: Ch\u00e2tonnaye'),
      ('unicode % unicode', template, town),
      ('bytes % bytes', template.encode('utf-8'), town.encode('utf-8')),
      ('unicode % bytes', template, town.encode('utf-8')),
      ('bytes % unicode', template.encode('utf-8'), town),
      ('unicode % iso8859-15', template, town.encode('iso-8859-15')),
      ('str % exception', 'exception: %s', Exception(town)),
  )
  for case in cases:
    log(*case)
290
291
def main(argv):
  """Run the scenario selected by the TEST_NAME environment variable.

  Args:
    argv: Command line arguments; unused.

  Raises:
    AssertionError: If no module-level `_test_<TEST_NAME>` name is found.
  """
  del argv  # Unused.

  requested = os.environ.get('TEST_NAME')
  scenario = globals().get('_test_%s' % requested)
  if scenario is None:
    raise AssertionError('TEST_NAME must be set to a valid value')

  # Flush so previous messages are written to file before we switch to a new
  # file with use_absl_log_file.
  logging.flush()
  wants_log_file = os.environ.get('USE_ABSL_LOG_FILE') == '1'
  if wants_log_file:
    logging.get_absl_handler().use_absl_log_file('absl_log_file', FLAGS.log_dir)

  scenario()
306
307
if __name__ == '__main__':
  # Overwrite the program name with a fixed value.
  # NOTE(review): presumably this keeps any output that includes the program
  # name stable for the calling test -- confirm.
  sys.argv[0] = 'py_argv_0'
  # When CALL_DICT_CONFIG is exactly '1', apply a minimal standard library
  # dictConfig before handing control to app.run().
  if os.environ.get('CALL_DICT_CONFIG') == '1':
    std_logging_config.dictConfig({'version': 1})
  app.run(main)
313