#!/usr/bin/env python
# @lint-avoid-python-3-compatibility-imports
#
# tcptop    Summarize TCP send/recv throughput by host.
#           For Linux, uses BCC, eBPF. Embedded C.
#
# USAGE: tcptop [-h] [-C] [-S] [-p PID] [--cgroupmap MAPPATH]
#               [--mntnsmap MAPPATH] [-4 | -6] [interval [count]]
#
# This uses dynamic tracing of kernel functions, and will need to be updated
# to match kernel changes.
#
# WARNING: This traces all send/receives at the TCP level, and while it
# summarizes data in-kernel to reduce overhead, there may still be some
# overhead at high TCP send/receive rates (e.g., ~13% of one CPU at 100k TCP
# events/sec). This is not the same as the packet rate: funccount can be used
# to count the kprobes below to find out the TCP event rate. Test in a lab
# environment first. If your send/receive rate is low (e.g., <1k/sec) then the
# overhead is expected to be negligible.
#
# TODO: Fit output to screen size (top X only) in default (not -C) mode.
#
# Copyright 2016 Netflix, Inc.
# Licensed under the Apache License, Version 2.0 (the "License")
#
# 02-Sep-2016   Brendan Gregg   Created this.
26 27from __future__ import print_function 28from bcc import BPF 29from bcc.containers import filter_by_containers 30import argparse 31from socket import inet_ntop, AF_INET, AF_INET6 32from struct import pack 33from time import sleep, strftime 34from subprocess import call 35from collections import namedtuple, defaultdict 36 37# arguments 38def range_check(string): 39 value = int(string) 40 if value < 1: 41 msg = "value must be stricly positive, got %d" % (value,) 42 raise argparse.ArgumentTypeError(msg) 43 return value 44 45examples = """examples: 46 ./tcptop # trace TCP send/recv by host 47 ./tcptop -C # don't clear the screen 48 ./tcptop -p 181 # only trace PID 181 49 ./tcptop --cgroupmap mappath # only trace cgroups in this BPF map 50 ./tcptop --mntnsmap mappath # only trace mount namespaces in the map 51 ./tcptop -4 # trace IPv4 family only 52 ./tcptop -6 # trace IPv6 family only 53""" 54parser = argparse.ArgumentParser( 55 description="Summarize TCP send/recv throughput by host", 56 formatter_class=argparse.RawDescriptionHelpFormatter, 57 epilog=examples) 58parser.add_argument("-C", "--noclear", action="store_true", 59 help="don't clear the screen") 60parser.add_argument("-S", "--nosummary", action="store_true", 61 help="skip system summary line") 62parser.add_argument("-p", "--pid", 63 help="trace this PID only") 64parser.add_argument("interval", nargs="?", default=1, type=range_check, 65 help="output interval, in seconds (default 1)") 66parser.add_argument("count", nargs="?", default=-1, type=range_check, 67 help="number of outputs") 68parser.add_argument("--cgroupmap", 69 help="trace cgroups in this BPF map only") 70parser.add_argument("--mntnsmap", 71 help="trace mount namespaces in this BPF map only") 72group = parser.add_mutually_exclusive_group() 73group.add_argument("-4", "--ipv4", action="store_true", 74 help="trace IPv4 family only") 75group.add_argument("-6", "--ipv6", action="store_true", 76 help="trace IPv6 family only") 
parser.add_argument("--ebpf", action="store_true",
    help=argparse.SUPPRESS)
args = parser.parse_args()
debug = 0

# linux stats
loadavg = "/proc/loadavg"

