xref: /aosp_15_r20/external/bcc/tests/python/test_map_in_map.py (revision 387f9dfdfa2baef462e92476d413c7bc2470293e)
1#!/usr/bin/env python3
2#
3# USAGE: test_map_in_map.py
4#
5# Copyright 2019 Facebook, Inc
6# Licensed under the Apache License, Version 2.0 (the "License")
7
8from __future__ import print_function
9from bcc import BPF
10from unittest import main, skipUnless, TestCase
11from utils import kernel_version_ge
12import ctypes as ct
13import os
14
15
class CustomKey(ct.Structure):
    """ctypes mirror of the two-int ``struct custom_key`` used as a map key."""

    # Field names, order and types follow `struct custom_key` declared in the
    # BPF program text of test_hash_table_custom_key.
    _fields_ = [
        ("value_1", ct.c_int),
        ("value_2", ct.c_int),
    ]
21
@skipUnless(kernel_version_ge(4,11), "requires kernel >= 4.11")
class TestUDST(TestCase):
    """Tests for BPF map-in-map tables (hash-of-maps and array-of-maps).

    Every test loads a BPF program with a one-slot control array ``cntl``,
    two inner maps ``ex1``/``ex2`` and an outer map holding the inner maps'
    file descriptors. The program is attached as a kprobe to the ``getuid``
    syscall; on each call it reads ``cntl[0]``, uses that value to select an
    inner map through the outer map, and bumps element 0 of the selected
    inner map.

    NOTE(review): the class name looks like a leftover from another test
    file; it is kept unchanged so existing test IDs keep working.
    """

    def _check_inner_map_updates(self, bpf_text, outer_name, outer_keys,
                                 inner_starts_empty, delete_outer_entries):
        """Load ``bpf_text`` and verify that each inner map gets updated.

        Args:
            bpf_text: BPF C program (bytes) defining ``cntl``, ``ex1``,
                ``ex2``, the outer map and ``syscall__getuid``.
            outer_name: table name (bytes) of the outer map-in-map.
            outer_keys: two outer-map keys; the first is bound to ``ex1`` and
                the second to ``ex2``. They must be the keys the BPF program
                derives from the control values 1 and 2 respectively.
            inner_starts_empty: when true, additionally check that reading
                element 0 of an inner map raises ``KeyError`` before the
                probe has been steered at that map.
            delete_outer_entries: when true, remove the outer-map entries
                again once all checks have passed.
        """
        b = BPF(text=bpf_text)
        cntl_map = b.get_table(b"cntl")
        inner_maps = [b.get_table(b"ex1"), b.get_table(b"ex2")]
        outer_map = b.get_table(outer_name)

        # The outer map stores the file descriptor of each inner map.
        for outer_key, inner_map in zip(outer_keys, inner_maps):
            outer_map[outer_key] = ct.c_int(inner_map.get_fd())

        syscall_fnname = b.get_syscall_fnname(b"getuid")
        b.attach_kprobe(event=syscall_fnname, fn_name=b"syscall__getuid")
        try:
            # Control value 1 selects ex1, control value 2 selects ex2.
            for selector, inner_map in enumerate(inner_maps, start=1):
                if inner_starts_empty:
                    # Until cntl[0] selects this map the program never
                    # writes to it, so the lookup must fail.
                    try:
                        inner_map[ct.c_int(0)]
                    except KeyError:
                        pass
                    else:
                        self.fail(
                            "Unexpected success for ex%d_map[0]" % selector)

                cntl_map[0] = ct.c_int(selector)
                os.getuid()
                # ">=" rather than "==": the probe fires for every getuid
                # call hitting the kprobe, not only the one issued above.
                self.assertGreaterEqual(inner_map[ct.c_int(0)].value, 1)
        finally:
            # Detach even when a check fails so the probe does not stay
            # attached for the rest of the test run.
            b.detach_kprobe(event=syscall_fnname)

        if delete_outer_entries:
            for outer_key in outer_keys:
                del outer_map[outer_key]

    def test_hash_table(self):
        """Hash-of-maps keyed by a plain int, with hash inner maps."""
        bpf_text = b"""
      BPF_ARRAY(cntl, int, 1);
      BPF_TABLE("hash", int, int, ex1, 1024);
      BPF_TABLE("hash", int, int, ex2, 1024);
      BPF_HASH_OF_MAPS(maps_hash, int, "ex1", 10);

      int syscall__getuid(void *ctx) {
         int key = 0, data, *val, cntl_val;
         void *inner_map;

         val = cntl.lookup(&key);
         if (!val || *val == 0)
           return 0;

         cntl_val = *val;
         inner_map = maps_hash.lookup(&cntl_val);
         if (!inner_map)
           return 0;

         val = bpf_map_lookup_elem(inner_map, &key);
         if (!val) {
           data = 1;
           bpf_map_update_elem(inner_map, &key, &data, 0);
         } else {
           data = 1 + *val;
           bpf_map_update_elem(inner_map, &key, &data, 0);
         }

         return 0;
      }
"""
        self._check_inner_map_updates(
            bpf_text,
            outer_name=b"maps_hash",
            outer_keys=[ct.c_int(1), ct.c_int(2)],
            inner_starts_empty=True,
            delete_outer_entries=True,
        )

    def test_hash_table_custom_key(self):
        """Hash-of-maps keyed by ``struct custom_key``, with hash inner maps."""
        bpf_text = b"""
        struct custom_key {
          int value_1;
          int value_2;
        };

        BPF_ARRAY(cntl, int, 1);
        BPF_TABLE("hash", int, int, ex1, 1024);
        BPF_TABLE("hash", int, int, ex2, 1024);
        BPF_HASH_OF_MAPS(maps_hash, struct custom_key, "ex1", 10);

        int syscall__getuid(void *ctx) {
          struct custom_key hash_key = {1, 0};
          int key = 0, data, *val, cntl_val;
          void *inner_map;

          val = cntl.lookup(&key);
          if (!val || *val == 0)
            return 0;

          hash_key.value_2 = *val;
          inner_map = maps_hash.lookup(&hash_key);
          if (!inner_map)
            return 0;

          val = bpf_map_lookup_elem(inner_map, &key);
          if (!val) {
            data = 1;
            bpf_map_update_elem(inner_map, &key, &data, 0);
          } else {
            data = 1 + *val;
            bpf_map_update_elem(inner_map, &key, &data, 0);
          }

          return 0;
        }
"""
        # The program builds its lookup key as {1, cntl[0]}, so the keys for
        # control values 1 and 2 are (1, 1) and (1, 2).
        self._check_inner_map_updates(
            bpf_text,
            outer_name=b"maps_hash",
            outer_keys=[CustomKey(1, 1), CustomKey(1, 2)],
            inner_starts_empty=True,
            delete_outer_entries=True,
        )

    def test_array_table(self):
        """Array-of-maps indexed by int, with array inner maps."""
        bpf_text = b"""
      BPF_ARRAY(cntl, int, 1);
      BPF_ARRAY(ex1, int, 1024);
      BPF_ARRAY(ex2, int, 1024);
      BPF_ARRAY_OF_MAPS(maps_array, "ex1", 10);

      int syscall__getuid(void *ctx) {
         int key = 0, data, *val, cntl_val;
         void *inner_map;

         val = cntl.lookup(&key);
         if (!val || *val == 0)
           return 0;

         cntl_val = *val;
         inner_map = maps_array.lookup(&cntl_val);
         if (!inner_map)
           return 0;

         val = bpf_map_lookup_elem(inner_map, &key);
         if (val) {
           data = 1 + *val;
           bpf_map_update_elem(inner_map, &key, &data, 0);
         }

         return 0;
      }
"""
        # No "starts empty" check and no entry deletion here, matching the
        # array program above, which only increments an element that already
        # exists (presumably array slots are always present -- confirm).
        self._check_inner_map_updates(
            bpf_text,
            outer_name=b"maps_array",
            outer_keys=[ct.c_int(1), ct.c_int(2)],
            inner_starts_empty=False,
            delete_outer_entries=False,
        )
215
# Script entry point: hand control to unittest's command-line runner, which
# discovers and runs the TestCase classes defined in this module.
if __name__ == "__main__":
    main()
218