xref: /aosp_15_r20/external/libnl/python/netlink/route/link.py (revision 4dc78e53d49367fa8e61b07018507c90983a077d)
1#
2# Copyright (c) 2011 Thomas Graf <[email protected]>
3#
4
5"""Module providing access to network links
6
7This module provides an interface to view configured network links,
8modify them and to add and delete virtual network links.
9
10The following is a basic example:
11    import netlink.core as netlink
12    import netlink.route.link as link
13
14    sock = netlink.Socket()
15    sock.connect(netlink.NETLINK_ROUTE)
16
17    cache = link.LinkCache()	# create new empty link cache
18    cache.refill(sock)		# fill cache with all configured links
19    eth0 = cache['eth0']		# lookup link "eth0"
    print(eth0)			# print basic configuration
21
22The module contains the following public classes:
23
24  - Link -- Represents a network link. Instances can be created directly
25        via the constructor (empty link objects) or via the refill()
26        method of a LinkCache.
27  - LinkCache -- Derived from netlink.Cache, holds any number of
28         network links (Link instances). Main purpose is to keep
29         a local list of all network links configured in the
30         kernel.
31
32The following public functions exist:
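  - get(name, sock=None) -- Look up a single link by name directly
        from the kernel; returns None if the lookup yields nothing.
  - resolve(name) -- Look up a link through a module-level LinkCache
        that is refilled on every call.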
34
35"""
36
37from __future__ import absolute_import
38
__version__ = "0.1"
# Names exported by "from ... import *". The module-level functions
# get() and resolve() are not listed here and therefore have to be
# accessed through the module object.
__all__ = [
    "LinkCache",
    "Link",
]
44
45import socket
46from .. import core as netlink
47from .. import capi as core_capi
48from . import capi as capi
49from .links import inet as inet
50from .. import util as util
51
# Link statistics definitions
#
# Integer identifiers accepted by Link.get_stat(), which hands them to
# capi.rtnl_link_get_stat() unchanged.
# NOTE(review): the values presumably mirror the statistic ids of the
# underlying C library -- confirm before renumbering or inserting entries.

# Generic interface counters
RX_PACKETS = 0
TX_PACKETS = 1
RX_BYTES = 2
TX_BYTES = 3
RX_ERRORS = 4
TX_ERRORS = 5
RX_DROPPED = 6
TX_DROPPED = 7
RX_COMPRESSED = 8
TX_COMPRESSED = 9
RX_FIFO_ERR = 10
TX_FIFO_ERR = 11
RX_LEN_ERR = 12
RX_OVER_ERR = 13
RX_CRC_ERR = 14
RX_FRAME_ERR = 15
RX_MISSED_ERR = 16
TX_ABORT_ERR = 17
TX_CARRIER_ERR = 18
TX_HBEAT_ERR = 19
TX_WIN_ERR = 20
COLLISIONS = 21
MULTICAST = 22
# IPv6 counters
IP6_INPKTS = 23
IP6_INHDRERRORS = 24
IP6_INTOOBIGERRORS = 25
IP6_INNOROUTES = 26
IP6_INADDRERRORS = 27
IP6_INUNKNOWNPROTOS = 28
IP6_INTRUNCATEDPKTS = 29
IP6_INDISCARDS = 30
IP6_INDELIVERS = 31
IP6_OUTFORWDATAGRAMS = 32
IP6_OUTPKTS = 33
IP6_OUTDISCARDS = 34
IP6_OUTNOROUTES = 35
IP6_REASMTIMEOUT = 36
IP6_REASMREQDS = 37
IP6_REASMOKS = 38
IP6_REASMFAILS = 39
IP6_FRAGOKS = 40
IP6_FRAGFAILS = 41
IP6_FRAGCREATES = 42
IP6_INMCASTPKTS = 43
IP6_OUTMCASTPKTS = 44
IP6_INBCASTPKTS = 45
IP6_OUTBCASTPKTS = 46
IP6_INOCTETS = 47
IP6_OUTOCTETS = 48
IP6_INMCASTOCTETS = 49
IP6_OUTMCASTOCTETS = 50
IP6_INBCASTOCTETS = 51
IP6_OUTBCASTOCTETS = 52
# ICMPv6 counters
ICMP6_INMSGS = 53
ICMP6_INERRORS = 54
ICMP6_OUTMSGS = 55
ICMP6_OUTERRORS = 56
110
111
class LinkCache(netlink.Cache):
    """Cache of network links.

    Supports subscript lookup by interface index (integer key) or by
    interface name (any other key); both return Link instances.
    """

    def __init__(self, family=socket.AF_UNSPEC, cache=None):
        """Wrap an existing low-level link cache or allocate a new one.

        Arguments:
        family -- address family, stored through _set_arg1()
                  (default: socket.AF_UNSPEC)
        cache -- low-level cache object to wrap; if omitted, a new
                 "route/link" cache is allocated
        """
        if not cache:
            cache = self._alloc_cache_name("route/link")

        self._info_module = None
        self._protocol = netlink.NETLINK_ROUTE
        self._nl_cache = cache
        self._set_arg1(family)

    def __getitem__(self, key):
        """Return the cached Link for an interface index or name.

        Arguments:
        key -- interface index (int) or interface name

        Raises KeyError, carrying the requested key, if the cache holds
        no matching link.
        """
        # isinstance() instead of an exact type comparison so that int
        # subclasses are treated as interface indices as well.
        if isinstance(key, int):
            link = capi.rtnl_link_get(self._nl_cache, key)
        else:
            link = capi.rtnl_link_get_by_name(self._nl_cache, key)

        if link is None:
            # Report which key was missing instead of an empty KeyError.
            raise KeyError(key)

        return Link.from_capi(link)

    @staticmethod
    def _new_object(obj):
        """Wrap a low-level object in a Link instance."""
        return Link(obj)

    def _new_cache(self, cache):
        """Wrap `cache` in a new LinkCache that keeps this cache's family."""
        return LinkCache(family=self.arg1, cache=cache)
141
142
class Link(netlink.Object):
    """Network link.

    Thin wrapper around a low-level link object; attributes are exposed
    as properties that read from and write to that object.

    A Link can be used as a context manager: leaving the ``with`` block
    without an exception commits the modifications through change().
    """

    def __init__(self, obj=None):
        """Create a link, optionally wrapping the low-level object `obj`."""
        netlink.Object.__init__(self, "route/link", "link", obj)
        self._rtnl_link = self._obj2type(self._nl_object)

        # Load the type specific module if the link already has a type.
        if self.type:
            self._module_lookup("netlink.route.links." + self.type)

        # Address family specific extensions, keyed by family name.
        self.inet = inet.InetLink(self)
        self.af = {"inet": self.inet}

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Commit pending changes unless the block raised an exception."""
        if exc_type is None:
            self.change()
        else:
            # Never suppress the exception raised inside the block.
            return False

    @classmethod
    def from_capi(cls, obj):
        """Alternate constructor taking a low-level link object."""
        return cls(capi.link2obj(obj))

    @staticmethod
    def _obj2type(obj):
        return capi.obj2link(obj)

    # NOTE(review): __cmp__ is only consulted by Python 2; Python 3
    # ignores it. Rich comparison methods are deliberately not added here
    # because the behaviour inherited from netlink.Object is not visible
    # from this module.
    def __cmp__(self, other):
        return self.ifindex - other.ifindex

    @staticmethod
    def _new_instance(obj):
        """Return a Link wrapping `obj`; raises ValueError if `obj` is falsy."""
        if not obj:
            raise ValueError()

        return Link(obj)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def ifindex(self):
        """interface index"""
        return capi.rtnl_link_get_ifindex(self._rtnl_link)

    @ifindex.setter
    def ifindex(self, value):
        capi.rtnl_link_set_ifindex(self._rtnl_link, int(value))

        # ifindex is immutable but we assume that if _orig does not
        # have an ifindex specified, it was meant to be given here
        if capi.rtnl_link_get_ifindex(self._orig) == 0:
            capi.rtnl_link_set_ifindex(self._orig, int(value))

    @property
    @netlink.nlattr(type=str, fmt=util.bold)
    def name(self):
        """Name of link"""
        return capi.rtnl_link_get_name(self._rtnl_link)

    @name.setter
    def name(self, value):
        capi.rtnl_link_set_name(self._rtnl_link, value)

        # name is the secondary identifier, if _orig does not have
        # the name specified yet, assume it was meant to be specified
        # here. ifindex will always take priority, therefore if ifindex
        # is specified as well, this will be ignored automatically.
        if capi.rtnl_link_get_name(self._orig) is None:
            capi.rtnl_link_set_name(self._orig, value)

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def flags(self):
        """Flags

        Reading returns the list of flag names currently set. Assigning
        does *not* replace the current flags with the supplied value;
        every entry adds or removes a single flag instead.

        Examples:
        link.flags = '+xxx' # add xxx flag
        link.flags = 'xxx'  # exactly the same
        link.flags = '-xxx' # remove xxx flag
        link.flags = [ '+xxx', '-yyy' ] # list operation
        """
        flags = capi.rtnl_link_get_flags(self._rtnl_link)
        return capi.rtnl_link_flags2str(flags, 256)[0].split(",")

    def _set_flag(self, flag):
        """Apply one flag operation: '-name' unsets, '+name' or 'name' sets."""
        if flag.startswith("-"):
            i = capi.rtnl_link_str2flags(flag[1:])
            capi.rtnl_link_unset_flags(self._rtnl_link, i)
            return

        # A leading "+" is optional when setting a flag.
        flag_name = flag[1:] if flag.startswith("+") else flag
        i = capi.rtnl_link_str2flags(flag_name)
        capi.rtnl_link_set_flags(self._rtnl_link, i)

    @flags.setter
    def flags(self, value):
        # A single string is one operation; anything else is treated as
        # an iterable of operations.
        if isinstance(value, str):
            self._set_flag(value)
        else:
            for flag in value:
                self._set_flag(flag)

    @property
    @netlink.nlattr(type=int, fmt=util.num)
    def mtu(self):
        """Maximum Transmission Unit"""
        return capi.rtnl_link_get_mtu(self._rtnl_link)

    @mtu.setter
    def mtu(self, value):
        capi.rtnl_link_set_mtu(self._rtnl_link, int(value))

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def family(self):
        """Address family"""
        return capi.rtnl_link_get_family(self._rtnl_link)

    @family.setter
    def family(self, value):
        capi.rtnl_link_set_family(self._rtnl_link, value)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def address(self):
        """Hardware address (MAC address)"""
        a = capi.rtnl_link_get_addr(self._rtnl_link)
        return netlink.AbstractAddress(a)

    @address.setter
    def address(self, value):
        # `value` must provide the low-level address as `_addr`.
        capi.rtnl_link_set_addr(self._rtnl_link, value._addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def broadcast(self):
        """Hardware broadcast address"""
        a = capi.rtnl_link_get_broadcast(self._rtnl_link)
        return netlink.AbstractAddress(a)

    @broadcast.setter
    def broadcast(self, value):
        # `value` must provide the low-level address as `_addr`.
        capi.rtnl_link_set_broadcast(self._rtnl_link, value._addr)

    @property
    @netlink.nlattr(type=str, immutable=True, fmt=util.string)
    def qdisc(self):
        """Name of qdisc (cannot be changed)"""
        return capi.rtnl_link_get_qdisc(self._rtnl_link)

    @qdisc.setter
    def qdisc(self, value):
        capi.rtnl_link_set_qdisc(self._rtnl_link, value)

    @property
    @netlink.nlattr(type=int, fmt=util.num)
    def txqlen(self):
        """Length of transmit queue"""
        return capi.rtnl_link_get_txqlen(self._rtnl_link)

    @txqlen.setter
    def txqlen(self, value):
        capi.rtnl_link_set_txqlen(self._rtnl_link, int(value))

    @property
    @netlink.nlattr(type=str, immutable=True, fmt=util.string)
    def arptype(self):
        """Type of link (cannot be changed)"""
        type_ = capi.rtnl_link_get_arptype(self._rtnl_link)
        return core_capi.nl_llproto2str(type_, 64)[0]

    @arptype.setter
    def arptype(self, value):
        i = core_capi.nl_str2llproto(value)
        capi.rtnl_link_set_arptype(self._rtnl_link, i)

    @property
    @netlink.nlattr(type=str, immutable=True, fmt=util.string, title="state")
    def operstate(self):
        """Operational status"""
        operstate = capi.rtnl_link_get_operstate(self._rtnl_link)
        return capi.rtnl_link_operstate2str(operstate, 32)[0]

    @operstate.setter
    def operstate(self, value):
        i = capi.rtnl_link_str2operstate(value)
        capi.rtnl_link_set_operstate(self._rtnl_link, i)

    @property
    @netlink.nlattr(type=str, immutable=True, fmt=util.string)
    def mode(self):
        """Link mode"""
        mode = capi.rtnl_link_get_linkmode(self._rtnl_link)
        return capi.rtnl_link_mode2str(mode, 32)[0]

    @mode.setter
    def mode(self, value):
        i = capi.rtnl_link_str2mode(value)
        capi.rtnl_link_set_linkmode(self._rtnl_link, i)

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def alias(self):
        """Interface alias (SNMP)"""
        return capi.rtnl_link_get_ifalias(self._rtnl_link)

    @alias.setter
    def alias(self, value):
        capi.rtnl_link_set_ifalias(self._rtnl_link, value)

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def type(self):
        """Link type"""
        return capi.rtnl_link_get_type(self._rtnl_link)

    @type.setter
    def type(self, value):
        if capi.rtnl_link_set_type(self._rtnl_link, value) < 0:
            raise NameError("unknown info type")

        # Load the module implementing the newly assigned link type.
        self._module_lookup("netlink.route.links." + value)

    def get_stat(self, stat):
        """Retrieve statistical information

        Arguments:
        stat -- statistic identifier (one of the module-level constants
                such as RX_PACKETS) or the name of a statistic

        Raises NameError if a statistic name cannot be resolved.
        """
        if isinstance(stat, str):
            stat = capi.rtnl_link_str2stat(stat)
            if stat < 0:
                raise NameError("unknown name of statistic")

        return capi.rtnl_link_get_stat(self._rtnl_link, stat)

    def enslave(self, slave, sock=None):
        """Enslave the link `slave` to this link.

        Returns the result code of the underlying call unchanged; unlike
        add(), change() and delete() no exception is raised here.
        """
        if sock is None:
            sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)

        return capi.rtnl_link_enslave(sock._sock, self._rtnl_link, slave._rtnl_link)

    def release(self, slave, sock=None):
        """Release the link `slave` from this link.

        Returns the result code of the underlying call unchanged; unlike
        add(), change() and delete() no exception is raised here.
        """
        if sock is None:
            sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)

        return capi.rtnl_link_release(sock._sock, self._rtnl_link, slave._rtnl_link)

    def add(self, sock=None, flags=None):
        """Add this link to the kernel.

        Arguments:
        sock -- socket to use; looked up for NETLINK_ROUTE if omitted
        flags -- netlink message flags; netlink.NLM_F_CREATE is used
                 when no (or zero) flags are given

        Raises netlink.KernelError if the request fails.
        """
        if sock is None:
            sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)

        if not flags:
            flags = netlink.NLM_F_CREATE

        ret = capi.rtnl_link_add(sock._sock, self._rtnl_link, flags)
        if ret < 0:
            raise netlink.KernelError(ret)

    def change(self, sock=None, flags=0):
        """Commit changes made to the link object

        Raises netlink.NetlinkError if no original link is available to
        compare against, and netlink.KernelError if the request fails.
        """
        if sock is None:
            sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)

        if not self._orig:
            raise netlink.NetlinkError("Original link not available")
        ret = capi.rtnl_link_change(sock._sock, self._orig, self._rtnl_link, flags)
        if ret < 0:
            raise netlink.KernelError(ret)

    def delete(self, sock=None):
        """Attempt to delete this link in the kernel

        Raises netlink.KernelError if the request fails.
        """
        if sock is None:
            sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)

        ret = capi.rtnl_link_delete(sock._sock, self._rtnl_link)
        if ret < 0:
            raise netlink.KernelError(ret)

    ###################################################################
    # private properties
    #
    # Used for formatting output. USE AT OWN RISK
    @property
    def _state(self):
        # "up" without "lowerup" is reported as up but without carrier.
        if "up" in self.flags:
            buf = util.good("up")
            if "lowerup" not in self.flags:
                buf += " " + util.bad("no-carrier")
        else:
            buf = util.bad("down")
        return buf

    @property
    def _brief(self):
        return self._module_brief() + self._foreach_af("brief")

    @property
    def _flags(self):
        # Flags already covered by _state are left out of the flag list.
        ignore = [
            "up",
            "running",
            "lowerup",
        ]
        return ",".join([flag for flag in self.flags if flag not in ignore])

    def _foreach_af(self, name, args=None):
        """Call the hook `name` on every address family object in self.af.

        Each non-empty result is appended to the returned string with a
        leading space. Address families that do not provide the hook are
        skipped; note that an AttributeError raised inside a hook is
        swallowed in the same way.
        """
        buf = ""
        for af_obj in self.af.values():
            try:
                func = getattr(af_obj, name)
                s = str(func(args))
                if len(s) > 0:
                    buf += " " + s
            except AttributeError:
                pass
        return buf

    def _format_stats(self):
        """Return the RX/TX statistics table used by format(stats=True)."""
        # (label, RX statistic id, TX statistic id); None means the cell
        # stays empty.
        rows = (
            ("Packets", RX_PACKETS, TX_PACKETS),
            ("Bytes", RX_BYTES, TX_BYTES),
            ("Errors", RX_ERRORS, TX_ERRORS),
            ("Dropped", RX_DROPPED, TX_DROPPED),
            ("Compressed", RX_COMPRESSED, TX_COMPRESSED),
            ("FIFO Errors", RX_FIFO_ERR, TX_FIFO_ERR),
            ("Length Errors", RX_LEN_ERR, None),
            ("Over Errors", RX_OVER_ERR, None),
            ("CRC Errors", RX_CRC_ERR, None),
            ("Frame Errors", RX_FRAME_ERR, None),
            ("Missed Errors", RX_MISSED_ERR, None),
            ("Abort Errors", None, TX_ABORT_ERR),
            ("Carrier Errors", None, TX_CARRIER_ERR),
            ("Heartbeat Errors", None, TX_HBEAT_ERR),
            ("Window Errors", None, TX_WIN_ERR),
            ("Collisions", None, COLLISIONS),
            ("Multicast", None, MULTICAST),
            ("", None, None),
            ("Ipv6:", None, None),
            ("Packets", IP6_INPKTS, IP6_OUTPKTS),
            ("Bytes", IP6_INOCTETS, IP6_OUTOCTETS),
            ("Discards", IP6_INDISCARDS, IP6_OUTDISCARDS),
            ("Multicast Packets", IP6_INMCASTPKTS, IP6_OUTMCASTPKTS),
            ("Multicast Bytes", IP6_INMCASTOCTETS, IP6_OUTMCASTOCTETS),
            ("Broadcast Packets", IP6_INBCASTPKTS, IP6_OUTBCASTPKTS),
            ("Broadcast Bytes", IP6_INBCASTOCTETS, IP6_OUTBCASTOCTETS),
            ("Delivers", IP6_INDELIVERS, None),
            ("Forwarded", None, IP6_OUTFORWDATAGRAMS),
            ("No Routes", IP6_INNOROUTES, IP6_OUTNOROUTES),
            ("Header Errors", IP6_INHDRERRORS, None),
            ("Too Big Errors", IP6_INTOOBIGERRORS, None),
            ("Address Errors", IP6_INADDRERRORS, None),
            ("Unknown Protocol", IP6_INUNKNOWNPROTOS, None),
            ("Truncated Packets", IP6_INTRUNCATEDPKTS, None),
            ("Reasm Timeouts", IP6_REASMTIMEOUT, None),
            ("Reasm Requests", IP6_REASMREQDS, None),
            ("Reasm Failures", IP6_REASMFAILS, None),
            ("Reasm OK", IP6_REASMOKS, None),
            ("Frag Created", None, IP6_FRAGCREATES),
            ("Frag Failures", None, IP6_FRAGFAILS),
            ("Frag OK", None, IP6_FRAGOKS),
            ("", None, None),
            ("ICMPv6:", None, None),
            ("Messages", ICMP6_INMSGS, ICMP6_OUTMSGS),
            ("Errors", ICMP6_INERRORS, ICMP6_OUTERRORS),
        )

        buf = "\n\t%s%s%s%s\n" % (
            33 * " ",
            util.title("RX"),
            15 * " ",
            util.title("TX"),
        )

        for label, rx_id, tx_id in rows:
            title = util.kw(label)
            # Test against None explicitly: RX_PACKETS is 0, so a plain
            # truth test would wrongly leave its cell empty.
            rx = self.get_stat(rx_id) if rx_id is not None else ""
            tx = self.get_stat(tx_id) if tx_id is not None else ""
            buf += "\t{0:27} {1:>16} {2:>16}\n".format(title, rx, tx)

        return buf

    def format(self, details=False, stats=False, indent=""):
        """Return link as formatted text

        Arguments:
        details -- also include MTU, queue length, qdisc, operational
                   state, broadcast address and alias
        stats -- also include the RX/TX statistics table
        indent -- passed on to the formatter
        """
        fmt = util.MyFormatter(self, indent)

        buf = fmt.format(
            "{a|ifindex} {a|name} {a|arptype} {a|address} "
            "{a|_state} <{a|_flags}> {a|_brief}"
        )

        if details:
            # NOTE(review): this class defines no `weight` property;
            # confirm that the formatter or the base class can resolve
            # the {t|weight} field.
            buf += fmt.nl("\t{t|mtu} {t|txqlen} {t|weight} " "{t|qdisc} {t|operstate}")
            buf += fmt.nl("\t{t|broadcast} {t|alias}")

            buf += self._foreach_af("details", fmt)

        if stats:
            buf += self._format_stats()
            buf += self._foreach_af("stats")

        return buf
540
541
def get(name, sock=None):
    """Fetch a single link from the kernel by name.

    Arguments:
    name -- name of the link to look up; must not be empty
    sock -- socket to use; a NETLINK_ROUTE socket is looked up when
            none is given

    Returns a Link instance, or None if the lookup yields nothing.
    Raises ValueError if `name` is empty.
    """
    if not name:
        raise ValueError()

    nl_sock = sock or netlink.lookup_socket(netlink.NETLINK_ROUTE)

    # The lookup is by name only, hence the interface index of 0.
    raw_link = capi.get_from_kernel(nl_sock._sock, 0, name)
    return Link.from_capi(raw_link) if raw_link else None
555
556
# Cache shared by resolve(). It is allocated as a side effect of importing
# this module and is refilled on every resolve() call.
_link_cache = LinkCache()
558
559
def resolve(name):
    """Look up a link through the shared module-level link cache.

    The cache is refilled on every call before it is indexed with
    `name`, so lookup semantics (including the KeyError for unknown
    links) are those of LinkCache.__getitem__.
    """
    shared_cache = _link_cache
    shared_cache.refill()
    return shared_cache[name]
563