xref: /aosp_15_r20/tools/asuite/atest/logstorage/logstorage_utils.py (revision c2e18aaa1096c836b086f94603d04f4eb9cf37f5)
1# Copyright (C) 2020 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16"""Utility functions for logstorage."""
17from __future__ import print_function
18
19import logging
20import time
21import uuid
22
23from atest import atest_utils
24from atest import constants
25from atest.logstorage import atest_gcp_utils
26from atest.metrics import metrics_base
27from googleapiclient.discovery import build
28import httplib2
29from oauth2client import client as oauth2_client
30
31UPLOAD_REQUESTED_FILE_NAME = 'UPLOAD_REQUESTED'
32
33
def is_credential_available() -> bool:
  """Checks whether the credential needed for log upload is available.

  Returns:
      True only when both constants.CREDENTIAL_FILE_NAME and
      constants.TOKEN_FILE_PATH hold truthy values, False otherwise.
  """
  # `and` evaluates to one of its operands rather than to a bool, so coerce
  # explicitly to honor the annotated return type.
  return bool(constants.CREDENTIAL_FILE_NAME and constants.TOKEN_FILE_PATH)
37
38
def is_upload_enabled(args: dict[str, str]) -> bool:
  """Decides whether result upload is enabled for the current run.

  The opt-in is remembered through a marker file named
  UPLOAD_REQUESTED_FILE_NAME inside the atest config folder: it is created the
  first time upload is requested and removed (together with the credential
  file) when upload is disabled.

  Args:
      args: Dict of arguments; the constants.REQUEST_UPLOAD_RESULT and
        constants.DISABLE_UPLOAD_RESULT entries are consulted.

  Returns:
      True if results should be uploaded, False otherwise.
  """
  if not (is_credential_available() and constants.GTF_TARGETS):
    return False

  config_dir = atest_utils.get_config_folder()
  config_dir.mkdir(parents=True, exist_ok=True)
  marker_file = config_dir.joinpath(UPLOAD_REQUESTED_FILE_NAME)

  wants_enable = args.get(constants.REQUEST_UPLOAD_RESULT)
  wants_disable = args.get(constants.DISABLE_UPLOAD_RESULT)

  # Note: the request and disable flags come from mutually exclusive args, so
  # they are never both set simultaneously.
  if marker_file.exists():
    if not wants_disable:
      # Upload was enabled by an earlier run and is not being turned off.
      atest_utils.colorful_print(
          'AnTS result uploading is enabled. (To disable, use'
          ' --disable-upload-result flag)',
          constants.GREEN,
      )
      return True

    # Upload was enabled by an earlier run and is being turned off now.
    atest_utils.colorful_print(
        'AnTS result uploading is switched off and will apply to the current'
        ' and future TradeFed test runs. To re-enable it, run a test with the'
        ' --request-upload-result flag.',
        constants.GREEN,
    )
    marker_file.unlink()
    credential_file = config_dir.joinpath(constants.CREDENTIAL_FILE_NAME)
    credential_file.unlink(missing_ok=True)
    return False

  if wants_enable:
    # Upload is being turned on for the first time.
    atest_utils.colorful_print(
        'AnTS result uploading is switched on and will apply to the current and'
        ' future TradeFed test runs. To disable it, run a test with the'
        ' --disable-upload-result flag.',
        constants.GREEN,
    )
    marker_file.touch()
    return True

  return False
88
89
def do_upload_flow(
    extra_args: dict[str, str], invocation_properties: dict[str, str] = None
) -> tuple:
  """Runs the upload flow.

  Delegates to atest_gcp_utils.do_upload_flow, handing it a factory that
  builds a BuildClient from a credential.

  Args:
      extra_args: Dict of extra args to add to test run.
      invocation_properties: Additional invocation properties to write into the
        invocation. A missing or empty value is replaced by an empty dict.

  Returns:
      A tuple of credential object and invocation information dict.
  """

  def _new_build_client(cred):
    return BuildClient(cred)

  if not invocation_properties:
    invocation_properties = {}
  return atest_gcp_utils.do_upload_flow(
      extra_args, _new_build_client, invocation_properties
  )
