xref: /aosp_15_r20/external/bcc/src/python/bcc/utils.py (revision 387f9dfdfa2baef462e92476d413c7bc2470293e)
1# Copyright 2016 Catalysts GmbH
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import ctypes as ct
15import sys
16import traceback
17import warnings
18import re
19
20from .libbcc import lib
21
def _read_cpu_range(path):
    """
    Parse a CPU list file into a flat list of CPU numbers.

    The file is expected to hold comma-separated entries, each either a
    single number ("3") or an inclusive range ("0-7"), e.g. "0-3,8,10-11".

    path -- path of the file to read.

    Returns a list of ints in the order they appear in the file. An empty
    list is returned when the file holds no entries (empty, or whitespace
    only); blank entries such as the one produced by a trailing comma are
    skipped.

    Raises ValueError when an entry is not a number or a "start-end" range,
    and whatever open() raises when the file cannot be read.
    """
    # Only hold the file open for as long as the read takes.
    with open(path, 'r') as f:
        cpus_range_str = f.read()

    cpus = []
    for cpu_range in cpus_range_str.split(','):
        cpu_range = cpu_range.strip()
        if not cpu_range:
            # ''.split(',') yields [''], so an empty list file would
            # otherwise reach int('') and raise ValueError.
            continue
        first, dash, last = cpu_range.partition('-')
        if dash:
            # Ranges are inclusive on both ends.
            cpus.extend(range(int(first), int(last) + 1))
        else:
            cpus.append(int(first))
    return cpus
35
def get_online_cpus():
    """
    Return the CPU numbers listed in /sys/devices/system/cpu/online.

    The file is parsed by _read_cpu_range(), so the result is a list of ints.
    """
    online_path = '/sys/devices/system/cpu/online'
    return _read_cpu_range(online_path)
38
def get_possible_cpus():
    """
    Return the CPU numbers listed in /sys/devices/system/cpu/possible.

    The file is parsed by _read_cpu_range(), so the result is a list of ints.
    """
    possible_path = '/sys/devices/system/cpu/possible'
    return _read_cpu_range(possible_path)
41
def detect_language(candidates, pid):
    """
    Ask libbcc which language the process `pid` is running and return it if
    it is one of `candidates`.

    candidates -- container of language names (str) the caller accepts.
    pid        -- process id passed to lib.bcc_procutils_language().

    Returns the language name as a str when it is in `candidates`, otherwise
    None. None is also returned when the native call yields a NULL pointer.
    """
    res = lib.bcc_procutils_language(pid)
    raw_name = ct.cast(res, ct.c_char_p).value
    if raw_name is None:
        # c_char_p.value is None for a NULL pointer; calling .decode() on it
        # would raise AttributeError instead of reporting "no language".
        # NOTE(review): presumably NULL means "could not detect" -- confirm
        # against the libbcc implementation.
        return None
    language = raw_name.decode()
    return language if language in candidates else None
46
# Encoding reported by sys.getfilesystemencoding(), captured once at import.
# ArgString uses it to convert arguments between str and bytes.
FILESYSTEMENCODING = sys.getfilesystemencoding()
48
def printb(s, file=sys.stdout, nl=1):
    """
    printb(s)

    print a bytes object to stdout and flush

    s    -- bytes object to write.
    file -- destination stream. The default is the sys.stdout object that
            was current when this module was imported. When the stream has
            a ``buffer`` attribute the bytes go to that underlying binary
            stream; otherwise they are written to ``file`` itself.
    nl   -- when true (the default), a newline byte is written after ``s``.

    The stream is flushed after writing.
    """
    if hasattr(file, "buffer"):
        # Text already written through the text layer may still be pending
        # there. Flush it first so it cannot end up after the bytes that are
        # written straight to the binary buffer below.
        file.flush()
        buf = file.buffer
    else:
        buf = file

    buf.write(s)
    if nl:
        buf.write(b"\n")
    file.flush()
61
class ArgString(object):
    """
    ArgString(arg)

    Wrapper around a system argument that keeps it as text in ``self.s`` and
    can hand it back as a bytes() object. Bytes are the better form for
    comparing against kernel or probe data (which should never be
    en/decode()'ed).
    """
    def __init__(self, arg):
        # On Python 3 the argument is stored untouched; on older
        # interpreters it is decoded with the filesystem encoding.
        on_python3 = sys.version_info[0] >= 3
        self.s = arg if on_python3 else arg.decode(FILESYSTEMENCODING)

    def __bytes__(self):
        # Encode the stored text with the filesystem encoding.
        text = self.s
        return text.encode(FILESYSTEMENCODING)

    def __str__(self):
        # The stored text is returned as-is.
        return self.s
