#!/usr/bin/env python3
#
#   Copyright 2022 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import json
import os
import tempfile
import subprocess
import time

from pathlib import Path
from typing import Any, MutableMapping, Optional

from acts import context
from acts import logger
from acts import signals
from acts import utils

# Default number of seconds to wait for a single ffx command to complete.
FFX_DEFAULT_COMMAND_TIMEOUT: int = 60


class FFXError(signals.TestError):
    """Non-zero error code returned from a ffx command.

    Attributes:
        command: The ffx subcommand that was run (without the binary path).
        stdout: Decoded standard output of the failed command.
        stderr: Decoded standard error of the failed command.
        exit_status: Exit code returned by the failed command.
    """

    # NOTE(review): the base class initializer is not invoked here. Confirm
    # that signals.TestError does not rely on state set up in its __init__.
    def __init__(self, command: str,
                 process: subprocess.CalledProcessError) -> None:
        """
        Args:
            command: The ffx subcommand that failed.
            process: The error raised by subprocess for the failed command.
                Its stdout and stderr are expected to be captured bytes.
        """
        self.command = command
        # Undecodable bytes are replaced rather than raising, so that a
        # failure report can always be produced.
        self.stdout: str = process.stdout.decode('utf-8', errors='replace')
        self.stderr: str = process.stderr.decode('utf-8', errors='replace')
        self.exit_status = process.returncode

    def __str__(self) -> str:
        return f'ffx subcommand "{self.command}" returned {self.exit_status}, stdout: "{self.stdout}", stderr: "{self.stderr}"'


class FFXTimeout(signals.TestError):
    """Timed out running a ffx command."""