109
110
class BuildClient:
  """Build api helper class.

  Wraps the API client created by googleapiclient's `build` and exposes one
  method per request this module issues (branch, target, build, build attempt,
  invocation and work unit).
  """

  # Value passed as maxResults when listing branches or targets.
  _MAX_LIST_RESULTS = 10000
  # Number of times update_invocation polls for the invocation, and the pause
  # in seconds between two consecutive polls.
  _REVISION_FETCH_ATTEMPTS = 5
  _REVISION_FETCH_INTERVAL_SECS = 0.5

  def __init__(
      self,
      creds,
      api_version=constants.STORAGE_API_VERSION,
      url=constants.DISCOVERY_SERVICE,
  ):
    """Init BuildClient class.

    Args:
        creds: An oauth2client.OAuth2Credentials instance.
        api_version: Version of the service handed to `build`.
        url: Discovery service URL handed to `build` as discoveryServiceUrl.
    """
    http_auth = creds.authorize(httplib2.Http())
    self.client = build(
        serviceName=constants.STORAGE_SERVICE_NAME,
        version=api_version,
        cache_discovery=False,
        http=http_auth,
        discoveryServiceUrl=url,
    )

  def list_branch(self):
    """List all branch."""
    return (
        self.client.branch().list(maxResults=self._MAX_LIST_RESULTS).execute()
    )

  def list_target(self, branch):
    """List all target in the branch.

    Args:
        branch: A string of branch name whose targets are listed.
    """
    return (
        self.client.target()
        .list(branch=branch, maxResults=self._MAX_LIST_RESULTS)
        .execute()
    )

  def get_branch(self, branch):
    """Get BuildInfo for specific branch.

    Args:
        branch: A string of branch name to query.

    Returns:
        The response of the branch query, or an empty string when the query
        raises any exception.
    """
    try:
      return self.client.branch().get(resourceId=branch).execute()
    # pylint: disable=broad-except
    except Exception:
      # Best effort by design: callers get '' on failure, but keep a trace of
      # the cause for debugging instead of dropping it silently.
      logging.debug('Failed to query branch: %s', branch, exc_info=True)
      return ''

  def insert_local_build(self, external_id, target, branch):
    """Insert a build record.

    Args:
        external_id: unique id of build record.
        target: build target.
        branch: build branch.

    Returns:
        A build record object.
    """
    body = {
        'buildId': '',
        'externalId': external_id,
        'branch': branch,
        'target': {'name': target, 'target': target},
        'buildAttemptStatus': 'complete',
    }
    return self.client.build().insert(buildType='local', body=body).execute()

  def insert_build_attempts(self, build_record):
    """Insert a build attempt record.

    Args:
        build_record: build record; its 'buildId' and nested target 'name' are
          used to address the attempt.

    Returns:
        A build attempt object.
    """
    build_attempt = {'id': 0, 'status': 'complete', 'successful': True}
    return (
        self.client.buildattempt()
        .insert(
            buildId=build_record['buildId'],
            target=build_record['target']['name'],
            body=build_attempt,
        )
        .execute()
    )

  def insert_invocation(
      self, build_record: dict, invocation_properties: dict[str, str]
  ):
    """Insert a build invocation record.

    Args:
        build_record: build record; 'buildId', 'branch' and the nested target
          'name' are copied into the invocation's primary build.
        invocation_properties: Additional invocation properties to write into
          the invocation. None is treated as no additional properties.

    Returns:
        A build invocation object.
    """
    sponge_invocation_id = str(uuid.uuid4())
    user_email = metrics_base.get_user_email()
    properties = [
        {
            'name': 'sponge_invocation_id',
            'value': sponge_invocation_id,
        },
        {
            'name': 'test_uri',
            'value': f'{constants.STORAGE2_TEST_URI}{sponge_invocation_id}',
        },
    ]
    # Tolerate None so a caller without extra properties does not crash here.
    properties.extend(
        {'name': key, 'value': value}
        for key, value in (invocation_properties or {}).items()
    )
    invocation = {
        'primaryBuild': {
            'buildId': build_record['buildId'],
            'buildTarget': build_record['target']['name'],
            'branch': build_record['branch'],
        },
        'schedulerState': 'running',
        'runner': 'atest',
        'scheduler': 'atest',
        'users': [user_email],
        'properties': properties,
    }
    return self.client.invocation().insert(body=invocation).execute()

  def update_invocation(self, invocation):
    """Update a build invocation record.

    Args:
        invocation: invocation record. Its 'revision' entry is overwritten in
          place when a newer revision is found.

    Returns:
        A invocation object.
    """
    # Because invocation revision will be update by TF, we need to fetch
    # latest invocation revision to update status correctly.
    invocations = None
    for attempt in range(self._REVISION_FETCH_ATTEMPTS):
      if attempt:
        # Pause only between polls; waiting after the last one gains nothing.
        time.sleep(self._REVISION_FETCH_INTERVAL_SECS)
      invocations = (
          self.client.invocation()
          .list(invocationId=invocation['invocationId'], maxResults=10)
          .execute()
          .get('invocations', [])
      )
      if invocations:
        break
    if invocations:
      latest_revision = invocations[-1].get('revision', '')
      if latest_revision:
        logging.debug(
            'Get latest_revision:%s from invocations:%s',
            latest_revision,
            invocations,
        )
        invocation['revision'] = latest_revision
    return (
        self.client.invocation()
        .update(resourceId=invocation['invocationId'], body=invocation)
        .execute()
    )

  def insert_work_unit(self, invocation_record):
    """Insert a workunit record.

    Args:
        invocation_record: invocation record; its 'invocationId' is attached to
          the new workunit.

    Returns:
        the workunit object.
    """
    workunit = {'invocationId': invocation_record['invocationId']}
    return self.client.workunit().insert(body=workunit).execute()
287