xref: /aosp_15_r20/cts/common/device-side/bedstead/btest/btest (revision b7c941bb3fa97aba169d73cee0bed2de8ac964bf)
1#!/usr/bin/env python3
2#
3# Copyright (C) 2022 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the 'License');
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#    http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an 'AS IS' BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16import sys, argparse, os
17import subprocess
18import re
19import queue
20from threading  import Thread
21import itertools
22import time
23
24class CurrentUserState:
25    def __init__(self, args):
26        self.args = args
27        self.current_user = get_current_user(args)
28
29    def name(self):
30        return "RUN_ON_CURRENT_USER"
31
32    def is_active(self, device_state):
33        return True
34
35    def include_annotations(self, args):
36        return []
37
38    def initialise(self, device_state):
39        pass
40
41    def enter(self, device_state):
42        debug(self.args, "[Test] Entering state " + self.name())
43
44    def get_user(self):
45        return self.current_user
46
47    def all_supported_annotations(self, args):
48        return self.include_annotations(args)
49
50class SystemUserState:
51    def __init__(self, args):
52        self.args = args
53
54    def name(self):
55        return "RUN_ON_SYSTEM_USER"
56
57    def is_active(self, device_state):
58        return device_state["current_user"] == 0
59
60    def include_annotations(self, args):
61        if args.headless:
62            # We want to skip all of the RequireRunOnPrimaryUser ones which get assumption failed
63            return ["com.android.bedstead.multiuser.annotations.RequireRunOnSystemUser"]
64        return []
65
66    def initialise(self, device_state):
67        pass
68
69    def enter(self, device_state):
70        debug(self.args, "[Test] Entering state " + self.name())
71        execute_shell_command("Test", self.args, ["adb", "shell", "am", "switch-user", "0"])
72
73    def get_user(self):
74        return 0
75
76    def all_supported_annotations(self, args):
77        return self.include_annotations(args)
78
79class SecondaryUserState:
80    def __init__(self, args):
81        self.args = args
82
83    def name(self):
84        return "RUN_ON_SECONDARY_USER"
85
86    def is_active(self, device_state):
87        if not is_secondary_user(device_state, device_state["users"][device_state["current_user"]]):
88            return False
89        if not self.args.headless:
90            return True
91
92        secondary_user_id = get_or_create_secondary_user(device_state, self.args)
93        return device_state["current_user"] == secondary_user_id
94
95    def include_annotations(self, args):
96        return ["com.android.bedstead.multiuser.annotations.RequireRunOnSecondaryUser"]
97
98    def initialise(self, device_state):
99        self.user_id = device_state["current_user"]
100
101    def enter(self, device_state):
102        debug(self.args, "[Test] Entering state " + self.name())
103
104        self.user_id = get_or_create_secondary_user(device_state, self.args)
105        execute_shell_command("Test", self.args, ["adb", "shell", "am", "switch-user", str(self.user_id)])
106        for module in self.args.modules:
107            execute_shell_command("Test", self.args, ["adb", "shell", "pm", "install-existing", "--user", str(self.user_id), supported_modules[module][PACKAGE_NAME]])
108
109            for additional_app in supported_modules[module].get(ADDITIONAL_APPS, []):
110                command = ["adb", "install-existing", "--user", str(self.user_id), additional_app[PACKAGE_NAME]]
111                execute_shell_command("Test", self.args, command, shell=True, executable="/bin/bash")
112
113    def get_user(self):
114        return self.user_id
115
116    def all_supported_annotations(self, args):
117        return self.include_annotations(args)
118
119class AdditionalUserState:
120    """ This state is only useful for headless devices. """
121    def __init__(self, args):
122        self.args = args
123
124    def name(self):
125        return "RUN_ON_ADDITIONAL_USER"
126
127    def is_active(self, device_state):
128        return is_additional_user(device_state, device_state["users"][device_state["current_user"]])
129
130    def include_annotations(self, args):
131        return ["com.android.bedstead.multiuser.annotations.RequireRunOnAdditionalUser"]
132
133    def initialise(self, device_state):
134        self.user_id = device_state["current_user"]
135
136    def enter(self, device_state):
137        debug(self.args, "[Test] Entering state " + self.name())
138
139        self.user_id = get_or_create_additional_user(device_state, self.args)
140        execute_shell_command("Test", self.args, ["adb", "shell", "am", "switch-user", str(self.user_id)])
141        for module in self.args.modules:
142            execute_shell_command("Test", self.args, ["adb", "shell", "pm", "install-existing", "--user", str(self.user_id), supported_modules[module][PACKAGE_NAME]])
143
144            for additional_app in supported_modules[module].get(ADDITIONAL_APPS, []):
145                command = ["adb", "install-existing", "--user", str(self.user_id), additional_app[PACKAGE_NAME]]
146                execute_shell_command("Test", self.args, command, shell=True, executable="/bin/bash")
147
148    def get_user(self):
149        return self.user_id
150
151    def all_supported_annotations(self, args):
152        return self.include_annotations(args) + ["com.android.bedstead.multiuser.annotations.RequireRunOnSecondaryUser"]
153
154class WorkProfileState:
155    def __init__(self, args):
156        self.args = args
157
158    def name(self):
159        return "RUN_ON_WORK_PROFILE"
160
161    def is_active(self, device_state):
162        if self.args.headless:
163            if device_state["current_user"] == 0:
164                return False
165        else:
166            if not device_state["current_user"] == 0:
167                return False
168        return self._has_work_profile(device_state["users"][device_state["current_user"]])
169
170    def include_annotations(self, args):
171        return ["com.android.bedstead.enterprise.annotations.RequireRunOnWorkProfile"]
172
173    def initialise(self, device_state):
174        self.user_id = device_state["users"][device_state["current_user"]]["work_profile_id"]
175
176    def enter(self, device_state):
177        debug(self.args, "[Test] Entering state " + self.name())
178        user = self._get_or_create_work_profile(device_state)
179        self.user_id = user["id"]
180        execute_shell_command("Test", self.args, ["adb", "shell", "am", "switch-user", str(user["parent"])])
181        execute_shell_command("Test", self.args, ["adb", "shell", "am", "start-user ", str(self.user_id)])
182        for module in self.args.modules:
183            execute_shell_command("Test", self.args, ["adb", "shell", "pm", "install-existing", "--user", str(self.user_id), supported_modules[module][PACKAGE_NAME]])
184
185            for additional_app in supported_modules[module].get(ADDITIONAL_APPS, []):
186                command = ["adb", "install-existing", "--user", str(self.user_id), additional_app[PACKAGE_NAME]]
187                execute_shell_command("Test", self.args, command, shell=True, executable="/bin/bash")
188
189    def _get_or_create_work_profile(self, device_state):
190        users = get_users(self.args)
191        for user in users.values():
192            if self._is_work_profile(user):
193                return user
194
195        parent_id = 0 if not self.args.headless else get_or_create_secondary_user(device_state, self.args)
196
197        work_profile_id = create_work_profile(device_state, self.args, parent_id)
198        return {"id": work_profile_id, "type": "profile.MANAGED", "flags": None, "parent": str(parent_id)}
199
200    def get_user(self):
201        return self.user_id
202
203    def _has_work_profile(self, user):
204        return "work_profile_id" in user
205
206    def _is_work_profile(self, user):
207        return user["type"] == "profile.MANAGED"
208
209    def all_supported_annotations(self, args):
210        return self.include_annotations(args)
211
212class CloneProfileState:
213    def __init__(self, args):
214        self.args = args
215
216    def name(self):
217        return "RUN_ON_CLONE_PROFILE"
218
219    def is_active(self, device_state):
220        if not is_clone_profile(device_state, device_state["users"][device_state["current_user"]]):
221            return False
222        if self.args.headless:
223            return False
224        return self._has_clone_profile(device_state["users"][device_state["current_user"]])
225
226    def include_annotations(self, args):
227        return ["com.android.bedstead.multiuser.annotations.RequireRunOnCloneProfile"]
228
229    def initialise(self, device_state):
230        self.user_id = device_state["users"][device_state["current_user"]]["clone_profile_id"]
231
232    def enter(self, device_state):
233        debug(self.args, "[Test] Entering state " + self.name())
234        user = self._get_or_create_clone_profile(device_state)
235        debug(self.args, "[Test] clone is " + str(user))
236        self.user_id = user["id"]
237        execute_shell_command("Test", self.args, ["adb", "shell", "am", "switch-user", str(user["parent"])])
238        execute_shell_command("Test", self.args, ["adb", "shell", "am", "start-user ", str(self.user_id)])
239        for module in self.args.modules:
240            execute_shell_command("Test", self.args, ["adb", "shell", "pm", "install-existing", "--user", str(self.user_id), supported_modules[module][PACKAGE_NAME]])
241
242            for additional_app in supported_modules[module].get(ADDITIONAL_APPS, []):
243                command = ["adb", "install-existing", "--user", str(self.user_id), additional_app[PACKAGE_NAME]]
244                execute_shell_command("Test", self.args, command, shell=True, executable="/bin/bash")
245
246    def _get_or_create_clone_profile(self, device_state):
247        users = get_users(self.args)
248        for user in users.values():
249            if is_clone_profile(device_state, user):
250                return user
251
252        parent_id = 0 # Clone user only supported on system user only as of now.
253
254        clone_profile_id = create_clone_profile(device_state, self.args, parent_id)
255        return {"id": clone_profile_id, "type": "profile.CLONE", "flags": None, "parent": str(parent_id)}
256
257    def get_user(self):
258        return self.user_id
259
260    def _has_clone_profile(self, user):
261        return "clone_profile_id" in user
262
263    def all_supported_annotations(self, args):
264        return self.include_annotations(args)
265
266
267class PrivateProfileState:
268    def __init__(self, args):
269        self.args = args
270
271    def name(self):
272        return "RUN_ON_PRIVATE_PROFILE"
273
274    def is_active(self, device_state):
275        if not is_private_profile(device_state, device_state["users"][device_state["current_user"]]):
276            return False
277        if self.args.headless:
278            return False
279        return self._has_private_profile(device_state["users"][device_state["current_user"]])
280
281    def include_annotations(self, args):
282        return ["com.android.bedstead.multiuser.annotations.RequireRunOnPrivateProfile"]
283
284    def initialise(self, device_state):
285        self.user_id = device_state["users"][device_state["current_user"]]["private_profile_id"]
286
287    def enter(self, device_state):
288        debug(self.args, "[Test] Entering state " + self.name())
289        user = self._get_or_create_private_profile(device_state)
290        debug(self.args, "[Test] private profile is " + str(user))
291        self.user_id = user["id"]
292        execute_shell_command("Test", self.args, ["adb", "shell", "am", "switch-user", str(user["parent"])])
293        execute_shell_command("Test", self.args, ["adb", "shell", "am", "start-user ", str(self.user_id)])
294        for module in self.args.modules:
295            execute_shell_command("Test", self.args, ["adb", "shell", "pm", "install-existing", "--user", str(self.user_id), supported_modules[module][PACKAGE_NAME]])
296
297            for additional_app in supported_modules[module].get(ADDITIONAL_APPS, []):
298                command = ["adb", "install-existing", "--user", str(self.user_id), additional_app[PACKAGE_NAME]]
299                execute_shell_command("Test", self.args, command, shell=True, executable="/bin/bash")
300
301    def _get_or_create_private_profile(self, device_state):
302        users = get_users(self.args)
303        for user in users.values():
304            if is_private_profile(device_state, user):
305                return user
306
307        parent_id = 0 # Private profiles are only supported on system user only as of now.
308
309        private_profile_id = create_private_profile(device_state, self.args, parent_id)
310        return {"id": private_profile_id, "type": "profile.PRIVATE", "flags": None, "parent": str(parent_id)}
311
312    def get_user(self):
313        return self.user_id
314
315    def _has_private_profile(self, user):
316        return "private_profile_id" in user
317
318    def all_supported_annotations(self, args):
319        return self.include_annotations(args)
320
321RUN_ON_CURRENT_USER = CurrentUserState
322RUN_ON_SYSTEM_USER = SystemUserState
323RUN_ON_SECONDARY_USER = SecondaryUserState
324RUN_ON_WORK_PROFILE = WorkProfileState
325RUN_ON_ADDITIONAL_USER = AdditionalUserState
326RUN_ON_CLONE_PROFILE = CloneProfileState
327RUN_ON_PRIVATE_PROFILE = PrivateProfileState
328
329STATE_CODES = {
330    "c": RUN_ON_CURRENT_USER,
331    "s": RUN_ON_SYSTEM_USER,
332    "y": RUN_ON_SECONDARY_USER,
333    "w": RUN_ON_WORK_PROFILE,
334    "a": RUN_ON_ADDITIONAL_USER,
335    "l": RUN_ON_CLONE_PROFILE,
336    "p": RUN_ON_PRIVATE_PROFILE,
337    "i": "i" # SPECIAL CASE DEALT WITH AT THE START OF PARSING
338}
339
340DEFAULT_STATES = "csywalp"
341
342SHORT_PACKAGE_PREFIXES = {
343    "a.d.i": "android.devicepolicy.internal",
344    "a.d.c.t": "android.devicepolicy.cts.telephony",
345    "a.d.c": "android.devicepolicy.cts",
346    "a.d.g": "android.devicepolicy.gts",
347    "a.m.c": "android.multiuser.cts",
348    "nene": "com.android.bedstead.nene"
349}
350
351# We hardcode supported modules so we can optimise for
352# development of those modules. It is not our intention to support all tests.
353supported_modules = {
354    # XTS
355    "CtsDevicePolicyTestCases": {
356        "package": "android.devicepolicy.cts",
357        "runner": "androidx.test.runner.AndroidJUnitRunner",
358        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
359    },
360    "CtsDevicePolicySimTestCases": {
361        "package": "android.devicepolicy.cts.telephony",
362        "runner": "androidx.test.runner.AndroidJUnitRunner",
363        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
364    },
365    "GtsDevicePolicyTestCases": {
366        "package": "android.devicepolicy.gts",
367        "runner": "androidx.test.runner.AndroidJUnitRunner",
368        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
369    },
370    "GtsInteractiveAudioTestCases": {
371        "package": "com.google.android.audio.gts",
372        "runner": "androidx.test.runner.AndroidJUnitRunner",
373        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
374    },
375    "CtsMultiUserTestCases": {
376        "package": "android.multiuser.cts",
377        "runner": "androidx.test.runner.AndroidJUnitRunner",
378        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
379    },
380    "CtsAccountManagerMultiuserTestCases": {
381        "package": "android.accounts.cts.multiuser",
382        "runner": "androidx.test.runner.AndroidJUnitRunner",
383        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER]
384    },
385
386    # Internal
387    "InternalDevicePolicyTestCases": {
388        "package": "android.devicepolicy.internal",
389        "runner": "androidx.test.runner.AndroidJUnitRunner",
390        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
391    },
392
393    # Bedstead
394    "ActivityContextTest": {
395        "package": "com.android.activitycontext.test",
396        "runner": "androidx.test.runner.AndroidJUnitRunner",
397        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
398    },
399
400    "DeviceAdminAppTest": {
401        "package": "com.android.bedstead.deviceadminapp.test",
402        "runner": "androidx.test.runner.AndroidJUnitRunner",
403        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
404    },
405    "EventLibTest": {
406        "package": "com.android.eventlib.test",
407        "path": "com.android.eventlib",
408        "runner": "androidx.test.runner.AndroidJUnitRunner",
409        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
410        "additional_apps": [{"target": "EventLibTestApp", "package": "com.android.eventlib.tests.testapp"}]
411    },
412    "HarrierTest": {
413        "package": "com.android.bedstead.harrier.test",
414        "path": "com.android.bedstead.harrier",
415        "runner": "androidx.test.runner.AndroidJUnitRunner",
416        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
417    },
418    "NeneTest": {
419        "package": "com.android.bedstead.nene.test",
420        "path": "com.android.bedstead.nene",
421        "runner": "androidx.test.runner.AndroidJUnitRunner",
422        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
423        "additional_targets": ["NeneTestApp1"],
424        "files": [{"from": "NeneTestApp1.apk", "to": "/data/local/tmp/NeneTestApp1.apk"}]
425    },
426    "BedsteadQueryableTest": {
427        "package": "com.android.queryable.test",
428        "path": "com.android.queryable",
429        "runner": "androidx.test.runner.AndroidJUnitRunner",
430        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
431    },
432    "RemoteDPCTest": {
433        "package": "com.android.bedstead.remotedpc.test",
434        "path": "com.android.bedstead.remotedpc",
435        "runner": "androidx.test.runner.AndroidJUnitRunner",
436        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
437    },
438    "RemoteAccountAuthenticatorTest": {
439        "package": "com.android.bedstead.remoteaccountauthenticator.test",
440        "path": "com.android.bedstead.remoteaccountauthenticator",
441        "runner": "androidx.test.runner.AndroidJUnitRunner",
442        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER]
443    },
444    "TestAppTest": {
445        "package": "com.android.bedstead.testapp.test",
446        "path": "com.android.bedstead.testapp",
447        "runner": "androidx.test.runner.AndroidJUnitRunner",
448        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE]
449    },
450    "CtsDevicePolicySimTestCases": {
451        "package": "android.devicepolicy.cts.telephony",
452        "runner": "androidx.test.runner.AndroidJUnitRunner",
453        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_ADDITIONAL_USER]
454    },
455
456    # Modules
457    "bedstead-usb-test": {
458        "package": "com.android.bedstead.usb.test",
459        "path": "com.android.bedstead.usb",
460        "runner": "androidx.test.runner.AndroidJUnitRunner",
461        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
462    },
463    "bedstead-adb-test": {
464        "package": "com.android.bedstead.adb.test",
465        "path": "com.android.bedstead.adb",
466        "runner": "androidx.test.runner.AndroidJUnitRunner",
467        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
468    },
469    "bedstead-root-test": {
470        "package": "com.android.xts.root.test",
471        "path": "com.android.xts.root",
472        "runner": "androidx.test.runner.AndroidJUnitRunner",
473        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
474    },
475    "bedstead-permissions-test": {
476        "package": "com.android.bedstead.permissions.test",
477        "path": "com.android.bedstead.permissions",
478        "runner": "androidx.test.runner.AndroidJUnitRunner",
479        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
480    },
481    "bedstead-flags-test": {
482        "package": "com.android.bedstead.flags.test",
483        "path": "com.android.bedstead.flags",
484        "runner": "androidx.test.runner.AndroidJUnitRunner",
485        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
486    },
487    "bedstead-multiuser-test": {
488        "package": "com.android.bedstead.multiuser.test",
489        "path": "com.android.bedstead.multiuser",
490        "runner": "androidx.test.runner.AndroidJUnitRunner",
491        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
492    },
493    "bedstead-enterprise-test": {
494        "package": "com.android.bedstead.enterprise.test",
495        "path": "com.android.bedstead.enterprise",
496        "runner": "androidx.test.runner.AndroidJUnitRunner",
497        "states": [RUN_ON_SYSTEM_USER, RUN_ON_WORK_PROFILE, RUN_ON_SECONDARY_USER, RUN_ON_ADDITIONAL_USER, RUN_ON_CLONE_PROFILE, RUN_ON_PRIVATE_PROFILE],
498    },
499}
500
501TARGET_NAME = "target"
502PACKAGE_NAME = "package"
503PATH = "path"
504RUNNER = "runner"
505STATES = "states"
506ADDITIONAL_APPS = "additional_apps"
507ADDITIONAL_TARGETS = "additional_targets"
508FILES = "files"
509
510# Theme configuration
511RESET_CODE = "\33[0m"
512CLASS_NAME_COLOUR_CODE = "\33[35m"
513TEST_NAME_COLOUR_CODE = "\33[33m"
514PASSED_CODE = "\33[32m"
515FAILED_CODE = "\33[31m"
516IGNORED_CODE = "\33[33m"
517
518AVAILABLE_PARAMETER_COLOUR_CODES = [
519    '\33[40m',
520    '\33[41m',
521    '\33[42m',
522    '\33[43m',
523    '\33[44m',
524    '\33[45m',
525    '\33[46m',
526    '\33[101m',
527    '\33[104m',
528    '\33[105m',
529]
530
531def find_module_for_class_method(class_method):
532    """ If only a class#method is provided, see if we can find the module. Will return None if not. """
533    matching_modules = []
534    for module in supported_modules:
535        path = supported_modules[module].get(PATH, supported_modules[module][PACKAGE_NAME])
536
537        if class_method.startswith(path):
538            matching_modules.append(module)
539
540    if len(matching_modules) == 0:
541        return None
542    elif len(matching_modules) == 1:
543        return matching_modules[0]
544    else:
545        print("Found multiple potential modules. Please add module name to command")
546        sys.exit(1)
547
548def get_args():
549    """ Parse command line arguments. """
550    parser = argparse.ArgumentParser(description="Run tests during development")
551    parser.add_argument("targets", nargs="+", type=str, help="The target to run. This is in the form module:class#method. Method and class are optional. Use LAST_PASSES, LAST_FAILURES, LAST_IGNORED, or LAST_ASSUMPTION_FAILED to include the passed/failed/ignored/assumption failed tests from the most recent run.")
552    parser.add_argument("-b", "--build", action='store_true', help="Builds test targets. (default)")
553    parser.add_argument("-i", "--install", action='store_true', help="Installs test targets. (default)")
554    parser.add_argument("-t", "--test", action='store_true', help="Runs test targets. (default)")
555    parser.add_argument("-d", "--debug", action="store_true", help="Include debug output.")
556    parser.add_argument("-c", "--coexistence", action="store", default="?", help="Force device policy coexistence on or off (ignoring bedstead test preferences)")
557    parser.add_argument("-s", "--states", help="Specify states which should be included. Options are (c)urrent (s)ystem secondar(y), (w)ork profile, c(l)one (i)nitial, (a)dditional, (p)rivate profile. Defaults to all states.")
558    parser.add_argument("-n", "--interactive", action='store', nargs='?', default="disabled", help="Run interactive tests. This will exclude non-interactive tests and will enable manual testing. Pass 'bi' to build and install automations, 't' to use automations (default) and m to enable manual interaction (default)")
559    parser.add_argument("--stop-after-first-failure", action="store_true", help="If true, will stop execution after encountering a single failure")
560    parser.add_argument("--rerun-after-all-pass", action="store_true", help="If true, will re-start the run if all pass")
561    parser.add_argument("-r", "--root", action="store_true", help="Run rooted tests. This will ensure that adb is running as root and will include tests annotated as @RequireAdbRoot - which are otherwise excluded.")
562    # parser.add_argument("--capture-bugreport-after-crash", action="store_true", help="If true, will capture a bug report on failure due to a process or system server crash")
563    parser.add_argument("--timer", action="store", choices=['junit', 'btest'], default='junit', help="Either 'junit' (default) which is the internal junit timer and reflects the numbers shown on dashboards, etc. or 'btest' which tracks the actual time from the start of the test to the result being posted (including bedstead setup and teardown)")
564    parser.add_argument("-l", "--detect-leaks", action="store_true", help="throw an exception when the device_policy state has leaked after a test method execution")
565    parser.add_argument("--skip-teardown", action="store_true", help="skip teardown after executing test class and test methods")
566    args = parser.parse_args()
567    if not args.build and not args.install and not args.test:
568        args.build = True
569        args.install = True
570        args.test = True
571
572    if not args.states:
573        args.states = DEFAULT_STATES
574    args.states = set(args.states)
575    valid_states = ["c", "s", "y", "w", "l", "a", "i", "p"]
576    for state in args.states:
577        if not state in valid_states:
578            print("State " + state + " is invalid, must be one of " + str(valid_states))
579            sys.exit(1)
580    args.states = [STATE_CODES[a] for a in args.states]
581
582    args.build_interactive = False
583    args.install_interactive = False
584    args.manual_interactive = False
585    args.automate_interactive = False
586    if not args.interactive:
587        args.build_interactive = True
588        args.install_interactive = True
589        args.manual_interactive = True
590        args.automate_interactive = True
591        args.interactive = "enabled"
592
593    if args.interactive == "disabled":
594        args.interactive = False
595    elif args.interactive != "enabled":
596        valid_instructions = ["b", "i", "t", "m"]
597        for instruction in args.interactive:
598            if instruction == "b":
599                args.build_interactive = True
600            elif instruction == "i":
601                args.install_interactive = True
602            elif instruction == "t":
603                args.automate_interactive = True
604            elif instruction == "m":
605                args.manual_interactive = True
606            else:
607                print("Instruction " + instruction + " is invalid, must be one of " + str(valid_instructions))
608                sys.exit(1)
609
610    load_module_and_class_methods(args)
611
612    return args
613
614def expand_short_target(target):
615    if target == "LAST_FAILURES":
616        out = os.environ["OUT"]
617        with open(out + "/btest_failures.txt", "r") as f:
618            return [t.strip() for t in f.readlines()]
619    if target == "LAST_PASSES":
620        out = os.environ["OUT"]
621        with open(out + "/btest_passes.txt", "r") as f:
622            return [t.strip() for t in f.readlines()]
623    if target == "LAST_IGNORED":
624        out = os.environ["OUT"]
625        with open(out + "/btest_ignored.txt", "r") as f:
626            return [t.strip() for t in f.readlines()]
627    if target == "LAST_ASSUMPTION_FAILED":
628        out = os.environ["OUT"]
629        with open(out + "/btest_assumption_failed.txt", "r") as f:
630            return [t.strip() for t in f.readlines()]
631
632    for short in SHORT_PACKAGE_PREFIXES.keys():
633        if target.startswith(short):
634            target = SHORT_PACKAGE_PREFIXES[short] + target[len(short):]
635            break
636    return [target]
637
638def flatten(list_of_lists):
639    return list(itertools.chain.from_iterable(list_of_lists))
640
641def load_module_and_class_methods(args):
642    """ Parse targets from args and load module and class_method. """
643    args.targets = flatten([expand_short_target(target) for target in args.targets])
644
645    if len(args.targets) == 0:
646        print("No targets to run")
647        sys.exit(0)
648
649    new_targets = []
650
651    for target in args.targets:
652        target_parts = target.split(":", 1)
653        module = target_parts[0]
654        class_method = target_parts[1] if len(target_parts) > 1 else None
655
656        if not module in supported_modules:
657            # Let's guess that maybe they omitted the module
658            class_method = module
659            module = find_module_for_class_method(class_method)
660            if not module:
661                print("Could not find module or module not supported " + class_method + ". btest only supports a small number of test modules.")
662                sys.exit(1)
663        new_targets.append((module, class_method))
664    args.targets = new_targets
665    args.modules = set([t[0] for t in args.targets])
666
667def build_modules(args):
668    build_top = os.environ["ANDROID_BUILD_TOP"]
669
670    # Unfortunately I haven't figured out a way to just import the envsetup so we need to run it each time
671    must_build = False
672    command = ". " + build_top + "/build/envsetup.sh"
673
674    if args.build:
675        must_build = True
676        targets = args.modules.copy()
677        for t in args.targets:
678            targets.update(supported_modules[t[0]].get(ADDITIONAL_TARGETS, []))
679            targets.update([app[TARGET_NAME] for app in supported_modules[t[0]].get(ADDITIONAL_APPS, [])])
680
681        command += " &&" + " && ".join(["m " + t for t in targets])
682
683    if args.interactive and args.build_interactive:
684        must_build = True
685        command += " && m InteractiveAutomation"
686
687    if must_build:
688        debug(args, "[Build] Executing '" + command + "'")
689
690        # TODO: We should also stream the output
691        output, err = execute_shell_command("BUILD", args, [command], shell=True, executable="/bin/bash")
692        print(output)
693        if "failed to build some targets" in output:
694            sys.exit(1)
695
696
697def install(args):
698    out = os.environ["OUT"]
699
700    if args.install:
701        for module in args.modules:
702            command = ["adb install --user all -t -g " + out + "/testcases/" + module + "/*/" + module + ".apk"]
703            execute_shell_command("Install", args, command, shell=True, executable="/bin/bash")
704
705    if args.interactive and args.install_interactive:
706        command = ["adb push " + out + "/system/app/InteractiveAutomation/InteractiveAutomation.apk /sdcard"]
707        execute_shell_command("Install", args, command, shell=True, executable="/bin/bash")
708
709    for module in args.modules:
710        for additional_app in supported_modules[module].get(ADDITIONAL_APPS, []):
711            command = ["adb install --user all -t -g " + out + "/testcases/" + additional_app[TARGET_NAME] + "/*/" + additional_app[TARGET_NAME] + ".apk"]
712            execute_shell_command("Install", args, command, shell=True, executable="/bin/bash")
713
714    for module in args.modules:
715        for file in supported_modules[module].get(FILES, []):
716            command = ["adb push " + out + "/testcases/*/" + file["from"]  + " " + file["to"]]
717            execute_shell_command("Install", args, command, shell=True, executable="/bin/bash")
718
719class Test:
720
721    def __init__(self, args, module, class_methods, state, btest_run, total_test_count, next_test, include_annotations, exclude_annotations, has_later_states):
722        self.args = args
723        self.state = state
724        self.module_package = supported_modules[module][PACKAGE_NAME]
725        self.runner = supported_modules[module][RUNNER]
726        self.class_methods = class_methods
727        self.parameter_colour_codes = {}
728        self.available_parameter_colour_codes_pointer = 0
729        self.total_test_count = total_test_count
730        self.next_test = next_test
731        self.btest_run = btest_run
732        self.test_results = {}
733        self.include_annotations = include_annotations.copy()
734        self.exclude_annotations = exclude_annotations.copy()
735        self.has_no_tests = False # Marked at the end of the test if there were no new tests
736        self.has_loaded_total_test_count = False # Used to ensure we don't double count test counts
737        self.has_later_states = has_later_states # True if we don't know the full number of tests because we'll be running more states later
738
739        if self.args.interactive:
740            self.include_annotations.append("com.android.interactive.annotations.Interactive")
741        else:
742            self.exclude_annotations.append("com.android.interactive.annotations.Interactive")
743
744        if self.args.root:
745            self.include_annotations.append("com.android.xts.root.annotations.RequireAdbRoot")
746        else:
747            self.exclude_annotations.append("com.android.xts.root.annotations.RequireAdbRoot")
748
749    def run(self):
750        execute_shell_command("Test", self.args, ["adb", "shell", "am", "start-user ", str(self.state.get_user())])
751
752        # This prepares the devices to begin running a test without a persistent adb connection.
753        # it will write the ongoing result log to /sdcard/ex.log and then we will repeatedly
754        # check that - allowing for usb disconnection without losing the results
755        execute_shell_command("Test", self.args, ["adb", "shell", "rm -f /sdcard/ex.log && rm -f /sdcard/ex_now.log && rm -f /sdcard/ex_read.log && rm -f /sdcard/test_pid.txt && touch /sdcard/ex_now.log"])
756
757        command = "adb shell nohup am instrument --user " + str(self.state.get_user())
758
759        # Use the formatted output
760        command += " -e listener com.android.bedstead.harrier.BedsteadRunListener"
761
762        if self.include_annotations:
763            command += " -e annotation " + ",".join(self.include_annotations)
764
765        if self.exclude_annotations:
766            command += " -e notAnnotation " + ",".join(self.exclude_annotations)
767
768        # btest supports untethering during the run
769        command += " -e CAN_UNTETHER true"
770
771        if self.args.interactive:
772            if self.args.manual_interactive:
773                command += " -e ENABLE_MANUAL true"
774            else:
775                command += " -e ENABLE_MANUAL false"
776            if self.args.automate_interactive:
777                command += " -e ENABLE_AUTOMATION true"
778            else:
779                command += " -e ENABLE_AUTOMATION false"
780
781        if self.args.coexistence == "yes":
782            command += " -e COEXISTENCE true"
783        elif self.args.coexistence == "no":
784            command += " -e COEXISTENCE false"
785
786        if self.args.detect_leaks:
787            if self.args.skip_teardown:
788                print("Error. You can't use --skip-teardown together with --detect-leaks")
789                sys.exit(1)
790            command += " -e throw-on-device-policy-leaks true"
791        elif self.args.skip_teardown:
792            command += " -e skip-class-teardown true"
793            command += " -e skip-test-teardown true"
794
795        if len(self.class_methods) > 0:
796            if any("*" in s for s in self.class_methods):
797                if len(self.class_methods) > 1:
798                    print("Error. If you use a wildcard target, you can only specify one target")
799                    sys.exit(1)
800                for class_method in self.class_methods:
801                    # class_method = self.class_methods[0]
802                    # We need to escape 3 times to get through the various interpreters
803                    # Using regex adds 5 seconds or so to running a single test so has to be opt-in
804                    command += " -e tests_regex " + ".*".join([re.escape(re.escape(re.escape(s))) for s in class_method.split("*")])
805            else:
806                command += " -e class " + ",".join(self.class_methods)
807
808        command += " -w " + self.module_package + "/" + self.runner
809
810        # This means we will have /sdcard/ex.log with the test results and /sdcard/test_pid.txt
811        # with the pid of the process. We can check the pid to see if the tests are still running
812        command += " > /sdcard/ex.log 2>&1 & echo $! > /sdcard/test_pid.txt"
813
814        if self.args.debug:
815            print("[Test] Executing '" + command + "'")
816
817        def enqueue_output(queue):
818            output, stderr = execute_shell_command("Test", self.args, ["adb", "shell", "cp /sdcard/ex_now.log /sdcard/ex_read.log && cp /sdcard/ex.log /sdcard/ex_now.log && comm -23 /sdcard/ex_now.log /sdcard/ex_read.log"])
819            for line in output.split("\n"):
820                debug(self.args, "[DEBUG] " + line)
821
822                if "Time: " in line:
823                    debug(self.args, "[DEBUG] Output finished")
824                    self.tests_finished = True
825                queue.put(line)
826
827        execute_shell_command("Test", self.args, command.split(" "))
828        self.tests_finished = False
829        self.test_process_queue = queue.Queue()
830        self.test_process_thread = Thread(target=enqueue_output, args=(self.test_process_queue,))
831        self.test_process_thread.daemon = True
832        self.test_process_thread.start()
833
834        start_timer = time.monotonic_ns()
835
836        num_tests = self.get_num_tests()
837        if num_tests > -1:
838            self.total_test_count += num_tests
839
840            modified_total_test_count = str(self.total_test_count)
841            if self.has_later_states:
842                modified_total_test_count += "+"
843            total_test_length = len(modified_total_test_count)
844
845            for i in range(num_tests):
846                result = self.get_result(i)
847
848                if not result:
849                    # Process has finished
850                    break
851
852                test_name_parts = re.split('[#\[\]]', result["testName"])
853                print("[" + str(self.next_test).rjust(total_test_length) + "/" + modified_total_test_count + "] " + CLASS_NAME_COLOUR_CODE + test_name_parts[0] + RESET_CODE + "#" + TEST_NAME_COLOUR_CODE + test_name_parts[1] + RESET_CODE, end='')
854                self.next_test += 1
855
856                if len(test_name_parts) > 2:
857                    # 1 or more parameterizations - [2] will be a name, then every other one is empty
858                    parameterizations = (test_name_parts[2::2])
859                    for p in parameterizations:
860                        print("[" + self.get_parameter_colour_code(p) + p + RESET_CODE + "]", end='')
861                sys.stdout.flush()
862
863                while not result["isFinished"]:
864                    result = self.get_result(i)
865                    if not self.tests_are_running():
866                        break
867
868                if result:
869                    if self.args.timer == "btest":
870                        # Replace junit runtime with actual elapsed time
871                        result["btestRunTime"] = time.monotonic_ns() - start_timer
872                        start_timer = time.monotonic_ns()
873
874                    self.print_result(result)
875
876                    if self.args.stop_after_first_failure and str(result["result"]) == "1":
877                        # Failure
878                        self.kill()
879                        raise KeyboardInterrupt
880                        return
881
882            debug(self.args, "Waiting for tests to stop running...")
883            wait_to_end_timer = time.monotonic_ns()
884            while self.tests_are_running():
885                if time.monotonic_ns() - wait_to_end_timer > 10000000000:
886                    print("(Waited 10 seconds for test process to end. Killing)")
887                    self.kill()
888                    break
889            debug(self.args, "Done")
890
891            if self.next_test <= num_tests:
892                # Tests are missing - probably something went wrong...
893                print(">> ERROR: Expected " + str(num_tests) + " results but got " + str(self.next_test))
894                self.dump_output()
895
896        while not self.test_process_queue.empty():
897            output = self.test_process_queue.get()
898            if "(0 tests)" in output:
899                debug(self.args, "[" + self.state.name() + "] No tests to run")
900            if "Process crashed before executing the test" in output:
901                print(output)
902                sys.exit(1)
903
904    def dump_output(self):
905        while not self.test_process_queue.empty():
906            output = self.test_process_queue.get()
907            print(output)
908
909    def tests_are_running(self):
910        output, err = execute_shell_command("Test", self.args, ["adb", "shell", "ps | grep $(cat /sdcard/test_pid.txt)"])
911        return output.strip()
912
913    def kill(self):
914        execute_shell_command("Test", self.args, ["adb", "shell", "kill $(cat /sdcard/test_pid.txt)"])
915
916    def get_num_tests(self):
917        numTests = -1
918        while numTests == -1:
919            if not self.tests_are_running():
920                return -1
921
922            output, err = execute_shell_command("TEST", self.args, ["adb", "shell", "content query --user " + str(self.state.get_user()) + " --uri content://" + self.module_package + ".BedsteadRunResultsProvider/numTests"])
923            if not output:
924                continue # Not running yet?
925            if "No result found" in output:
926                continue
927            numTests = int(output.split("tests=", 2)[1].strip())
928        return numTests
929
930    def get_result(self, i):
931        result = None
932        while not result:
933            output, err = execute_shell_command("TEST", self.args, ["adb", "shell", "content query --user " + str(self.state.get_user()) + " --uri content://" + self.module_package + ".BedsteadRunResultsProvider/" + str(i)])
934            if not output:
935                continue # Not running yet?
936            if "No result found" in output:
937                if self.get_num_tests() == -1:
938                    # The process has ended
939                    return None
940
941                continue
942
943            result = {}
944            result["index"] = int(output.split("index=", 2)[1].split(",", 2)[0])
945            result["testName"] = output.split("testName=", 2)[1].split(", result=", 2)[0]
946            result["isFinished"] = output.split("isFinished=", 2)[1].strip() == "true"
947            if result["isFinished"]:
948                result["result"] = int(output.split("result=", 2)[1].split(",", 2)[0])
949                result["message"] = output.split("message=", 2)[1].split(", stackTrace=", 2)[0]
950                result["stackTrace"] = output.split("stackTrace=", 2)[1].split(", runTime=", 2)[0]
951                result["runTime"] = int(output.split("runTime=", 2)[1].split(",", 2)[0])
952
953        return result
954
955    def get_parameter_colour_code(self, parameter):
956        if not parameter in self.parameter_colour_codes:
957            self.parameter_colour_codes[parameter] = AVAILABLE_PARAMETER_COLOUR_CODES[self.available_parameter_colour_codes_pointer]
958            self.available_parameter_colour_codes_pointer = (self.available_parameter_colour_codes_pointer + 1) % len(AVAILABLE_PARAMETER_COLOUR_CODES)
959        return self.parameter_colour_codes[parameter]
960
961    def print_result(self, test_result):
962        try:
963            time_str = format_nanos(test_result["runTime"])
964            if "btestRunTime" in test_result:
965                time_str = time_str + "/" + format_nanos(test_result["btestRunTime"])
966            if test_result["result"] == 0:
967                self.btest_run.passed_tests.append(test_result)
968                print(" ✅  " + PASSED_CODE + "PASSED" + RESET_CODE + " (" + time_str + ")", flush=True)
969            elif test_result["result"] == 1:
970                self.btest_run.failed_tests.append(test_result)
971                print(" ❌  " + FAILED_CODE + "FAILED (" + test_result["message"] + ")" + RESET_CODE + " (" + time_str + ")\n\n" + test_result["stackTrace"] + "\n", flush=True)
972            elif test_result["result"]  == 2:
973                self.btest_run.ignored_tests.append(test_result)
974                print(" " + IGNORED_CODE + "// IGNORED" + RESET_CODE + " (" + time_str + ")", flush=True)
975            elif test_result["result"] == 3:
976                self.btest_run.assumption_failed_tests.append(test_result)
977                print(" " + IGNORED_CODE + "// ASSUMPTION FAILED (" + test_result["message"] + ")" + RESET_CODE + " (" + time_str + ")", flush=True)
978            return
979        except Exception as e:
980            if not is_connected():
981                print("\nDevice not connected to ADB. Waiting for reconnection...")
982                wait_for_adb_reconnection()
983                return
984            print("Exception ", e)
985        print("ERROR PARSING TEST RESULT " + str(test_result), flush=True)
986        self.dump_output()
987        sys.exit(1)
988
989def format_nanos(nanos):
990    ms = int(nanos) / 1000000
991    seconds = ms / 1000
992    if ms < 800:
993        timestr = "{:.2f}ms".format(ms)
994    else:
995        if seconds < 60:
996            timestr = "{:.2f}s".format(seconds)
997        else:
998            minutes = seconds / 60
999            timestr = "{:.2f}m".format(minutes)
1000    if seconds > 30:
1001        timestr = FAILED_CODE + timestr + RESET_CODE
1002    return timestr
1003
1004class BtestRun:
1005    def __init__(self):
1006        self.passed_tests = []
1007        self.failed_tests = []
1008        self.ignored_tests = []
1009        self.assumption_failed_tests = []
1010
1011def execute_shell_command(stage, args, command, **extra_args):
1012    debug(args, "[" + stage + "] Executing '" + " ".join(command) + "'")
1013    r = subprocess.run(command, capture_output=True, text=True, **extra_args)
1014    output = r.stdout
1015    debug(args, "[" + stage + "] Output: '" + output + "' Err: '" + r.stderr + "'")
1016
1017    if r.stderr:
1018        if "no devices/emulators found" in r.stderr or "device offline" in r.stderr:
1019            print("\nDevice not connected to ADB. Waiting for reconnection...")
1020            wait_for_adb_reconnection()
1021            return execute_shell_command(stage, args, command, **extra_args)
1022
1023    return output, r.stderr
1024
1025def wait_for_adb_reconnection():
1026    while True:
1027        if is_connected():
1028            print("\nFound connection")
1029            return
1030        time.sleep(0.5)
1031
1032def is_connected():
1033    r = subprocess.run(["adb", "shell", "echo", "hello"], capture_output=True, text=True)
1034    output = r.stdout
1035
1036    return not r.stderr or (not "no devices/emulators found" in r.stderr and not "device offline" in r.stderr)
1037
1038def get_or_create_additional_user(device_state, args):
1039    users = get_users(args)
1040    secondary_users = sorted([u for u in users.keys() if is_secondary_user(device_state, users[u])])
1041    if (len(secondary_users) == 0):
1042        create_user(device_state, args)
1043    if len(secondary_users) < 2:
1044        return create_user(device_state, args)
1045
1046    return secondary_users[1]
1047
1048def get_or_create_secondary_user(device_state, args):
1049    users = get_users(args)
1050    secondary_users = sorted([u for u in users.keys() if is_secondary_user(device_state, users[u])])
1051    if len(secondary_users) > 0:
1052        return secondary_users[0]
1053    return create_user(device_state, args)
1054
1055def is_additional_user(device_state, user):
1056    if not is_secondary_user(device_state, user):
1057        return False
1058
1059    secondary_users = sorted([u for u in device_state["users"].keys() if is_secondary_user(device_state, device_state["users"][u])])
1060    return user["id"] != secondary_users[0]
1061
1062def is_for_testing(device_state, user):
1063    """ FOR_TESTING introduced in SDK 34 - before that we can assume all users are suitable """
1064    return device_state["sdk_version"] < 34 or "FOR_TESTING" in user["flags"]
1065
1066def is_secondary_user(device_state, user):
1067    if ("HEADLESS" in device_state["users"][0]["type"]):
1068        # The initial user is still useful for secondary user tests even if non-for-testing on
1069        # headless as it's default
1070        return user["type"] == "full.SECONDARY"
1071    return is_for_testing(device_state, user) and user["type"] == "full.SECONDARY"
1072
1073def is_clone_profile(device_state, user):
1074    return is_for_testing(device_state, user) and user["type"] == "profile.CLONE"
1075
1076def is_private_profile(device_state, user):
1077    return is_for_testing(device_state, user) and user["type"] == "profile.PRIVATE"
1078
1079def remove_user(args, id):
1080    execute_shell_command("Test", args, ["adb", "shell", "pm", "remove-user", str(id)])
1081
1082def create_user(device_state, args):
1083    ensure_no_dpcs(device_state, args) # avoid no_add_user TODO: Be more specific
1084    commands = ["adb", "shell", "pm", "create-user"]
1085
1086    if device_state["sdk_version"] >= 34:
1087        commands.append("--for-testing")
1088
1089    commands.append("user")
1090
1091    output, err = execute_shell_command("Test", args, commands)
1092    try:
1093        id = int(output.rsplit(" ", 1)[1].strip())
1094    except IndexError:
1095        print("Error parsing user id. Output: " + output + ", err: " + err)
1096        sys.exit(1)
1097    execute_shell_command("Test", args, ["adb", "shell", "am start-user " + str(id)])
1098    return id
1099
1100def create_work_profile(device_state, args, parent_id):
1101    ensure_no_dpcs(device_state, args) # avoid no_add_user TODO: Be more specific
1102
1103    commands = ["adb", "shell", "pm", "create-user", "--managed", "--profileOf", str(parent_id)]
1104
1105    if device_state["sdk_version"] >= 34:
1106        commands.append("--for-testing")
1107
1108    commands.append("user")
1109
1110    output, err = execute_shell_command("Test", args, commands)
1111    try:
1112        id = int(output.rsplit(" ", 1)[1].strip())
1113    except IndexError:
1114        print("Error parsing profile id. Output: " + output + ", err: " + err)
1115        sys.exit(1)
1116    return id
1117
1118def create_clone_profile(device_state, args, parent_id):
1119    ensure_no_dpcs(device_state, args) # avoid no_add_clone_profile TODO: Be more specific
1120    commands = ["adb", "shell", "pm", "create-user", "--profileOf", str(parent_id), "--user-type android.os.usertype.profile.CLONE"]
1121    if device_state["sdk_version"] >= 34:
1122        commands.append("--for-testing")
1123    commands.append("user")
1124
1125    output, err = execute_shell_command("Test", args, commands)
1126
1127    try:
1128        id = int(output.rsplit(" ", 1)[1].strip())
1129    except IndexError:
1130        print("Error parsing profile id. Output: " + output + ", err: " + err)
1131        sys.exit(1)
1132    return id
1133
1134def create_private_profile(device_state, args, parent_id):
1135    ensure_no_dpcs(device_state, args) # avoid no_add_private_profile TODO: Be more specific
1136    commands = ["adb", "shell", "pm", "create-user", "--profileOf", str(parent_id), "--user-type android.os.usertype.profile.PRIVATE"]
1137    if device_state["sdk_version"] >= 34:
1138        commands.append("--for-testing")
1139    commands.append("user")
1140
1141    output, err = execute_shell_command("Test", args, commands)
1142
1143    try:
1144        id = int(output.rsplit(" ", 1)[1].strip())
1145    except IndexError:
1146        print("Error parsing profile id. Output: " + output + ", err: " + err)
1147        sys.exit(1)
1148    return id
1149
1150def gather_device_state(args):
1151    current_user = get_current_user(args)
1152    users = get_users(args)
1153    return {"current_user": current_user, "users": users, "sdk_version": get_sdk_version(args), "features": get_features(args)}
1154
1155def get_features(args):
1156    return [n[8:].strip() for n in execute_shell_command("", args, ["adb", "shell", "pm", "list", "features"])[0].split("\n")]
1157
1158def get_sdk_version(args):
1159    return int(execute_shell_command("", args, ["adb", "shell", "getprop", "ro.build.version.sdk"])[0].strip())
1160
1161def get_users(args):
1162    users_output, err = execute_shell_command("Test", args, ["adb", "shell", "cmd user list -v"])
1163    users = {}
1164    for user_row in users_output.split("\n")[1:]:
1165        if not user_row:
1166            continue
1167
1168        id = int(user_row.split("id=", 2)[1].split(",", 2)[0])
1169        type = user_row.split("type=", 2)[1].split(",", 2)[0]
1170        flags = user_row.split("flags=", 2)[1].split(" ", 2)[0].split("|")
1171        parent = None
1172        if "PROFILE" in flags:
1173            parent = int(user_row.split("parentId=", 2)[1].split(")", 2)[0])
1174        user = {"id": id, "flags": flags, "type": type, "parent": parent}
1175        users[user["id"]] = user
1176
1177    for user in users.values():
1178        if user["type"] == "profile.MANAGED":
1179            users[user["parent"]]["work_profile_id"] = user["id"]
1180
1181    return users
1182
1183def get_current_user(args):
1184    output = execute_shell_command("Test", args, ["adb", "shell", "am", "get-current-user"])
1185    try:
1186        return int(output[0].strip())
1187    except (IndexError, ValueError):
1188        print("Error parsing current user. Output: " + output[0] + " Err: " + output[1])
1189        sys.exit(1)
1190
1191def ensure_no_dpcs(device_state, args):
1192    device_policy_output = execute_shell_command("", args, ["adb", "shell", "dumpsys", "device_policy"])[0]
1193
1194    for user in device_policy_output.split("Enabled Device Admins (")[1:]:
1195        user_line, device_admin_line, other = user.split("\n", 2)
1196        if len(device_admin_line.strip()) > 0:
1197            user = user_line.split(" ", 1)[1].split(",", 1)[0]
1198            admin_name = device_admin_line.strip().split(":", 1)[0]
1199            print(execute_shell_command("", args, ["adb", "shell", "dpm", "remove-active-admin", "--user", user, admin_name]))
1200
1201def increase_maximum_user_limit(args):
1202    default_maximum_users = 4
1203    if len(args.states) > default_maximum_users:
1204        execute_shell_command("Test", args, ["adb", "root"])
1205        execute_shell_command("Test", args, ["adb", "wait-for-device"])
1206        execute_shell_command("Test", args, ["adb", "shell", "setprop", "fw.max_users", str(len(args.states))])
1207
1208
1209def run_tests(args):
1210    test = None
1211    btest_run = BtestRun()
1212    total_test_count = 0
1213    next_test = 1
1214    has_quit = False
1215
1216    device_state = gather_device_state(args)
1217    args.headless = "HEADLESS" in device_state["users"][0]["type"]
1218
1219    ensure_correct_number_of_non_for_testing_users(args, device_state)
1220
1221    if args.headless:
1222        if RUN_ON_CLONE_PROFILE in args.states:
1223            print("Not running on clone profile on headless")
1224            args.states.remove(RUN_ON_CLONE_PROFILE)
1225        if RUN_ON_PRIVATE_PROFILE in args.states:
1226            print("Not running on private profile on headless")
1227            args.states.remove(RUN_ON_PRIVATE_PROFILE)
1228
1229    # TODO(b/298356062): Increase this only when there is a maximum user limit exception and
1230    # set the max-user limit to the default after the tests.
1231    increase_maximum_user_limit(args)
1232
1233    # Construct modules with args
1234
1235    states = set()
1236    for module in args.modules:
1237        for m in supported_modules[module][STATES]:
1238            states.add(m)
1239    states = [s(args) for s in states]
1240
1241    if not args.headless:
1242        states = [t for t in states if not t.name() == "RUN_ON_ADDITIONAL_USER"]
1243
1244    if not "android.software.device_admin" in device_state["features"]:
1245        filtered_states = []
1246        for t in states:
1247            if t.name() == "RUN_ON_WORK_PROFILE":
1248                print("device_admin not supported, skipping work profile state")
1249                continue
1250            filtered_states.append(t)
1251        states = filtered_states
1252
1253    if "i" in args.states:
1254        args.states.remove("i")
1255        # Initial - replace with system or secondary
1256        if args.headless:
1257            args.states.append(RUN_ON_SECONDARY_USER)
1258        else:
1259            args.states.append(RUN_ON_SYSTEM_USER)
1260
1261    if RUN_ON_CURRENT_USER in args.states and len(args.states) > 1:
1262        for state in states:
1263            if (state.is_active(device_state)):
1264                # Found current
1265                args.states.append(state.__class__)
1266                break
1267
1268    # We calculate annotations before filtering so we properly exclude all
1269    all_include_annotations = []
1270    for state in states:
1271        all_include_annotations.extend(state.include_annotations(args))
1272
1273    states = [m for m in states if m.__class__ in args.states]
1274
1275    first_state = None
1276
1277    for state in states:
1278        if (state.is_active(device_state)):
1279            first_state = state
1280            state.initialise(device_state) # Entering a state we are already in
1281            break
1282
1283    if first_state is None:
1284        # We are not in any state, enter the first one arbitrarily
1285        first_state = states[0]
1286        first_state.enter(device_state)
1287
1288    # Move to start
1289    states.insert(0, states.pop(states.index(first_state)))
1290    needs_to_enter_state = False
1291
1292    instrumented_runs = {}
1293    for module in args.modules:
1294        instrumented_runs[module] = [target[1] for target in args.targets if target[0] == module and target[1] is not None]
1295
1296    if (args.root):
1297        execute_shell_command("Test", args, ["adb", "root"])
1298    else:
1299        execute_shell_command("Test", args, ["adb", "unroot"])
1300
1301    try:
1302        for i, state in enumerate(states):
1303            print(state.name())
1304            debug(args, "[Test] Running tests for " + state.name())
1305            if needs_to_enter_state:
1306                state.enter(device_state)
1307            include_annotations = state.include_annotations(args)
1308            exclude_annotations = [x for x in all_include_annotations if not x in state.all_supported_annotations(args)]
1309
1310            for module in instrumented_runs:
1311                test = Test(args, module, instrumented_runs[module], state, btest_run, total_test_count, next_test, include_annotations, exclude_annotations, (i < len(states) - 1))
1312                test.run()
1313                total_test_count = test.total_test_count
1314                next_test = test.next_test
1315            needs_to_enter_state = True
1316    except KeyboardInterrupt:
1317        # Kill the test process then move on to print the results
1318        if test is not None:
1319            test.kill()
1320        has_quit = True
1321    except Exception as e:
1322        if test is not None:
1323            test.kill()
1324        raise e
1325
1326    return btest_run, has_quit
1327
1328def ensure_correct_number_of_non_for_testing_users(args, device_state):
1329    allowed_non_for_testing_users = 1
1330    if args.headless:
1331        allowed_non_for_testing_users = 2
1332    has_changed_users = False
1333
1334    for user_id in sorted(device_state["users"].keys()):
1335        if is_for_testing(device_state, device_state["users"][user_id]):
1336            continue
1337        if allowed_non_for_testing_users <= 0:
1338            print("Removing user " + str(user_id) + " as exceeded supported non-for-testing users")
1339            remove_user(args, user_id)
1340            has_changed_users = True
1341        allowed_non_for_testing_users -= 1
1342
1343    if has_changed_users:
1344        device_state["users"] = get_users(args)
1345
1346def main():
1347    args = get_args()
1348
1349    build_modules(args)
1350
1351    install(args)
1352
1353    if args.test:
1354        should_run = True
1355        while should_run:
1356            should_run = False
1357            btest_run, has_quit = run_tests(args)
1358
1359            out = os.environ["OUT"]
1360            with open(out + "/btest_passes.txt", "w") as o:
1361                for test in btest_run.passed_tests:
1362                    o.write(test["testName"] + "\n")
1363            with open(out + "/btest_failures.txt", "w") as o:
1364                for test in btest_run.failed_tests:
1365                    o.write(test["testName"] + "\n")
1366            with open(out + "/btest_ignored.txt", "w") as o:
1367                for test in btest_run.ignored_tests:
1368                    o.write(test["testName"] + "\n")
1369            with open(out + "/btest_assumption_failed.txt", "w") as o:
1370                for test in btest_run.assumption_failed_tests:
1371                    o.write(test["testName"] + "\n")
1372
1373            print("\n" + PASSED_CODE + "Passed: " + str(len(btest_run.passed_tests)) + RESET_CODE
1374                  + "," + FAILED_CODE + " Failed: " + str(len(btest_run.failed_tests)) + RESET_CODE
1375                  + "," + IGNORED_CODE + " Ignored: " + str(len(btest_run.ignored_tests)) + RESET_CODE
1376                  + ", " + IGNORED_CODE + "Assumption Failed: " + str(len(btest_run.assumption_failed_tests)) + RESET_CODE)
1377
1378            if len(btest_run.failed_tests) > 0:
1379                print("\n\nFailures:")
1380                for test_result in btest_run.failed_tests:
1381                    print(test_result["testName"] + " ❌  " + FAILED_CODE + "FAILED (" + test_result["message"] + ")" + RESET_CODE + " (" + format_nanos(test_result["runTime"]) + ")", flush=True)
1382            else:
1383                # No failures
1384                if args.rerun_after_all_pass and not has_quit:
1385                    print("All passed. rerun-after-all-pass specified. Rerunning...")
1386                    should_run = True
1387
1388def debug(args, msg):
1389    if args.debug:
1390        print(msg)
1391
1392if __name__ == '__main__':
1393    main()
1394