# define BPF program
bpf_text = """
#include <uapi/linux/ptrace.h>
#include <net/sock.h>
#include <bcc/proto.h>

struct ipv4_key_t {
    u32 pid;
    char name[TASK_COMM_LEN];
    u32 saddr;
    u32 daddr;
    u16 lport;
    u16 dport;
};
BPF_HASH(ipv4_send_bytes, struct ipv4_key_t);
BPF_HASH(ipv4_recv_bytes, struct ipv4_key_t);

struct ipv6_key_t {
    unsigned __int128 saddr;
    unsigned __int128 daddr;
    u32 pid;
    char name[TASK_COMM_LEN];
    u16 lport;
    u16 dport;
    u64 __pad__;
};
BPF_HASH(ipv6_send_bytes, struct ipv6_key_t);
BPF_HASH(ipv6_recv_bytes, struct ipv6_key_t);

// thread id -> socket passed to tcp_sendmsg/tcp_sendpage, carried from the
// entry probe to the return probe.
BPF_HASH(sock_store, u32, struct sock *);

/*
 * Add `size` sent bytes to the per-connection counters for `sk`.
 * Only called from tcp_send_ret() with a socket that tcp_send_entry() stored,
 * and tcp_send_entry() only stores sockets that passed the container, PID
 * and family filters, so those filters are not repeated here.
 */
static int tcp_sendstat(struct sock *sk, int size)
{
    u32 pid = bpf_get_current_pid_tgid() >> 32;
    u16 dport = 0, family;
    bpf_probe_read_kernel(&family, sizeof(family),
        &sk->__sk_common.skc_family);

    if (family == AF_INET) {
        struct ipv4_key_t ipv4_key = {.pid = pid};
        bpf_get_current_comm(&ipv4_key.name, sizeof(ipv4_key.name));
        bpf_probe_read_kernel(&ipv4_key.saddr, sizeof(ipv4_key.saddr),
            &sk->__sk_common.skc_rcv_saddr);
        bpf_probe_read_kernel(&ipv4_key.daddr, sizeof(ipv4_key.daddr),
            &sk->__sk_common.skc_daddr);
        bpf_probe_read_kernel(&ipv4_key.lport, sizeof(ipv4_key.lport),
            &sk->__sk_common.skc_num);
        bpf_probe_read_kernel(&dport, sizeof(dport),
            &sk->__sk_common.skc_dport);
        ipv4_key.dport = ntohs(dport);
        ipv4_send_bytes.increment(ipv4_key, size);

    } else if (family == AF_INET6) {
        struct ipv6_key_t ipv6_key = {.pid = pid};
        bpf_get_current_comm(&ipv6_key.name, sizeof(ipv6_key.name));
        bpf_probe_read_kernel(&ipv6_key.saddr, sizeof(ipv6_key.saddr),
            &sk->__sk_common.skc_v6_rcv_saddr.in6_u.u6_addr32);
        bpf_probe_read_kernel(&ipv6_key.daddr, sizeof(ipv6_key.daddr),
            &sk->__sk_common.skc_v6_daddr.in6_u.u6_addr32);
        bpf_probe_read_kernel(&ipv6_key.lport, sizeof(ipv6_key.lport),
            &sk->__sk_common.skc_num);
        bpf_probe_read_kernel(&dport, sizeof(dport),
            &sk->__sk_common.skc_dport);
        ipv6_key.dport = ntohs(dport);
        ipv6_send_bytes.increment(ipv6_key, size);
    }
    // else drop

    return 0;
}

int tcp_send_ret(struct pt_regs *ctx)
{
    u32 tid = bpf_get_current_pid_tgid();
    struct sock **sockpp = sock_store.lookup(&tid);
    if (sockpp == 0) {
        return 0; // filtered out at entry, or the entry was missed
    }
    struct sock *sk = *sockpp;
    // Remove the entry on every path, including failed sends. Previously it
    // was only deleted when size > 0, so errors left stale per-thread
    // entries behind (never overwritten once the thread exits) that could
    // pile up until the hash map filled and further updates failed.
    sock_store.delete(&tid);

    int size = PT_REGS_RC(ctx);
    if (size > 0) {
        return tcp_sendstat(sk, size);
    }
    return 0;
}

int tcp_send_entry(struct pt_regs *ctx, struct sock *sk)
{
    if (container_should_be_filtered()) {
        return 0;
    }
    u32 pid = bpf_get_current_pid_tgid() >> 32;
    FILTER_PID
    u32 tid = bpf_get_current_pid_tgid();
    u16 family = sk->__sk_common.skc_family;
    FILTER_FAMILY
    sock_store.update(&tid, &sk);
    return 0;
}

/*
 * tcp_recvmsg() would be obvious to trace, but is less suitable because:
 * - we'd need to trace both entry and return, to have both sock and size
 * - misses tcp_read_sock() traffic
 * we'd much prefer tracepoints once they are available.
 */
int kprobe__tcp_cleanup_rbuf(struct pt_regs *ctx, struct sock *sk, int copied)
{
    if (container_should_be_filtered()) {
        return 0;
    }

    u32 pid = bpf_get_current_pid_tgid() >> 32;
    FILTER_PID

    u16 dport = 0, family = sk->__sk_common.skc_family;

    if (copied <= 0)
        return 0;

    FILTER_FAMILY

    if (family == AF_INET) {
        struct ipv4_key_t ipv4_key = {.pid = pid};
        bpf_get_current_comm(&ipv4_key.name, sizeof(ipv4_key.name));
        ipv4_key.saddr = sk->__sk_common.skc_rcv_saddr;
        ipv4_key.daddr = sk->__sk_common.skc_daddr;
        ipv4_key.lport = sk->__sk_common.skc_num;
        dport = sk->__sk_common.skc_dport;
        ipv4_key.dport = ntohs(dport);
        ipv4_recv_bytes.increment(ipv4_key, copied);

    } else if (family == AF_INET6) {
        struct ipv6_key_t ipv6_key = {.pid = pid};
        bpf_get_current_comm(&ipv6_key.name, sizeof(ipv6_key.name));
        bpf_probe_read_kernel(&ipv6_key.saddr, sizeof(ipv6_key.saddr),
            &sk->__sk_common.skc_v6_rcv_saddr.in6_u.u6_addr32);
        bpf_probe_read_kernel(&ipv6_key.daddr, sizeof(ipv6_key.daddr),
            &sk->__sk_common.skc_v6_daddr.in6_u.u6_addr32);
        ipv6_key.lport = sk->__sk_common.skc_num;
        dport = sk->__sk_common.skc_dport;
        ipv6_key.dport = ntohs(dport);
        ipv6_recv_bytes.increment(ipv6_key, copied);
    }
    // else drop

    return 0;
}
"""

