xref: /aosp_15_r20/external/libnl/python/netlink/route/address.py (revision 4dc78e53d49367fa8e61b07018507c90983a077d)
1#
2# Copyright (c) 2011 Thomas Graf <[email protected]>
3#
4
5"""Module providing access to network addresses
6"""
7
8from __future__ import absolute_import
9
10
11__version__ = "1.0"
12__all__ = ["AddressCache", "Address"]
13
14import datetime
15from .. import core as netlink
16from . import capi as capi
17from . import link as Link
18from .. import util as util
19
20
class AddressCache(netlink.Cache):
    """Cache containing network addresses"""

    def __init__(self, cache=None):
        """Create the cache wrapper.

        If no existing cache object is supplied, a new "route/addr" cache
        is allocated via ``_alloc_cache_name``.
        """
        if not cache:
            cache = self._alloc_cache_name("route/addr")

        self._protocol = netlink.NETLINK_ROUTE
        self._nl_cache = cache

    def __getitem__(self, key):
        """Return the address whose local address equals ``key``.

        Raises KeyError if no such address is in the cache.
        """
        # Using ifindex=0 here implies that the local address itself
        # is unique, otherwise the first occurrence is returned.
        return self.lookup(0, key)

    def lookup(self, ifindex, local):
        """Look up an address by interface index and local address.

        ``local`` may be a string, which is converted to a
        netlink.AbstractAddress, or any object exposing ``_nl_addr``.

        Returns an Address; raises KeyError (carrying the requested
        ``local`` value) if the cache has no matching entry.
        """
        # Remember what the caller asked for so the KeyError is meaningful.
        requested = local

        if isinstance(local, str):
            local = netlink.AbstractAddress(local)

        addr = capi.rtnl_addr_get(self._nl_cache, ifindex, local._nl_addr)
        if addr is None:
            raise KeyError(requested)

        return Address._from_capi(addr)

    @staticmethod
    def _new_object(obj):
        """Wrap ``obj`` in an Address."""
        return Address(obj)

    @staticmethod
    def _new_cache(cache):
        """Wrap an existing cache object in an AddressCache."""
        return AddressCache(cache=cache)
