1# Copyright 2022 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14""" 15Common functionality for running xDS Test Client and Server remotely. 16""" 17from abc import ABCMeta 18from abc import abstractmethod 19import functools 20import pathlib 21import threading 22from typing import Dict, Optional 23import urllib.parse 24 25from absl import flags 26 27from framework import xds_flags 28from framework.helpers import logs 29 30flags.adopt_module_key_flags(logs) 31_LOGS_SUBDIR = 'test_app_logs' 32 33 34class RunnerError(Exception): 35 """Error running xDS Test App running remotely.""" 36 37 38class BaseRunner(metaclass=ABCMeta): 39 _logs_subdir: Optional[pathlib.Path] = None 40 _log_stop_event: Optional[threading.Event] = None 41 42 def __init__(self): 43 if xds_flags.COLLECT_APP_LOGS.value: 44 self._logs_subdir = logs.log_dir_mkdir(_LOGS_SUBDIR) 45 self._log_stop_event = threading.Event() 46 47 @property 48 @functools.lru_cache(None) 49 def should_collect_logs(self) -> bool: 50 return self._logs_subdir is not None 51 52 @property 53 @functools.lru_cache(None) 54 def logs_subdir(self) -> pathlib.Path: 55 if not self.should_collect_logs: 56 raise FileNotFoundError('Log collection is not enabled.') 57 return self._logs_subdir 58 59 @property 60 def log_stop_event(self) -> threading.Event: 61 if not self.should_collect_logs: 62 raise ValueError('Log collection is not enabled.') 63 return self._log_stop_event 64 65 def maybe_stop_logging(self): 66 if self.should_collect_logs and not self.log_stop_event.is_set(): 67 self.log_stop_event.set() 68 69 @abstractmethod 70 def run(self, **kwargs): 71 pass 72 73 @abstractmethod 74 def cleanup(self, *, force=False): 75 pass 76 77 @classmethod 78 def _logs_explorer_link_from_params( 79 cls, 80 *, 81 gcp_ui_url: str, 82 gcp_project: str, 83 query: Dict[str, str], 84 request: Optional[Dict[str, str]] = None) -> str: 85 req_merged = {'query': cls._logs_explorer_query(query)} 86 if request is not None: 87 req_merged.update(request) 88 89 req = cls._logs_explorer_request(req_merged) 90 return f'https://{gcp_ui_url}/logs/query;{req}?project={gcp_project}' 91 92 @classmethod 93 def _logs_explorer_query(cls, query: Dict[str, str]) -> str: 94 return '\n'.join(f'{k}="{v}"' for k, v in query.items()) 95 96 @classmethod 97 def _logs_explorer_request(cls, req: Dict[str, str]) -> str: 98 return ';'.join( 99 f'{k}={cls._logs_explorer_quote(v)}' for k, v in req.items()) 100 101 @classmethod 102 def _logs_explorer_quote(cls, value: str) -> str: 103 return urllib.parse.quote_plus(value, safe=':') 104