# code substitutions
if args.pid:
    bpf_text = bpf_text.replace('FILTER_PID',
        'if (pid != %s) { return 0; }' % args.pid)
else:
    bpf_text = bpf_text.replace('FILTER_PID', '')
if args.ipv4:
    bpf_text = bpf_text.replace('FILTER_FAMILY',
        'if (family != AF_INET) { return 0; }')
elif args.ipv6:
    bpf_text = bpf_text.replace('FILTER_FAMILY',
        'if (family != AF_INET6) { return 0; }')
# Without -4/-6 the family filter placeholder expands to nothing.
bpf_text = bpf_text.replace('FILTER_FAMILY', '')
bpf_text = filter_by_containers(args) + bpf_text
if debug or args.ebpf:
    print(bpf_text)
    if args.ebpf:
        exit()
TCPSessionKey = namedtuple('TCPSession',
                           ['pid', 'name', 'laddr', 'lport', 'daddr', 'dport'])


def _decode_comm(name):
    """Return a task comm read from a BPF key as text.

    ctypes ``char[]`` fields come back as ``bytes``; formatting those with
    ``%s`` on Python 3 would print ``b'curl'`` instead of ``curl``.
    """
    return name.decode('utf-8', 'replace')


def get_ipv4_session_key(k):
    """Convert an ``ipv4_key_t`` BPF map key into a TCPSessionKey."""
    # The u32 addresses are copied raw from the socket, so packing them with
    # native byte order reproduces the kernel's in-memory address bytes.
    return TCPSessionKey(pid=k.pid,
                         name=_decode_comm(k.name),
                         laddr=inet_ntop(AF_INET, pack("I", k.saddr)),
                         lport=k.lport,
                         daddr=inet_ntop(AF_INET, pack("I", k.daddr)),
                         dport=k.dport)


