1# 2# Copyright (c) 2011 Thomas Graf <[email protected]> 3# 4 5"""Module providing access to network addresses 6""" 7 8from __future__ import absolute_import 9 10 11__version__ = "1.0" 12__all__ = ["AddressCache", "Address"] 13 14import datetime 15from .. import core as netlink 16from . import capi as capi 17from . import link as Link 18from .. import util as util 19 20 21class AddressCache(netlink.Cache): 22 """Cache containing network addresses""" 23 24 def __init__(self, cache=None): 25 if not cache: 26 cache = self._alloc_cache_name("route/addr") 27 28 self._protocol = netlink.NETLINK_ROUTE 29 self._nl_cache = cache 30 31 def __getitem__(self, key): 32 # Using ifindex=0 here implies that the local address itself 33 # is unique, otherwise the first occurence is returned. 34 return self.lookup(0, key) 35 36 def lookup(self, ifindex, local): 37 if type(local) is str: 38 local = netlink.AbstractAddress(local) 39 40 addr = capi.rtnl_addr_get(self._nl_cache, ifindex, local._nl_addr) 41 if addr is None: 42 raise KeyError() 43 44 return Address._from_capi(addr) 45 46 @staticmethod 47 def _new_object(obj): 48 return Address(obj) 49 50 @staticmethod 51 def _new_cache(cache): 52 return AddressCache(cache=cache) 53 54 55class Address(netlink.Object): 56 """Network address""" 57 58 def __init__(self, obj=None): 59 netlink.Object.__init__(self, "route/addr", "address", obj) 60 self._rtnl_addr = self._obj2type(self._nl_object) 61 62 @classmethod 63 def _from_capi(cls, obj): 64 return cls(capi.addr2obj(obj)) 65 66 @staticmethod 67 def _obj2type(obj): 68 return capi.obj2addr(obj) 69 70 def __cmp__(self, other): 71 # sort by: 72 # 1. network link 73 # 2. address family 74 # 3. local address (including prefixlen) 75 diff = self.ifindex - other.ifindex 76 77 if diff == 0: 78 diff = self.family - other.family 79 if diff == 0: 80 diff = capi.nl_addr_cmp(self.local, other.local) 81 82 return diff 83 84 @staticmethod 85 def _new_instance(obj): 86 return Address(obj) 87 88 @property 89 @netlink.nlattr(type=int, immutable=True, fmt=util.num) 90 def ifindex(self): 91 """interface index""" 92 return capi.rtnl_addr_get_ifindex(self._rtnl_addr) 93 94 @ifindex.setter 95 def ifindex(self, value): 96 link = Link.resolve(value) 97 if not link: 98 raise ValueError() 99 100 self.link = link 101 102 @property 103 @netlink.nlattr(type=str, fmt=util.string) 104 def link(self): 105 link = capi.rtnl_addr_get_link(self._rtnl_addr) 106 if not link: 107 return None 108 109 return Link.Link.from_capi(link) 110 111 @link.setter 112 def link(self, value): 113 if type(value) is str: 114 try: 115 value = Link.resolve(value) 116 except KeyError: 117 raise ValueError() 118 119 capi.rtnl_addr_set_link(self._rtnl_addr, value._rtnl_link) 120 121 # ifindex is immutable but we assume that if _orig does not 122 # have an ifindex specified, it was meant to be given here 123 if capi.rtnl_addr_get_ifindex(self._orig) == 0: 124 capi.rtnl_addr_set_ifindex(self._orig, value.ifindex) 125 126 @property 127 @netlink.nlattr(type=str, fmt=util.string) 128 def label(self): 129 """address label""" 130 return capi.rtnl_addr_get_label(self._rtnl_addr) 131 132 @label.setter 133 def label(self, value): 134 capi.rtnl_addr_set_label(self._rtnl_addr, value) 135 136 @property 137 @netlink.nlattr(type=str, fmt=util.string) 138 def flags(self): 139 """Flags 140 141 Setting this property will *Not* reset flags to value you supply in 142 143 Examples: 144 addr.flags = '+xxx' # add xxx flag 145 addr.flags = 'xxx' # exactly the same 146 addr.flags = '-xxx' # remove xxx flag 147 addr.flags = [ '+xxx', '-yyy' ] # list operation 148 """ 149 flags = capi.rtnl_addr_get_flags(self._rtnl_addr) 150 return capi.rtnl_addr_flags2str(flags, 256)[0].split(",") 151 152 def _set_flag(self, flag): 153 if flag.startswith("-"): 154 i = capi.rtnl_addr_str2flags(flag[1:]) 155 capi.rtnl_addr_unset_flags(self._rtnl_addr, i) 156 elif flag.startswith("+"): 157 i = capi.rtnl_addr_str2flags(flag[1:]) 158 capi.rtnl_addr_set_flags(self._rtnl_addr, i) 159 else: 160 i = capi.rtnl_addr_str2flags(flag) 161 capi.rtnl_addr_set_flags(self._rtnl_addr, i) 162 163 @flags.setter 164 def flags(self, value): 165 if type(value) is list: 166 for flag in value: 167 self._set_flag(flag) 168 else: 169 self._set_flag(value) 170 171 @property 172 @netlink.nlattr(type=int, immutable=True, fmt=util.num) 173 def family(self): 174 """Address family""" 175 fam = capi.rtnl_addr_get_family(self._rtnl_addr) 176 return netlink.AddressFamily(fam) 177 178 @family.setter 179 def family(self, value): 180 if not isinstance(value, netlink.AddressFamily): 181 value = netlink.AddressFamily(value) 182 183 capi.rtnl_addr_set_family(self._rtnl_addr, int(value)) 184 185 @property 186 @netlink.nlattr(type=int, fmt=util.num) 187 def scope(self): 188 """Address scope""" 189 scope = capi.rtnl_addr_get_scope(self._rtnl_addr) 190 return capi.rtnl_scope2str(scope, 32)[0] 191 192 @scope.setter 193 def scope(self, value): 194 if type(value) is str: 195 value = capi.rtnl_str2scope(value) 196 capi.rtnl_addr_set_scope(self._rtnl_addr, value) 197 198 @property 199 @netlink.nlattr(type=str, immutable=True, fmt=util.addr) 200 def local(self): 201 """Local address""" 202 a = capi.rtnl_addr_get_local(self._rtnl_addr) 203 return netlink.AbstractAddress(a) 204 205 @local.setter 206 def local(self, value): 207 a = netlink.AbstractAddress(value) 208 capi.rtnl_addr_set_local(self._rtnl_addr, a._nl_addr) 209 210 # local is immutable but we assume that if _orig does not 211 # have a local address specified, it was meant to be given here 212 if capi.rtnl_addr_get_local(self._orig) is None: 213 capi.rtnl_addr_set_local(self._orig, a._nl_addr) 214 215 @property 216 @netlink.nlattr(type=str, fmt=util.addr) 217 def peer(self): 218 """Peer address""" 219 a = capi.rtnl_addr_get_peer(self._rtnl_addr) 220 return netlink.AbstractAddress(a) 221 222 @peer.setter 223 def peer(self, value): 224 a = netlink.AbstractAddress(value) 225 capi.rtnl_addr_set_peer(self._rtnl_addr, a._nl_addr) 226 227 @property 228 @netlink.nlattr(type=str, fmt=util.addr) 229 def broadcast(self): 230 """Broadcast address""" 231 a = capi.rtnl_addr_get_broadcast(self._rtnl_addr) 232 return netlink.AbstractAddress(a) 233 234 @broadcast.setter 235 def broadcast(self, value): 236 a = netlink.AbstractAddress(value) 237 capi.rtnl_addr_set_broadcast(self._rtnl_addr, a._nl_addr) 238 239 @property 240 @netlink.nlattr(type=str, fmt=util.addr) 241 def multicast(self): 242 """multicast address""" 243 a = capi.rtnl_addr_get_multicast(self._rtnl_addr) 244 return netlink.AbstractAddress(a) 245 246 @multicast.setter 247 def multicast(self, value): 248 try: 249 a = netlink.AbstractAddress(value) 250 except ValueError as err: 251 raise AttributeError("multicast", err) 252 253 capi.rtnl_addr_set_multicast(self._rtnl_addr, a._nl_addr) 254 255 @property 256 @netlink.nlattr(type=str, fmt=util.addr) 257 def anycast(self): 258 """anycast address""" 259 a = capi.rtnl_addr_get_anycast(self._rtnl_addr) 260 return netlink.AbstractAddress(a) 261 262 @anycast.setter 263 def anycast(self, value): 264 a = netlink.AbstractAddress(value) 265 capi.rtnl_addr_set_anycast(self._rtnl_addr, a._nl_addr) 266 267 @property 268 @netlink.nlattr(type=int, immutable=True, fmt=util.num) 269 def valid_lifetime(self): 270 """Valid lifetime""" 271 msecs = capi.rtnl_addr_get_valid_lifetime(self._rtnl_addr) 272 if msecs == 0xFFFFFFFF: 273 return None 274 else: 275 return datetime.timedelta(seconds=msecs) 276 277 @valid_lifetime.setter 278 def valid_lifetime(self, value): 279 capi.rtnl_addr_set_valid_lifetime(self._rtnl_addr, int(value)) 280 281 @property 282 @netlink.nlattr(type=int, immutable=True, fmt=util.num) 283 def preferred_lifetime(self): 284 """Preferred lifetime""" 285 msecs = capi.rtnl_addr_get_preferred_lifetime(self._rtnl_addr) 286 if msecs == 0xFFFFFFFF: 287 return None 288 else: 289 return datetime.timedelta(seconds=msecs) 290 291 @preferred_lifetime.setter 292 def preferred_lifetime(self, value): 293 capi.rtnl_addr_set_preferred_lifetime(self._rtnl_addr, int(value)) 294 295 @property 296 @netlink.nlattr(type=int, immutable=True, fmt=util.num) 297 def create_time(self): 298 """Creation time""" 299 hsec = capi.rtnl_addr_get_create_time(self._rtnl_addr) 300 return datetime.timedelta(milliseconds=10 * hsec) 301 302 @property 303 @netlink.nlattr(type=int, immutable=True, fmt=util.num) 304 def last_update(self): 305 """Last update""" 306 hsec = capi.rtnl_addr_get_last_update_time(self._rtnl_addr) 307 return datetime.timedelta(milliseconds=10 * hsec) 308 309 def add(self, socket=None, flags=None): 310 if not socket: 311 socket = netlink.lookup_socket(netlink.NETLINK_ROUTE) 312 313 if not flags: 314 flags = netlink.NLM_F_CREATE 315 316 ret = capi.rtnl_addr_add(socket._sock, self._rtnl_addr, flags) 317 if ret < 0: 318 raise netlink.KernelError(ret) 319 320 def delete(self, socket, flags=0): 321 """Attempt to delete this address in the kernel""" 322 ret = capi.rtnl_addr_delete(socket._sock, self._rtnl_addr, flags) 323 if ret < 0: 324 raise netlink.KernelError(ret) 325 326 ################################################################### 327 # private properties 328 # 329 # Used for formatting output. USE AT OWN RISK 330 @property 331 def _flags(self): 332 return ",".join(self.flags) 333 334 def format(self, details=False, stats=False, nodev=False, indent=""): 335 """Return address as formatted text""" 336 fmt = util.MyFormatter(self, indent) 337 338 buf = fmt.format("{a|local!b}") 339 340 if not nodev: 341 buf += fmt.format(" {a|ifindex}") 342 343 buf += fmt.format(" {a|scope}") 344 345 if self.label: 346 buf += fmt.format(' "{a|label}"') 347 348 buf += fmt.format(" <{a|_flags}>") 349 350 if details: 351 buf += fmt.nl("\t{t|broadcast} {t|multicast}") + fmt.nl( 352 "\t{t|peer} {t|anycast}" 353 ) 354 355 if self.valid_lifetime: 356 buf += fmt.nl("\t{s|valid-lifetime!k} " "{a|valid_lifetime}") 357 358 if self.preferred_lifetime: 359 buf += fmt.nl("\t{s|preferred-lifetime!k} " "{a|preferred_lifetime}") 360 361 if stats and (self.create_time or self.last_update): 362 buf += self.nl( 363 "\t{s|created!k} {a|create_time}" " {s|last-updated!k} {a|last_update}" 364 ) 365 366 return buf 367