1# Copyright 2022 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
15Common functionality for running xDS Test Client and Server remotely.
16"""
17from abc import ABCMeta
18from abc import abstractmethod
19import functools
20import pathlib
21import threading
22from typing import Dict, Optional
23import urllib.parse
24
25from absl import flags
26
27from framework import xds_flags
28from framework.helpers import logs
29
30flags.adopt_module_key_flags(logs)
31_LOGS_SUBDIR = 'test_app_logs'
32
33
34class RunnerError(Exception):
35    """Error running xDS Test App running remotely."""
36
37
38class BaseRunner(metaclass=ABCMeta):
39    _logs_subdir: Optional[pathlib.Path] = None
40    _log_stop_event: Optional[threading.Event] = None
41
42    def __init__(self):
43        if xds_flags.COLLECT_APP_LOGS.value:
44            self._logs_subdir = logs.log_dir_mkdir(_LOGS_SUBDIR)
45            self._log_stop_event = threading.Event()
46
47    @property
48    @functools.lru_cache(None)
49    def should_collect_logs(self) -> bool:
50        return self._logs_subdir is not None
51
52    @property
53    @functools.lru_cache(None)
54    def logs_subdir(self) -> pathlib.Path:
55        if not self.should_collect_logs:
56            raise FileNotFoundError('Log collection is not enabled.')
57        return self._logs_subdir
58
59    @property
60    def log_stop_event(self) -> threading.Event:
61        if not self.should_collect_logs:
62            raise ValueError('Log collection is not enabled.')
63        return self._log_stop_event
64
65    def maybe_stop_logging(self):
66        if self.should_collect_logs and not self.log_stop_event.is_set():
67            self.log_stop_event.set()
68
69    @abstractmethod
70    def run(self, **kwargs):
71        pass
72
73    @abstractmethod
74    def cleanup(self, *, force=False):
75        pass
76
77    @classmethod
78    def _logs_explorer_link_from_params(
79            cls,
80            *,
81            gcp_ui_url: str,
82            gcp_project: str,
83            query: Dict[str, str],
84            request: Optional[Dict[str, str]] = None) -> str:
85        req_merged = {'query': cls._logs_explorer_query(query)}
86        if request is not None:
87            req_merged.update(request)
88
89        req = cls._logs_explorer_request(req_merged)
90        return f'https://{gcp_ui_url}/logs/query;{req}?project={gcp_project}'
91
92    @classmethod
93    def _logs_explorer_query(cls, query: Dict[str, str]) -> str:
94        return '\n'.join(f'{k}="{v}"' for k, v in query.items())
95
96    @classmethod
97    def _logs_explorer_request(cls, req: Dict[str, str]) -> str:
98        return ';'.join(
99            f'{k}={cls._logs_explorer_quote(v)}' for k, v in req.items())
100
101    @classmethod
102    def _logs_explorer_quote(cls, value: str) -> str:
103        return urllib.parse.quote_plus(value, safe=':')
104