def get_ipv6_session_key(k):
    """Convert an ``ipv6_key_t`` BPF map key into a TCPSessionKey."""
    return TCPSessionKey(pid=k.pid,
                         name=_decode_comm(k.name),
                         laddr=inet_ntop(AF_INET6, k.saddr),
                         lport=k.lport,
                         daddr=inet_ntop(AF_INET6, k.daddr),
                         dport=k.dport)


def _collect_throughput(send_table, recv_table, to_session_key):
    """Drain a send/recv pair of BPF tables into a per-session dict.

    :returns: dict mapping TCPSessionKey -> [send_bytes, recv_bytes].
    Each table is cleared right after it is read, so the next interval
    starts from zero.
    """
    throughput = defaultdict(lambda: [0, 0])
    for k, v in send_table.items():
        # += so distinct BPF keys that map to the same session are summed
        # rather than silently overwriting each other.
        throughput[to_session_key(k)][0] += v.value
    send_table.clear()

    for k, v in recv_table.items():
        throughput[to_session_key(k)][1] += v.value
    recv_table.clear()
    return throughput


def _print_throughput(throughput, header_fmt, row_fmt, laddr_hdr, raddr_hdr):
    """Print one address family's table, busiest connection first.

    Nothing (not even the header) is printed when *throughput* is empty.
    """
    if not throughput:
        return
    print(header_fmt % ("PID", "COMM", laddr_hdr, raddr_hdr,
                        "RX_KB", "TX_KB"))
    ranked = sorted(throughput.items(),
                    key=lambda kv: sum(kv[1]),
                    reverse=True)
    for k, (send_bytes, recv_bytes) in ranked:
        print(row_fmt % (k.pid,
                         k.name,
                         k.laddr + ":" + str(k.lport),
                         k.daddr + ":" + str(k.dport),
                         int(recv_bytes / 1024), int(send_bytes / 1024)))


# initialize BPF
b = BPF(text=bpf_text)

b.attach_kprobe(event='tcp_sendmsg', fn_name='tcp_send_entry')
b.attach_kretprobe(event='tcp_sendmsg', fn_name='tcp_send_ret')
# tcp_sendpage() may not exist on this kernel; attach only if it does.
if BPF.get_kprobe_functions(b'tcp_sendpage'):
    b.attach_kprobe(event='tcp_sendpage', fn_name='tcp_send_entry')
    b.attach_kretprobe(event='tcp_sendpage', fn_name='tcp_send_ret')

ipv4_send_bytes = b["ipv4_send_bytes"]
ipv4_recv_bytes = b["ipv4_recv_bytes"]
ipv6_send_bytes = b["ipv6_send_bytes"]
ipv6_recv_bytes = b["ipv6_recv_bytes"]

print('Tracing... Output every %s secs. Hit Ctrl-C to end' % args.interval)

# output
i = 0
exiting = False
while i != args.count and not exiting:
    try:
        sleep(args.interval)
    except KeyboardInterrupt:
        exiting = True  # still print the final partial interval below

    # header
    if args.noclear:
        print()
    else:
        call("clear")
    if not args.nosummary:
        with open(loadavg) as stats:
            print("%-8s loadavg: %s" % (strftime("%H:%M:%S"), stats.read()))

    # IPv4
    _print_throughput(
        _collect_throughput(ipv4_send_bytes, ipv4_recv_bytes,
                            get_ipv4_session_key),
        "%-7s %-12s %-21s %-21s %6s %6s",
        "%-7d %-12.12s %-21s %-21s %6d %6d",
        "LADDR", "RADDR")

    # IPv6: wider address columns, more than 80 chars, sadly.
    _print_throughput(
        _collect_throughput(ipv6_send_bytes, ipv6_recv_bytes,
                            get_ipv6_session_key),
        "\n%-7s %-12s %-32s %-32s %6s %6s",
        "%-7d %-12.12s %-32s %-32s %6d %6d",
        "LADDR6", "RADDR6")

    i += 1