# Copyright (C) 2024 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import pathlib import unittest from unittest.mock import patch from atest import constants from atest.logstorage import log_uploader from pyfakefs import fake_filesystem_unittest class LogUploaderModuleTest(unittest.TestCase): def setUp(self): super().setUp() @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt') @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt') @patch.dict( os.environ, {log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY: '0'} ) def test_is_uploading_logs_flag_value_unrecognized_returns_false(self): gcert_checker = lambda: True result = log_uploader.is_uploading_logs(gcert_checker) self.assertFalse(result) @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt') @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt') @patch.dict( os.environ, {log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY: '1'} ) def test_is_uploading_logs_flag_value_1_returns_true(self): gcert_checker = lambda: True result = log_uploader.is_uploading_logs(gcert_checker) self.assertTrue(result) @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt') @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt') @patch.dict( os.environ, {log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY: 'TRUE'} ) def test_is_uploading_logs_flag_value_capitalized_true_returns_true(self): gcert_checker = lambda: True result = log_uploader.is_uploading_logs(gcert_checker) self.assertTrue(result) @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt') @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt') @patch.dict( os.environ, {log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY: 'true'} ) def test_is_uploading_logs_flag_value_true_process_returns_true(self): gcert_checker = lambda: True result = log_uploader.is_uploading_logs(gcert_checker) self.assertTrue(result) @patch('atest.constants.CREDENTIAL_FILE_NAME', None) @patch('atest.constants.TOKEN_FILE_PATH', None) @patch.dict( os.environ, {log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY: 'true'} ) def test_is_uploading_logs_no_creds_process_returns_false(self): gcert_checker = lambda: True result = log_uploader.is_uploading_logs(gcert_checker) self.assertFalse(result) @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt') @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt') @patch.dict( os.environ, {log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY: 'false'} ) def test_is_uploading_logs_not_requested_process_returns_false(self): gcert_checker = lambda: True result = log_uploader.is_uploading_logs(gcert_checker) self.assertFalse(result) @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt') @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt') @patch.dict( os.environ, {log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY: ''} ) def test_is_uploading_logs_returns_true_by_default(self): gcert_checker = lambda: True result = log_uploader.is_uploading_logs(gcert_checker) self.assertTrue(result) @patch('atest.constants.CREDENTIAL_FILE_NAME', 'creds.txt') @patch('atest.constants.TOKEN_FILE_PATH', 'token.txt') @patch.dict( os.environ, {log_uploader._ENABLE_ATEST_LOG_UPLOADING_ENV_KEY: ''} ) def test_is_uploading_logs_gcert_not_available_returns_false(self): gcert_checker = lambda: False result = log_uploader.is_uploading_logs(gcert_checker) self.assertFalse(result) class LogUploaderTest(fake_filesystem_unittest.TestCase): def setUp(self): super().setUp() self.setUpPyfakefs() def test_upload_single_file_contains_name(self): file_path = pathlib.Path('/dir/some_name.log') self.fs.create_file(file_path) fake_client = self._FakeUploadingClient() log_uploader._LogUploadSession( 'any_run_id', fake_client ).upload_single_file(file_path) self.assertEqual( fake_client.get_received_upload_artifact_arguments()[0]['resource_id'], file_path.name, ) def test_upload_single_file_metadata_content_type(self): file_path = pathlib.Path('/dir/some_name.log') self.fs.create_file(file_path) fake_client = self._FakeUploadingClient() log_uploader._LogUploadSession( 'any_run_id', fake_client ).upload_single_file(file_path) self.assertEqual( fake_client.get_received_upload_artifact_arguments()[0]['metadata'][ 'artifactType' ], 'HOST_LOG', ) def test_create_artifact_metadata_use_file_name(self): file_path = pathlib.Path('/dir/some_name.log') self.fs.create_file(file_path) fake_client = self._FakeUploadingClient() log_uploader._LogUploadSession( 'any_run_id', fake_client ).upload_single_file(file_path) self.assertEqual( fake_client.get_received_upload_artifact_arguments()[0]['metadata'][ 'name' ], file_path.name, ) def test_upload_single_file_duplicate_names_unique_resource_id(self): dir_path = pathlib.Path('/test_dir') file_name = 'some_name.log' file_path1 = dir_path.joinpath(file_name) file_path2 = dir_path.joinpath('some_sub_dir').joinpath(file_name) self.fs.create_file(file_path1) self.fs.create_file(file_path2) fake_client = self._FakeUploadingClient() suj = log_uploader._LogUploadSession('any_run_id', fake_client) suj.upload_single_file(file_path1) suj.upload_single_file(file_path2) self.assertNotEqual( fake_client.get_received_upload_artifact_arguments()[0]['resource_id'], fake_client.get_received_upload_artifact_arguments()[1]['resource_id'], ) def test_upload_single_file_duplicate_names_preserve_stem_and_suffix(self): dir_path = pathlib.Path('/test_dir') file_name = 'filename.extension' file_path1 = dir_path.joinpath(file_name) file_path2 = dir_path.joinpath('some_sub_dir').joinpath(file_name) self.fs.create_file(file_path1) self.fs.create_file(file_path2) fake_client = self._FakeUploadingClient() suj = log_uploader._LogUploadSession('any_run_id', fake_client) suj.upload_single_file(file_path1) suj.upload_single_file(file_path2) self.assertRegex( fake_client.get_received_upload_artifact_arguments()[1]['resource_id'], r'^filename.+\.extension', ) def test_upload_directory_does_not_upload_empty_directory(self): empty_dir = pathlib.Path('/test_dir') self.fs.create_dir(empty_dir) fake_client = self._FakeUploadingClient() log_uploader._LogUploadSession('any_run_id', fake_client).upload_directory( empty_dir ) self.assertEqual( len(fake_client.get_received_upload_artifact_arguments()), 0 ) def test_get_file_paths_single_file(self): dir_path = pathlib.Path('/test_dir') file_path = dir_path.joinpath('file.txt') self.fs.create_file(file_path) fake_client = self._FakeUploadingClient() log_uploader._LogUploadSession('any_run_id', fake_client).upload_directory( dir_path ) self.assertEqual( len(fake_client.get_received_upload_artifact_arguments()), 1 ) def test_get_file_paths_multiple_files(self): dir_path = pathlib.Path('/test_dir') file_path1 = dir_path.joinpath('file1.txt') file_path2 = dir_path.joinpath('file2.txt') self.fs.create_file(file_path1) self.fs.create_file(file_path2) fake_client = self._FakeUploadingClient() log_uploader._LogUploadSession('any_run_id', fake_client).upload_directory( dir_path ) self.assertEqual( len(fake_client.get_received_upload_artifact_arguments()), 2 ) def test_get_file_paths_nested_directories(self): dir_path = pathlib.Path('/test_dir') file_path = dir_path.joinpath('test_dir2/file2.txt') self.fs.create_file(file_path) fake_client = self._FakeUploadingClient() log_uploader._LogUploadSession('any_run_id', fake_client).upload_directory( dir_path ) self.assertEqual( fake_client.get_received_upload_artifact_arguments()[0][ 'artifact_path' ], file_path, ) def test_get_file_paths_symbolic_link_file(self): dir_path = pathlib.Path('/test_dir') link_path = dir_path.joinpath('link.txt') target_path = pathlib.Path('/test_dir2/file.txt') file_content = 'some content' self.fs.create_file(target_path, contents=file_content) self.fs.create_symlink(link_path, target_path) fake_client = self._FakeUploadingClient() log_uploader._LogUploadSession('any_run_id', fake_client).upload_directory( dir_path ) self.assertEqual( fake_client.get_received_upload_artifact_arguments()[0][ 'artifact_path' ], link_path, ) def test_get_file_paths_symbolic_link_directory(self): # Create a directory with a file and a symbolic link to it dir_path = pathlib.Path('/test_dir') link_path = dir_path.joinpath('link_dir') target_path = pathlib.Path('/test_dir2') file_name = 'file.txt' file_content = 'some content' self.fs.create_file(target_path.joinpath(file_name), contents=file_content) self.fs.create_symlink(link_path, target_path) fake_client = self._FakeUploadingClient() log_uploader._LogUploadSession('any_run_id', fake_client).upload_directory( dir_path ) self.assertEqual( fake_client.get_received_upload_artifact_arguments()[0][ 'artifact_path' ], link_path.joinpath(file_name), ) class _FakeUploadingClient(log_uploader._SimpleUploadingClient): def __init__(self): self._received_upload_artifact_arguments = [] # TODO(yuexima): This is basically a mock, switch to use mock directly. def get_received_upload_artifact_arguments(self): return self._received_upload_artifact_arguments def upload_artifact( self, resource_id: str, metadata: dict[str, str], artifact_path: pathlib.Path, num_of_retries, ) -> None: self._received_upload_artifact_arguments.append({ 'resource_id': resource_id, 'metadata': metadata, 'artifact_path': artifact_path, }) if __name__ == '__main__': unittest.main()