xref: /aosp_15_r20/external/bcc/tests/python/test_usdt3.py (revision 387f9dfdfa2baef462e92476d413c7bc2470293e)
1#!/usr/bin/env python3
2#
3# USAGE: test_usdt3.py
4#
5# Copyright 2018 Facebook, Inc
6# Licensed under the Apache License, Version 2.0 (the "License")
7
8from __future__ import print_function
9from bcc import BPF, USDT
10from unittest import main, TestCase
11from subprocess import Popen, PIPE
12import ctypes as ct
13import inspect, os, tempfile
14
class TestUDST(TestCase):
    """Check USDT attachment when one probe name lives in several binaries.

    The fixture builds two shared libraries and an executable that all
    inline the same ``test:probe`` tracepoint, starts the executable, and
    the test verifies that enabling the probe by PID reports the argument
    passed at every call site.
    """

    def setUp(self):
        """Write the C sources, build them with gcc and start the program.

        Sets ``self.bpf_text`` (BPF program source), ``self.tmp_dir``
        (scratch directory holding sources and binaries) and ``self.app``
        (the running traced process).
        """
        # Imported locally so the fixture does not depend on the file's
        # top-level import list.
        import shutil

        common_h = b"""
#include "folly/tracing/StaticTracepoint.h"

static inline void record_val(int val)
{
  FOLLY_SDT(test, probe, val);
  FOLLY_SDT(test_dup_name, probe, val);
}

extern void record_a(int val);
extern void record_b(int val);
"""

        a_c = b"""
#include <stdio.h>
#include "common.h"

void record_a(int val)
{
    record_val(val);
}
"""

        b_c = b"""
#include <stdio.h>
#include "common.h"

void record_b(int val)
{
    record_val(val);
}
"""

        m_c = b"""
#include <stdio.h>
#include <unistd.h>
#include "common.h"

int main() {
   while (1) {
     record_a(1);
     record_b(2);
     record_val(3);
     sleep(1);
   }
   return 0;
}
"""
        # BPF program
        self.bpf_text = b"""
BPF_PERF_OUTPUT(event);
int do_trace(struct pt_regs *ctx) {
    int result = 0;
    bpf_usdt_readarg(1, ctx, &result);
    event.perf_submit(ctx, &result, sizeof(result));
    return 0;
};
"""

        def _create_file(name, text):
            # The context manager closes the handle even if write() raises.
            with open(name, "wb") as text_file:
                text_file.write(text)

        def _run(cmd):
            # Argument-list form: no shell is involved, so paths containing
            # spaces or shell metacharacters are passed through verbatim.
            return Popen(cmd).wait()

        # Create source files
        self.tmp_dir = tempfile.mkdtemp()
        # Registered immediately so the directory is removed even when a
        # later step of setUp fails (tearDown is not run in that case).
        self.addCleanup(shutil.rmtree, self.tmp_dir, ignore_errors=True)
        print("temp directory: " + self.tmp_dir)
        _create_file(os.path.join(self.tmp_dir, "common.h"), common_h)
        _create_file(os.path.join(self.tmp_dir, "a.cpp"), a_c)
        _create_file(os.path.join(self.tmp_dir, "b.cpp"), b_c)
        _create_file(os.path.join(self.tmp_dir, "m.cpp"), m_c)

        # Compilation
        # the usdt test:probe exists in liba.so, libb.so and a.out
        this_dir = os.path.dirname(
            os.path.abspath(inspect.getfile(inspect.currentframe())))
        include_opt = "-I" + os.path.join(this_dir, "include")

        for stem in ("a", "b"):
            src = os.path.join(self.tmp_dir, stem + ".cpp")
            obj = os.path.join(self.tmp_dir, stem + ".o")
            lib = os.path.join(self.tmp_dir, "lib" + stem + ".so")
            self.assertEqual(
                _run(["gcc", include_opt, "-fpic", "-c", "-o", obj, src]), 0)
            self.assertEqual(_run(["gcc", "-shared", "-o", lib, obj]), 0)

        m_src = os.path.join(self.tmp_dir, "m.cpp")
        m_bin = os.path.join(self.tmp_dir, "a.out")
        self.assertEqual(
            _run(["gcc", include_opt, m_src, "-o", m_bin,
                  "-L" + self.tmp_dir, "-la", "-lb"]),
            0)

        # Run the application
        self.app = Popen([m_bin],
                         env=dict(os.environ, LD_LIBRARY_PATH=self.tmp_dir))
        # Diagnostic listing only: the exit status is deliberately ignored,
        # and the only interpolated value is the integer PID.
        os.system("../../tools/tplist.py -vvv -p " + str(self.app.pid))

    def test_attach1(self):
        """Enable ``test:probe`` by PID and expect the values 1, 2 and 3.

        The traced program passes 1 via record_a, 2 via record_b and 3 via
        a direct record_val call, so exactly these values must be seen.
        """
        # enable USDT probe from given PID and verify the generated BPF
        # programs
        u = USDT(pid=int(self.app.pid))
        u.enable_probe(probe="test:probe", fn_name="do_trace")
        b = BPF(text=self.bpf_text, usdt_contexts=[u])

        # processing events
        expected = {1, 2, 3}
        seen = set()

        def print_event(cpu, data, size):
            # Each event payload is the single int submitted by do_trace.
            seen.add(ct.cast(data, ct.POINTER(ct.c_int)).contents.value)

        b[b"event"].open_perf_buffer(print_event)
        # Poll at most 100 times; stop once exactly the expected values
        # (and nothing else) have been observed.
        for _ in range(100):
            if seen == expected:
                break
            b.perf_buffer_poll()

        self.assertEqual(seen, expected)

    def tearDown(self):
        """Stop the traced process started by setUp.

        The temporary directory is removed by the cleanup registered in
        setUp, which unittest runs after this method.
        """
        # kill the subprocess and reap it so no zombie is left behind
        self.app.kill()
        self.app.wait()
154
# When executed as a script, hand control to unittest's main() (imported at
# the top of the file) so the test case above is run.
if __name__ == "__main__":
    main()
157