#!/usr/bin/python
"""Trace the moving average of an operation's latency using USDT probes.

The BPF program is assembled from bpf_text_shared.c (located next to this
script) plus the latency-averaging code below. It is attached to the
operation start/end probes of the traced process. Every interval, the
per-input moving average latency is printed.
"""
import argparse
from time import sleep, strftime
from sys import argv
import ctypes as ct
from bcc import BPF, USDT
import inspect
import os

# BPF code appended to the contents of bpf_text_shared.c.
# MAX_SAMPLE_SIZE is substituted by python before compilation.
# NOTE(review): start_hash, struct start_data_t and trace_operation_start are
# not defined here; presumably they come from bpf_text_shared.c - confirm.
BPF_TEXT_LAT_AVG = """

const u32 max_sample_size = MAX_SAMPLE_SIZE;

struct hash_key_t
{
    char input[64]; // The operation id is used as key
};

struct hash_leaf_t
{
    u32 sample_size; // Number of operation samples taken
    u64 total;       // Cumulative duration of the operations
    u64 average;     // Moving average duration of the operations
};

/**
 * @brief Contains the averages for the operation latencies by operation input.
 */
BPF_HASH(lat_hash, struct hash_key_t, struct hash_leaf_t, 512);

/**
 * @brief Reads the operation response arguments, calculates the latency, and stores it in the histogram.
 * @param ctx The BPF context.
 */
int trace_operation_end(struct pt_regs* ctx)
{
    u64 operation_id;
    bpf_usdt_readarg(1, ctx, &operation_id);

    struct start_data_t* start_data = start_hash.lookup(&operation_id);
    if (0 == start_data) {
        return 0;
    }

    u64 duration = bpf_ktime_get_ns() - start_data->start;
    struct hash_key_t hash_key = {};
    __builtin_memcpy(&hash_key.input, start_data->input, sizeof(hash_key.input));
    start_hash.delete(&operation_id);

    struct hash_leaf_t zero = {};
    struct hash_leaf_t* hash_leaf = lat_hash.lookup_or_try_init(&hash_key, &zero);
    if (0 == hash_leaf) {
        return 0;
    }

    if (hash_leaf->sample_size < max_sample_size) {
        ++hash_leaf->sample_size;
    } else {
        // Window is full: approximate evicting the oldest sample by
        // removing one average-sized sample from the running total.
        hash_leaf->total -= hash_leaf->average;
    }

    hash_leaf->total += duration;
    hash_leaf->average = hash_leaf->total / hash_leaf->sample_size;

    return 0;
}
"""


def _parse_args():
    """Parse the command line and return the argparse namespace."""
    parser = argparse.ArgumentParser(description="Trace the moving average of the latency of an operation using usdt probes.",
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    # The pid is mandatory: previously omitting it crashed later on int(None).
    parser.add_argument("-p", "--pid", type=int, required=True, help="The id of the process to trace.")
    # A default avoids the same int(None) crash when -i is omitted.
    parser.add_argument("-i", "--interval", type=int, default=5, help="The interval in seconds on which to report the latency distribution.")
    parser.add_argument("-c", "--count", type=int, default=16, help="The maximum number of samples over which to calculate the moving average.")
    parser.add_argument("-f", "--filterstr", type=str, default="", help="The prefix filter for the operation input. If specified, only operations for which the input string starts with the filterstr are traced.")
    parser.add_argument("-v", "--verbose", dest="verbose", action="store_true", help="If true, will output generated bpf program and verbose logging information.")
    parser.add_argument("-s", "--sdt", dest="sdt", action="store_true", help="If true, will use the probes, created by systemtap's dtrace.")
    parser.set_defaults(verbose=False)
    return parser.parse_args()


def _build_bpf_text(shared_text, max_sample_size, filter_str):
    """Return the full BPF program with all placeholders substituted.

    :param shared_text: Contents of bpf_text_shared.c.
    :param max_sample_size: Value substituted for MAX_SAMPLE_SIZE.
    :param filter_str: Value substituted for FILTER_STRING. When non-empty,
        FILTER_STATEMENT is replaced with a call to the shared filter();
        otherwise it is removed.
    """
    bpf_text = shared_text + BPF_TEXT_LAT_AVG
    bpf_text = bpf_text.replace("MAX_SAMPLE_SIZE", str(max_sample_size))
    # NOTE(review): filter_str is inserted verbatim into the C source. If
    # FILTER_STRING sits inside a C string literal in bpf_text_shared.c,
    # quotes/backslashes in the filter will break compilation - confirm and
    # escape if so.
    bpf_text = bpf_text.replace("FILTER_STRING", filter_str)
    if filter_str:
        bpf_text = bpf_text.replace("FILTER_STATEMENT", "if (!filter(start_data.input)) { return 0; }")
    else:
        bpf_text = bpf_text.replace("FILTER_STATEMENT", "")
    return bpf_text


def main():
    """Attach the probes and periodically print the moving average latencies."""
    args = _parse_args()
    this_pid = args.pid
    this_interval = args.interval
    this_maxsamplesize = args.count
    this_filter = args.filterstr

    if this_interval < 1:
        print("Invalid value for interval, using 1.")
        this_interval = 1

    if this_maxsamplesize < 1:
        print("Invalid value for count, using 1.")
        this_maxsamplesize = 1

    debugLevel = 4 if args.verbose else 0

    # Load the shared BPF text that lives next to this script; `with`
    # ensures the file handle is closed.
    script_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
    with open(os.path.join(script_dir, "bpf_text_shared.c"), "r") as shared_file:
        shared_text = shared_file.read()
    bpf_text = _build_bpf_text(shared_text, this_maxsamplesize, this_filter)

    # Create USDT context
    print("lat_avg.py - Attaching probes to pid: %d; filter: %s" % (this_pid, this_filter))
    usdt_ctx = USDT(pid=this_pid)

    if args.sdt:
        usdt_ctx.enable_probe(probe="usdt_sample_lib1_sdt:operation_start_sdt", fn_name="trace_operation_start")
        usdt_ctx.enable_probe(probe="usdt_sample_lib1_sdt:operation_end_sdt", fn_name="trace_operation_end")
    else:
        usdt_ctx.enable_probe(probe="usdt_sample_lib1:operation_start", fn_name="trace_operation_start")
        usdt_ctx.enable_probe(probe="usdt_sample_lib1:operation_end", fn_name="trace_operation_end")

    # Create BPF context, load BPF program
    bpf_ctx = BPF(text=bpf_text, usdt_contexts=[usdt_ctx], debug=debugLevel)

    print("Tracing... Hit Ctrl-C to end.")

    lat_hash = bpf_ctx.get_table("lat_hash")
    # Column width 12 fits the "sample_size" header so it aligns with the data.
    print("%-12s %-64s %12s %16s" % ("time", "input", "sample_size", "latency (us)"))
    while True:
        try:
            sleep(this_interval)
        except KeyboardInterrupt:
            break

        now = strftime("%H:%M:%S")
        for k, v in lat_hash.items():
            # k.input is a ctypes char array (bytes); decode so Python 3 does
            # not print a b'...' repr. average is in ns; report integer us.
            print("%-12s %-64s %12d %16d" % (now, k.input.decode("utf-8", "replace"), v.sample_size, v.average // 1000))


if __name__ == "__main__":
    main()