81
def warn_with_traceback(message, category, filename, lineno, file=None, line=None):
    """
    Write a stack trace followed by the formatted warning.

    Takes the same arguments as warnings.showwarning, so it can be assigned
    to that hook. Output goes to `file` when it has a ``write`` attribute,
    otherwise to sys.stderr.
    """
    if hasattr(file, "write"):
        log = file
    else:
        log = sys.stderr
    # Frame 0 is this function and frame 1 its direct caller; the trace is
    # printed starting from the frame above that.
    start_frame = sys._getframe(2)
    traceback.print_stack(f=start_frame, file=log)
    formatted = warnings.formatwarning(message, category, filename, lineno, line)
    log.write(formatted)
86
87# uncomment to get full tracebacks for invalid uses of python3+str in arguments
88#warnings.showwarning = warn_with_traceback
89
# When True, _assert_is_bytes() rejects non-bytes arguments instead of
# warning and converting them.
_strict_bytes = False
def _assert_is_bytes(arg):
    """
    Return `arg` as bytes, complaining when it is not already bytes.

    arg -- value to check; None and bytes objects are returned unchanged.

    For any other value:
      * with _strict_bytes set, AssertionError is raised;
      * otherwise a DeprecationWarning is issued (attributed to the caller's
        caller via stacklevel 2) and the value is converted through
        ArgString(arg).__bytes__().
    """
    # isinstance() rather than an exact type check, so bytes subclasses are
    # passed through instead of being sent to ArgString, which would try to
    # .encode() them.
    if arg is None or isinstance(arg, bytes):
        return arg
    # Wrap in a 1-tuple: a bare tuple `arg` would be unpacked by the %
    # operator and raise TypeError instead of producing the message.
    complaint = "not a bytes object: %r" % (arg,)
    if _strict_bytes:
        # Raised explicitly (not via `assert`) so the check is still
        # enforced when Python runs with -O.
        raise AssertionError(complaint)
    warnings.warn(complaint, DeprecationWarning, 2)
    return ArgString(arg).__bytes__()
100
class StrcmpRewrite(object):
    """
    Helpers that replace STRCMP("literal",operand) calls in an expression
    with calls to generated C comparison functions named streq_<id>.
    """

    @staticmethod
    def _generate_streq_function(string, probe_read_func, streq_functions,
                                probeid):
        """
        Build the C source of one comparison helper.

        string          -- text placed verbatim as the initializer of the
                           helper's ``needle`` array.
        probe_read_func -- name of the function the helper calls to copy the
                           compared data into its local buffer.
        streq_functions -- C source accumulated so far; the new helper is
                           appended to it.
        probeid         -- number used to form the helper name.

        Returns a (helper_name, updated_source) tuple.
        """
        fname = "streq_%d" % probeid
        # The generated helper compares every needle byte except the last
        # one (sizeof(needle) - 1) against the data that was read.
        helper_source = """
static inline bool %s(char const *ignored, uintptr_t str) {
        char needle[] = %s;
        char haystack[sizeof(needle)];
        %s(&haystack, sizeof(haystack), (void *)str);
        for (int i = 0; i < sizeof(needle) - 1; ++i) {
                if (needle[i] != haystack[i]) {
                        return false;
                }
        }
        return true;
}
        """ % (fname, string, probe_read_func)
        return fname, streq_functions + helper_source

    @staticmethod
    def rewrite_expr(expr, bin_cmp, is_user, probe_user_list, streq_functions,
                    probeid):
        """
        Replace each STRCMP(...) occurrence matched in `expr` with a call to
        a freshly generated helper.

        expr            -- expression text to rewrite.
        bin_cmp         -- when true, the captured literal excludes its
                           surrounding double quotes; otherwise the quotes
                           are part of the capture.
        is_user         -- when true, every helper reads with
                           bpf_probe_read_user.
        probe_user_list -- operands (second STRCMP argument, stripped) that
                           also select bpf_probe_read_user; only consulted
                           when `is_user` is false.
        streq_functions -- helper source accumulated so far.
        probeid         -- first helper number to use; incremented once per
                           match.

        Returns a dict with the keys "expr", "streq_functions" and "probeid"
        holding the updated values.
        """
        STRCMP_RE = ('STRCMP\\(\"([^"]+)\\",(.+?)\\)' if bin_cmp
                     else 'STRCMP\\(("[^"]+\\"),(.+?)\\)')
        # The matches are found on the original text; each one consumes the
        # first "STRCMP" still left in the expression being rewritten.
        for found in re.finditer(STRCMP_RE, expr):
            literal = found.group(1)
            # if user probe or @user tag is specified, use
            # bpf_probe_read_user for char* read
            if is_user or found.group(2).strip() in probe_user_list:
                reader = "bpf_probe_read_user"
            else:
                reader = "bpf_probe_read"
            fname, streq_functions = StrcmpRewrite._generate_streq_function(
                literal, reader, streq_functions, probeid)
            probeid += 1
            expr = expr.replace("STRCMP", fname, 1)
        return {
            "expr": expr,
            "streq_functions": streq_functions,
            "probeid": probeid,
        }
148