53
54
class Address(netlink.Object):
    """Network address

    Python wrapper around an ``rtnl_addr`` object; the attributes of the
    underlying object are exposed as properties backed by ``capi`` calls.
    """

    def __init__(self, obj=None):
        """Initialise the wrapper, optionally around an existing object."""
        netlink.Object.__init__(self, "route/addr", "address", obj)
        self._rtnl_addr = self._obj2type(self._nl_object)

    @classmethod
    def _from_capi(cls, obj):
        """Build an instance from an address handle returned by capi."""
        return cls(capi.addr2obj(obj))

    @staticmethod
    def _obj2type(obj):
        """Convert a generic object handle to an address handle."""
        return capi.obj2addr(obj)

    def __cmp__(self, other):
        """Three-way comparison used for sorting addresses.

        Returns a negative, zero or positive number.
        """
        # NOTE(review): the interpreter only consults __cmp__ on Python 2;
        # whether netlink.Object maps rich comparisons onto it is not
        # visible from this file -- confirm before relying on sorting.
        #
        # sort by:
        #    1. network link
        #    2. address family
        #    3. local address (including prefixlen)
        diff = self.ifindex - other.ifindex

        if diff == 0:
            diff = self.family - other.family
            if diff == 0:
                diff = capi.nl_addr_cmp(self.local, other.local)

        return diff

    @staticmethod
    def _new_instance(obj):
        """Wrap ``obj`` in a new Address."""
        return Address(obj)

    @staticmethod
    def _lifetime_to_seconds(value):
        """Normalise a lifetime value to an integer number of seconds.

        Accepts what the lifetime getters return (``None`` or a
        ``datetime.timedelta``) in addition to anything ``int()`` accepts,
        so that a value read from one address can be assigned to another.
        """
        if value is None:
            # 0xFFFFFFFF is the value the getters report as None.
            return 0xFFFFFFFF
        if isinstance(value, datetime.timedelta):
            return int(value.total_seconds())
        return int(value)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def ifindex(self):
        """interface index"""
        return capi.rtnl_addr_get_ifindex(self._rtnl_addr)

    @ifindex.setter
    def ifindex(self, value):
        # Setting the index is done by resolving the link and assigning
        # it, so that link and ifindex stay consistent.
        link = Link.resolve(value)
        if not link:
            raise ValueError("unable to resolve link %r" % (value,))

        self.link = link

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def link(self):
        """Link associated with this address, or None if there is none"""
        link = capi.rtnl_addr_get_link(self._rtnl_addr)
        if not link:
            return None

        return Link.Link.from_capi(link)

    @link.setter
    def link(self, value):
        # A string is resolved to a link object first; a failed
        # resolution is reported as ValueError.
        if isinstance(value, str):
            name = value
            try:
                value = Link.resolve(name)
            except KeyError:
                raise ValueError("unknown link %r" % (name,))

        capi.rtnl_addr_set_link(self._rtnl_addr, value._rtnl_link)

        # ifindex is immutable but we assume that if _orig does not
        # have an ifindex specified, it was meant to be given here
        if capi.rtnl_addr_get_ifindex(self._orig) == 0:
            capi.rtnl_addr_set_ifindex(self._orig, value.ifindex)

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def label(self):
        """address label"""
        return capi.rtnl_addr_get_label(self._rtnl_addr)

    @label.setter
    def label(self, value):
        capi.rtnl_addr_set_label(self._rtnl_addr, value)

    @property
    @netlink.nlattr(type=str, fmt=util.string)
    def flags(self):
        """Flags

        Reading returns the list of flag names currently set.

        Setting this property does *not* replace the current flags with
        the value you supply; each given flag is individually added or
        removed, leaving all other flags untouched.

        Examples:
        addr.flags = '+xxx' # add xxx flag
        addr.flags = 'xxx'  # exactly the same
        addr.flags = '-xxx' # remove xxx flag
        addr.flags = [ '+xxx', '-yyy' ] # list operation
        """
        flags = capi.rtnl_addr_get_flags(self._rtnl_addr)
        return capi.rtnl_addr_flags2str(flags, 256)[0].split(",")

    def _set_flag(self, flag):
        """Apply one flag change: '-name' clears, '+name' or 'name' sets."""
        if flag.startswith("-"):
            i = capi.rtnl_addr_str2flags(flag[1:])
            capi.rtnl_addr_unset_flags(self._rtnl_addr, i)
            return

        # A leading '+' is optional when adding a flag.
        name = flag[1:] if flag.startswith("+") else flag
        i = capi.rtnl_addr_str2flags(name)
        capi.rtnl_addr_set_flags(self._rtnl_addr, i)

    @flags.setter
    def flags(self, value):
        # Accept a single flag string or a list/tuple of them.
        if isinstance(value, (list, tuple)):
            for flag in value:
                self._set_flag(flag)
        else:
            self._set_flag(value)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def family(self):
        """Address family"""
        fam = capi.rtnl_addr_get_family(self._rtnl_addr)
        return netlink.AddressFamily(fam)

    @family.setter
    def family(self, value):
        if not isinstance(value, netlink.AddressFamily):
            value = netlink.AddressFamily(value)

        capi.rtnl_addr_set_family(self._rtnl_addr, int(value))

    @property
    @netlink.nlattr(type=int, fmt=util.num)
    def scope(self):
        """Address scope"""
        scope = capi.rtnl_addr_get_scope(self._rtnl_addr)
        return capi.rtnl_scope2str(scope, 32)[0]

    @scope.setter
    def scope(self, value):
        # Scope names are translated; other values are passed through.
        if isinstance(value, str):
            value = capi.rtnl_str2scope(value)
        capi.rtnl_addr_set_scope(self._rtnl_addr, value)

    @property
    @netlink.nlattr(type=str, immutable=True, fmt=util.addr)
    def local(self):
        """Local address"""
        a = capi.rtnl_addr_get_local(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @local.setter
    def local(self, value):
        a = netlink.AbstractAddress(value)
        capi.rtnl_addr_set_local(self._rtnl_addr, a._nl_addr)

        # local is immutable but we assume that if _orig does not
        # have a local address specified, it was meant to be given here
        if capi.rtnl_addr_get_local(self._orig) is None:
            capi.rtnl_addr_set_local(self._orig, a._nl_addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def peer(self):
        """Peer address"""
        a = capi.rtnl_addr_get_peer(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @peer.setter
    def peer(self, value):
        a = netlink.AbstractAddress(value)
        capi.rtnl_addr_set_peer(self._rtnl_addr, a._nl_addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def broadcast(self):
        """Broadcast address"""
        a = capi.rtnl_addr_get_broadcast(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @broadcast.setter
    def broadcast(self, value):
        a = netlink.AbstractAddress(value)
        capi.rtnl_addr_set_broadcast(self._rtnl_addr, a._nl_addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def multicast(self):
        """multicast address"""
        a = capi.rtnl_addr_get_multicast(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @multicast.setter
    def multicast(self, value):
        # Unlike the other address setters, a conversion failure is
        # reported as AttributeError here; kept for compatibility.
        try:
            a = netlink.AbstractAddress(value)
        except ValueError as err:
            raise AttributeError("multicast", err)

        capi.rtnl_addr_set_multicast(self._rtnl_addr, a._nl_addr)

    @property
    @netlink.nlattr(type=str, fmt=util.addr)
    def anycast(self):
        """anycast address"""
        a = capi.rtnl_addr_get_anycast(self._rtnl_addr)
        return netlink.AbstractAddress(a)

    @anycast.setter
    def anycast(self, value):
        a = netlink.AbstractAddress(value)
        capi.rtnl_addr_set_anycast(self._rtnl_addr, a._nl_addr)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def valid_lifetime(self):
        """Valid lifetime

        Returned as a datetime.timedelta, or None when the underlying
        value is 0xFFFFFFFF.
        """
        secs = capi.rtnl_addr_get_valid_lifetime(self._rtnl_addr)
        if secs == 0xFFFFFFFF:
            return None
        return datetime.timedelta(seconds=secs)

    @valid_lifetime.setter
    def valid_lifetime(self, value):
        # Accepts seconds, a timedelta, or None (see _lifetime_to_seconds).
        capi.rtnl_addr_set_valid_lifetime(
            self._rtnl_addr, self._lifetime_to_seconds(value)
        )

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def preferred_lifetime(self):
        """Preferred lifetime

        Returned as a datetime.timedelta, or None when the underlying
        value is 0xFFFFFFFF.
        """
        secs = capi.rtnl_addr_get_preferred_lifetime(self._rtnl_addr)
        if secs == 0xFFFFFFFF:
            return None
        return datetime.timedelta(seconds=secs)

    @preferred_lifetime.setter
    def preferred_lifetime(self, value):
        # Accepts seconds, a timedelta, or None (see _lifetime_to_seconds).
        capi.rtnl_addr_set_preferred_lifetime(
            self._rtnl_addr, self._lifetime_to_seconds(value)
        )

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def create_time(self):
        """Creation time"""
        # The raw value is scaled by 10 ms per unit to build the timedelta.
        hsec = capi.rtnl_addr_get_create_time(self._rtnl_addr)
        return datetime.timedelta(milliseconds=10 * hsec)

    @property
    @netlink.nlattr(type=int, immutable=True, fmt=util.num)
    def last_update(self):
        """Last update"""
        # The raw value is scaled by 10 ms per unit to build the timedelta.
        hsec = capi.rtnl_addr_get_last_update_time(self._rtnl_addr)
        return datetime.timedelta(milliseconds=10 * hsec)

    def add(self, socket=None, flags=None):
        """Attempt to add this address in the kernel

        If no socket is given, one is obtained via netlink.lookup_socket
        for NETLINK_ROUTE. If no flags are given, NLM_F_CREATE is used.
        Raises netlink.KernelError if the request returns a negative code.
        """
        if not socket:
            socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)

        if not flags:
            flags = netlink.NLM_F_CREATE

        ret = capi.rtnl_addr_add(socket._sock, self._rtnl_addr, flags)
        if ret < 0:
            raise netlink.KernelError(ret)

    def delete(self, socket=None, flags=0):
        """Attempt to delete this address in the kernel

        If no socket is given, one is obtained via netlink.lookup_socket
        for NETLINK_ROUTE, matching the behaviour of add().
        Raises netlink.KernelError if the request returns a negative code.
        """
        if not socket:
            socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)

        ret = capi.rtnl_addr_delete(socket._sock, self._rtnl_addr, flags)
        if ret < 0:
            raise netlink.KernelError(ret)

    ###################################################################
    # private properties
    #
    # Used for formatting output. USE AT OWN RISK
    @property
    def _flags(self):
        """Flags joined into a single comma separated string."""
        return ",".join(self.flags)

    def format(self, details=False, stats=False, nodev=False, indent=""):
        """Return address as formatted text

        details -- also include broadcast/multicast/peer/anycast and
                   the lifetimes (when set)
        stats   -- also include creation and last-update times (when set)
        nodev   -- omit the interface index
        indent  -- passed to the formatter
        """
        fmt = util.MyFormatter(self, indent)

        buf = fmt.format("{a|local!b}")

        if not nodev:
            buf += fmt.format(" {a|ifindex}")

        buf += fmt.format(" {a|scope}")

        if self.label:
            buf += fmt.format(' "{a|label}"')

        buf += fmt.format(" <{a|_flags}>")

        if details:
            buf += fmt.nl("\t{t|broadcast} {t|multicast}")
            buf += fmt.nl("\t{t|peer} {t|anycast}")

            if self.valid_lifetime:
                buf += fmt.nl("\t{s|valid-lifetime!k} {a|valid_lifetime}")

            if self.preferred_lifetime:
                buf += fmt.nl("\t{s|preferred-lifetime!k} {a|preferred_lifetime}")

        if stats and (self.create_time or self.last_update):
            # New lines are produced by the formatter, as for the
            # detail lines above.
            buf += fmt.nl(
                "\t{s|created!k} {a|create_time} {s|last-updated!k} {a|last_update}"
            )

        return buf
367