#!/usr/bin/env python3
# Copyright (c) PLUMgrid, Inc.
# Licensed under the Apache License, Version 2.0 (the "License")

from bcc import BPF
import os
import sys
from unittest import main, TestCase


class TestKprobeRgx(TestCase):
    """Attach a kprobe and a kretprobe by regular expression and count hits.

    The BPF program keeps a hash table ``stats``: ``hello`` increments the
    entry under key 1 and ``goodbye`` increments the entry under key 2.
    """

    def setUp(self):
        """Compile the counting program and attach both probes by regex."""
        self.b = BPF(text=b"""
        typedef struct { int idx; } Key;
        typedef struct { u64 val; } Val;
        BPF_HASH(stats, Key, Val, 3);
        int hello(void *ctx) {
          Val *val = stats.lookup_or_try_init(&(Key){1}, &(Val){0});
          if (val) {
            val->val++;
          }
          return 0;
        }
        int goodbye(void *ctx) {
          Val *val = stats.lookup_or_try_init(&(Key){2}, &(Val){0});
          if (val) {
            val->val++;
          }
          return 0;
        }
        """)
        # Registered before attaching so the BPF object is released even if
        # one of the attach calls below raises; the original never released it
        # explicitly.
        self.addCleanup(self.b.cleanup)

        # Both probes target every function whose name starts with the
        # syscall prefix followed by "bp".
        event_re = b"^" + self.b.get_syscall_prefix() + b"bp.*"
        self.b.attach_kprobe(event_re=event_re, fn_name=b"hello")
        self.b.attach_kretprobe(event_re=event_re, fn_name=b"goodbye")

    def test_send1(self):
        """Check the entry counter (key 1) and the return counter (key 2)."""
        # NOTE(review): nothing in this test triggers the probes explicitly;
        # presumably the hits come from the syscalls issued while attaching
        # in setUp -- confirm before changing the expected counts.
        stats = self.b[b"stats"]
        k1 = stats.Key(1)
        k2 = stats.Key(2)
        # Dedicated assertions report the actual counter value on failure.
        self.assertGreaterEqual(stats[k1].val, 2)
        self.assertEqual(stats[k2].val, 1)


class TestKprobeReplace(TestCase):
    """Attach a no-op kprobe through a regex over ``tcp_enter_cwr*`` names."""

    def setUp(self):
        """Compile a BPF program whose only function returns 0."""
        self.b = BPF(text=b"int empty(void *ctx) { return 0; }")
        # Release the BPF object (and any probes attached by the test) when
        # the test finishes.
        self.addCleanup(self.b.cleanup)

    def test_periods(self):
        """Attaching by regex must not raise; there is no further assertion."""
        # NOTE(review): judging by the test name, the pattern is meant to
        # match symbols containing periods -- confirm against the kernel
        # symbols this is expected to hit.
        self.b.attach_kprobe(event_re=b"^tcp_enter_cwr.*", fn_name=b"empty")


if __name__ == "__main__":
    main()