class FFX:
    """Device-specific controller for the ffx tool.

    Attributes:
        log: Logger for the device-specific instance of ffx.
        binary_path: Path to the ffx binary.
        mdns_name: mDNS nodename of the default Fuchsia target.
        ip: IP address of the default Fuchsia target.
        ssh_private_key_path: Path to Fuchsia DUT SSH private key.
    """

    def __init__(self,
                 binary_path: str,
                 mdns_name: str,
                 ip: Optional[str] = None,
                 ssh_private_key_path: Optional[str] = None) -> None:
        """
        Args:
            binary_path: Path to ffx binary.
            mdns_name: Fuchsia mDNS nodename of default target.
            ip: IP address of the default target. When set, mDNS discovery
                is disabled in the generated ffx config and the target is
                added by address during the reachability check.
            ssh_private_key_path: Path to SSH private key for talking to the
                Fuchsia DUT.
        """
        self.log = logger.create_tagged_trace_logger(f"ffx | {mdns_name}")
        self.binary_path = binary_path
        self.mdns_name = mdns_name
        self.ip = ip
        self.ssh_private_key_path = ssh_private_key_path

        # Populated lazily by _create_isolated_environment() on first run().
        self._env_config_path: Optional[str] = None
        self._ssh_auth_sock_path: Optional[str] = None
        self._overnet_socket_path: Optional[str] = None
        self._has_been_reachable = False
        self._has_logged_version = False

    def clean_up(self) -> None:
        """Stops the isolated ffx daemon and forgets the environment.

        If an isolated environment exists, runs "daemon stop" against it.
        Any socket files that were created are removed, and all cached state
        is reset so the next call to run() creates a fresh environment.
        """
        if self._env_config_path:
            self.run("daemon stop", skip_reachability_check=True)
        if self._ssh_auth_sock_path:
            Path(self._ssh_auth_sock_path).unlink(missing_ok=True)
        if self._overnet_socket_path:
            Path(self._overnet_socket_path).unlink(missing_ok=True)

        self._env_config_path = None
        self._ssh_auth_sock_path = None
        self._overnet_socket_path = None
        self._has_been_reachable = False
        self._has_logged_version = False

    def run(self,
            command: str,
            timeout_sec: int = FFX_DEFAULT_COMMAND_TIMEOUT,
            skip_status_code_check: bool = False,
            skip_reachability_check: bool = False
            ) -> subprocess.CompletedProcess:
        """Runs an ffx command.

        Creates the isolated ffx environment on first use. Verifies
        reachability before running, if it hasn't already.

        Args:
            command: Command to run with ffx. It is split on whitespace to
                form the argument list; shell quoting is not interpreted.
            timeout_sec: Seconds to wait for a command to complete.
            skip_status_code_check: Whether to skip raising FFXError when
                the command returns a non-zero status code.
            skip_reachability_check: Whether to skip verifying reachability
                before running.

        Raises:
            FFXTimeout: when the command times out.
            FFXError: when the command returns non-zero and
                skip_status_code_check is False.

        Returns:
            The results of the command. Note subprocess.CompletedProcess
            returns stdout and stderr as a byte-array, not a string. Treat
            these members as such or convert to a string using
            bytes.decode('utf-8').
        """
        if not self._env_config_path:
            self._create_isolated_environment()
        if not self._has_been_reachable and not skip_reachability_check:
            self.log.info(f'Verifying reachability before running "{command}"')
            self.verify_reachable()

        self.log.debug(f'Running "{command}".')
        # Human-readable form, used only for error reporting.
        full_command = f'{self.binary_path} -e {self._env_config_path} {command}'
        # Build the argument list explicitly so that a binary path or an
        # environment config path containing whitespace stays one argument.
        argv = [
            self.binary_path, '-e', self._env_config_path, *command.split()
        ]

        try:
            result = subprocess.run(argv,
                                    capture_output=True,
                                    timeout=timeout_sec,
                                    check=not skip_status_code_check)
        except subprocess.CalledProcessError as e:
            raise FFXError(command, e) from e
        except subprocess.TimeoutExpired as e:
            raise FFXTimeout(f'Timed out running "{full_command}"') from e

        return result

    @staticmethod
    def _create_socket_placeholder(suffix: str) -> str:
        """Creates a uniquely named temporary file and returns its path.

        The file descriptor opened by mkstemp is closed right away; only the
        path is needed and leaving the descriptor open would leak it.
        """
        fd, path = tempfile.mkstemp(suffix=suffix)
        os.close(fd)
        return path

    def _create_isolated_environment(self) -> None:
        """ Create a new isolated environment for ffx.

        This is needed to avoid overlapping ffx daemons while testing in
        parallel, causing the ffx invocations to "upgrade" one daemon to
        another, which appears as a flap/restart to another test.

        Writes an ffx config file and an ffx environment file into a new
        directory under the current test output path, and records the
        environment file path for use by run().
        """
        # Store ffx files in a unique directory. Timestamp is used to prevent
        # files from being overwritten in the case when a test intentionally
        # reboots or resets the device such that a new isolated ffx environment
        # is created.
        root_dir = context.get_current_context().get_full_output_path()
        epoch = utils.get_current_epoch_time()
        time_stamp = logger.normalize_log_line_timestamp(
            logger.epoch_to_log_line_timestamp(epoch))
        target_dir = os.path.join(root_dir, f'{self.mdns_name}_{time_stamp}')
        os.makedirs(target_dir, exist_ok=True)

        # Sockets need to be created in a different directory to be guaranteed
        # to stay under the maximum socket path length of 104 characters.
        # See https://unix.stackexchange.com/q/367008
        self._ssh_auth_sock_path = self._create_socket_placeholder(
            "ssh_auth_sock")
        self._overnet_socket_path = self._create_socket_placeholder(
            "overnet_socket")

        config: MutableMapping[str, Any] = {
            "target": {
                "default": self.mdns_name,
            },
            # Use user-specific and device-specific locations for sockets.
            # Avoids user permission errors in a multi-user test environment.
            # Avoids daemon upgrades when running tests in parallel in a CI
            # environment.
            "ssh": {
                "auth-sock": self._ssh_auth_sock_path,
            },
            "overnet": {
                "socket": self._overnet_socket_path,
            },
            # Configure the ffx daemon to log to a place where we can read it.
            # Note, ffx client will still output to stdout, not this log
            # directory.
            "log": {
                "enabled": True,
                "dir": [target_dir],
            },
            # Disable analytics to decrease noise on the network.
            "ffx": {
                "analytics": {
                    "disabled": True,
                },
            },
        }

        # With a known IP address the target is added explicitly (see
        # verify_reachable), so mDNS discovery is turned off.
        if self.ip:
            config["discovery"] = {
                "mdns": {
                    "enabled": False,
                },
            }

        # ffx looks for the private key in several default locations. For
        # testbeds which have the private key in another location, set it now.
        if self.ssh_private_key_path:
            config["ssh"]["priv"] = self.ssh_private_key_path

        config_path = os.path.join(target_dir, "ffx_config.json")
        with open(config_path, 'w', encoding="utf-8") as f:
            json.dump(config, f, ensure_ascii=False, indent=4)

        env = {
            "user": config_path,
            "build": None,
            "global": None,
        }
        self._env_config_path = os.path.join(target_dir, "ffx_env.json")
        with open(self._env_config_path, 'w', encoding="utf-8") as f:
            json.dump(env, f, ensure_ascii=False, indent=4)

        # The ffx daemon will be started automatically when needed. There is
        # no need to start it manually here.

    def verify_reachable(self,
                         timeout_sec: int = FFX_DEFAULT_COMMAND_TIMEOUT
                         ) -> None:
        """Verify the target is reachable via RCS and various services.

        Blocks until the device allows for an RCS connection, retrying the
        connection command in short attempts until timeout_sec has elapsed.

        Verifies the RCS connection by fetching information from the device,
        which exercises several debug and informational FIDL services.

        When called for the first time, the versions will be checked for
        compatibility.

        Args:
            timeout_sec: Seconds to wait for reachability check

        Raises:
            FFXError: when an unknown error occurs
            FFXTimeout: when the target is unreachable
        """
        cmd = "target wait"
        if self.ip:
            # `target add` does what `target wait` does but adds an entry
            # to ensure connections can happen without mDNS.
            # TODO(https://fxbug.dev/105530): Update manual target parsing in
            # ffx.
            cmd = f"target add {self.ip}"

        deadline = time.perf_counter() + timeout_sec
        while True:
            try:
                self.run(cmd, timeout_sec=5, skip_reachability_check=True)
                break
            except FFXError as e:
                # Only this specific failure is treated as retryable; any
                # other ffx error is propagated to the caller unchanged.
                if 'took too long connecting to ascendd socket' not in e.stderr:
                    raise
                last_error: Exception = e
            except FFXTimeout as e:
                last_error = e

            if time.perf_counter() > deadline:
                raise FFXTimeout(
                    f'Waited over {timeout_sec}s for ffx to become reachable'
                ) from last_error

        # Use a shorter timeout than default because device information
        # gathering can hang for a long time if the device is not actually
        # connectable.
        try:
            result = self.run("target show --json",
                              timeout_sec=15,
                              skip_reachability_check=True)
        except Exception:
            self.log.error(
                f'Failed to reach target device. Try running "{self.binary_path}'
                + ' doctor" to diagnose issues.')
            raise

        self._has_been_reachable = True

        if not self._has_logged_version:
            # Set the flag first so the version check happens at most once
            # per environment, even if the comparison below raises.
            self._has_logged_version = True
            self.compare_version(result)

    def compare_version(
            self, target_show_result: subprocess.CompletedProcess) -> None:
        """Compares the version of Fuchsia with the version of ffx.

        Logs both versions, and logs a warning when they differ.

        Args:
            target_show_result: Result of the target show command with JSON
                output mode enabled
        """
        result_json = json.loads(target_show_result.stdout)
        build_info = next(
            filter(lambda s: s.get('label') == 'build', result_json))
        version_info = next(
            filter(lambda s: s.get('label') == 'version', build_info['child']))
        device_version = version_info.get('value')
        # Strip surrounding whitespace (e.g. a trailing newline in the
        # command output) so the comparison is on the version string itself.
        ffx_version = self.run("version").stdout.decode('utf-8').strip()

        self.log.info(
            f"Device version: {device_version}, ffx version: {ffx_version}")
        if device_version != ffx_version:
            self.log.warning(
                "ffx versions that differ from device versions may" +
                " have compatibility issues. It is recommended to" +
                " use versions within 6 weeks of each other.")