1# Lint as python2, python3
2# Copyright (c) 2016 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import fcntl
7import logging
8import os
9import pyudev
10import random
11import re
12import socket
13import struct
14import subprocess
15import sys
16import time
17
18from autotest_lib.client.bin import test, utils
19from autotest_lib.client.common_lib import error
20
21
22class EthernetDongle(object):
23    """ Used for definining the desired module expect states. """
24
25    def __init__(self, expect_speed='100', expect_duplex='full'):
26        # Expected values for parameters.
27        self.expected_parameters = {
28                'ifconfig_status': 0,
29                'duplex': expect_duplex,
30                'speed': expect_speed,
31                'mac_address': None,
32                'ipaddress': None,
33        }
34
35    def GetParam(self, parameter):
36        """ pylint wants a docstring. """
37        return self.expected_parameters[parameter]
38
39
40class network_EthernetStressPlug(test.test):
41    """ base class for test """
42    version = 1
43
44    def initialize(self, interface=None):
45        """ Determines and defines the bus information and interface info. """
46
47        self.link_speed_failures = 0
48        sysnet = os.path.join('/', 'sys', 'class', 'net')
49
50        def get_ethernet_interface(interface):
51            """ Valid interface requires link and duplex status."""
52            avail_eth_interfaces = []
53            if interface is None:
54                # This is not the (bridged) eth dev we are looking for.
55                for x in os.listdir(sysnet):
56                    sysdev = os.path.join(sysnet, x, 'device')
57                    syswireless = os.path.join(sysnet, x, 'wireless')
58                    if os.path.exists(
59                            sysdev) and not os.path.exists(syswireless):
60                        avail_eth_interfaces.append(x)
61            else:
62                sysdev = os.path.join(sysnet, interface, 'device')
63                if os.path.exists(sysdev):
64                    avail_eth_interfaces.append(interface)
65                else:
66                    raise error.TestError(
67                            'Network Interface %s is not a device ' % iface)
68
69            link_status = 'unknown'
70            duplex_status = 'unknown'
71            iface = 'unknown'
72
73            for iface in avail_eth_interfaces:
74                syslink = os.path.join(sysnet, iface, 'operstate')
75                try:
76                    link_file = open(syslink)
77                    link_status = link_file.readline().strip()
78                    link_file.close()
79                except:
80                    pass
81
82                sysduplex = os.path.join(sysnet, iface, 'duplex')
83                try:
84                    duplex_file = open(sysduplex)
85                    duplex_status = duplex_file.readline().strip()
86                    duplex_file.close()
87                except:
88                    pass
89
90                if link_status == 'up':
91                    return iface
92
93            raise error.TestError('Network Interface %s not usable (%s, %s)' %
94                                  (iface, link_status, duplex_status))
95
96        def get_net_device_path(device=''):
97            """ Uses udev to get the path of the desired internet device.
98            Args:
99                device: look for the /sys entry for this ethX device
100            Returns:
101                /sys pathname for the found ethX device or raises an error.
102            """
103            net_list = pyudev.Context().list_devices(subsystem='net')
104            for dev in net_list:
105                if dev.sys_path.endswith('net/%s' % device):
106                    return dev.sys_path
107
108            raise error.TestError('Could not find /sys device path for %s' %
109                                  device)
110
111        self.interface = get_ethernet_interface(interface)
112        self.eth_syspath = get_net_device_path(self.interface)
113        self.eth_flagspath = os.path.join(self.eth_syspath, 'flags')
114
115        # USB Dongles: "authorized" file will disable the USB port and
116        # in some cases powers off the port. In either case, net/eth* goes
117        # away. And thus "../../.." won't be valid to access "authorized".
118        # Build the pathname that goes directly to authpath.
119        auth_path = os.path.join(self.eth_syspath, '../../../authorized')
120        if os.path.exists(auth_path):
121            # now rebuild the path w/o use of '..'
122            auth_path = os.path.split(self.eth_syspath)[0]
123            auth_path = os.path.split(auth_path)[0]
124            auth_path = os.path.split(auth_path)[0]
125
126            self.eth_authpath = os.path.join(auth_path, 'authorized')
127        else:
128            self.eth_authpath = None
129
130        # Stores the status of the most recently run iteration.
131        self.test_status = {
132                'ipaddress': None,
133                'eth_state': None,
134                'reason': None,
135                'last_wait': 0
136        }
137
138        self.secs_before_warning = 10
139
140        # Represents the current number of instances in which ethernet
141        # took longer than dhcp_warning_level to come up.
142        self.warning_count = 0
143
144        # The percentage of test warnings before we fail the test.
145        self.warning_threshold = .25
146
147    def GetIPAddress(self):
148        """ Obtains the ipaddress of the interface. """
149        try:
150            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
151            return socket.inet_ntoa(
152                    fcntl.ioctl(
153                            s.fileno(),
154                            0x8915,  # SIOCGIFADDR
155                            struct.pack('256s', self.interface[:15]))[20:24])
156        except:
157            return None
158
159    def GetEthernetStatus(self):
160        """
161        Updates self.test_status with the status of the ethernet interface.
162
163        Returns:
164            True if the ethernet device is up.  False otherwise.
165        """
166
167        def ReadEthVal(param):
168            """ Reads the network parameters of the interface. """
169            eth_path = os.path.join('/', 'sys', 'class', 'net', self.interface,
170                                    param)
171            val = None
172            try:
173                fp = open(eth_path)
174                val = fp.readline().strip()
175                fp.close()
176            except:
177                pass
178            return val
179
180        eth_out = self.ParseEthTool()
181        ethernet_status = {
182                'ifconfig_status':
183                utils.system('ifconfig %s' % self.interface,
184                             ignore_status=True),
185                'duplex':
186                eth_out.get('Duplex'),
187                'speed':
188                eth_out.get('Speed'),
189                'mac_address':
190                ReadEthVal('address'),
191                'ipaddress':
192                self.GetIPAddress()
193        }
194
195        self.test_status['ipaddress'] = ethernet_status['ipaddress']
196
197        for param, val in list(ethernet_status.items()):
198            if self.dongle.GetParam(param) is None:
199                # For parameters with expected values none, we check the
200                # existence of a value.
201                if not bool(val):
202                    self.test_status['eth_state'] = False
203                    self.test_status['reason'] = '%s is not ready: %s == %s' \
204                                                 % (self.interface, param, val)
205                    return False
206            else:
207                if val != self.dongle.GetParam(param):
208                    self.test_status['eth_state'] = False
209                    self.test_status['reason'] = '%s is not ready. (%s)\n' \
210                                                 "  Expected: '%s'\n" \
211                                                 "  Received: '%s'" \
212                                                 % (self.interface, param,
213                                                 self.dongle.GetParam(param),
214                                                 val)
215                    return False
216
217        self.test_status['eth_state'] = True
218        self.test_status['reason'] = None
219        return True
220
221    def _PowerEthernet(self, power=1):
222        """ Sends command to change the power state of ethernet.
223        Args:
224          power: 0 to unplug, 1 to plug.
225        """
226
227        if self.eth_authpath:
228            try:
229                fp = open(self.eth_authpath, 'w')
230                fp.write('%d' % power)
231                fp.close()
232            except:
233                raise error.TestError('Could not write %d to %s' %
234                                      (power, self.eth_authpath))
235
236        # Linux can set network link state by frobbing "flags" bitfields.
237        # Bit fields are documented in include/uapi/linux/if.h.
238        # Bit 0 is IFF_UP (link up=1 or down=0).
239        elif os.path.exists(self.eth_flagspath):
240            try:
241                fp = open(self.eth_flagspath, mode='r')
242                val = int(fp.readline().strip(), 16)
243                fp.close()
244            except:
245                raise error.TestError('Could not read %s' % self.eth_flagspath)
246
247            if power:
248                newval = val | 1
249            else:
250                newval = val & ~1
251
252            if val != newval:
253                try:
254                    fp = open(self.eth_flagspath, mode='w')
255                    fp.write('0x%x' % newval)
256                    fp.close()
257                except:
258                    raise error.TestError('Could not write 0x%x to %s' %
259                                          (newval, self.eth_flagspath))
260                logging.debug("eth flags: 0x%x to 0x%x", val, newval)
261
262        # else use ifconfig eth0 up/down to switch
263        else:
264            logging.warning(
265                    'plug/unplug event control not found. '
266                    'Use ifconfig %s %s instead', self.interface,
267                    'up' if power else 'down')
268            result = subprocess.check_call(
269                    ['ifconfig', self.interface, 'up' if power else 'down'])
270            if result:
271                raise error.TestError('Fail to change the power state of %s' %
272                                      self.interface)
273
274    def TestPowerEthernet(self, power=1, timeout=45):
275        """ Tests enabling or disabling the ethernet.
276        Args:
277            power: 0 to unplug, 1 to plug.
278            timeout: Indicates approximately the number of seconds to timeout
279                     how long we should check for the success of the ethernet
280                     state change.
281
282        Returns:
283            The time in seconds required for device to transfer to the desired
284            state.
285
286        Raises:
287            error.TestFail if the ethernet status is not in the desired state.
288        """
289
290        start_time = time.time()
291        end_time = start_time + timeout
292
293        power_str = ['off', 'on']
294        self._PowerEthernet(power)
295
296        while time.time() < end_time:
297            status = self.GetEthernetStatus()
298
299            # If GetEthernetStatus() detects the wrong link rate, "bouncing"
300            # the link _should_ recover. Keep count of how many times this
301            # happens. Test should fail if happens "frequently".
302            if power and not status and 'speed' in self.test_status['reason']:
303                self._PowerEthernet(0)
304                time.sleep(1)
305                self._PowerEthernet(power)
306                self.link_speed_failures += 1
307                logging.warning('Link Renegotiated %s',
308                                self.test_status['reason'])
309
310            # If ethernet is enabled  and has an IP, OR
311            # if ethernet is disabled and does not have an IP,
312            # then we are in the desired state.
313            # Return the number of "seconds" for this to happen.
314            # (translated to an approximation of the number of seconds)
315            if (power and status and \
316                self.test_status['ipaddress'] is not None) \
317                or \
318                (not power and not status and \
319                self.test_status['ipaddress'] is None):
320                return time.time() - start_time
321
322            time.sleep(1)
323
324        logging.debug(self.test_status['reason'])
325        raise error.TestFail(
326                'ERROR: TIMEOUT : %s IP is %s after setting '
327                'power %s (last_wait = %.2f seconds)', self.interface,
328                self.test_status['ipaddress'], power_str[power],
329                self.test_status['last_wait'])
330
331    def RandSleep(self, min_sleep, max_sleep):
332        """ Sleeps for a random duration.
333
334        Args:
335            min_sleep: Minimum sleep parameter in miliseconds.
336            max_sleep: Maximum sleep parameter in miliseconds.
337        """
338        duration = random.randint(min_sleep, max_sleep) / 1000.0
339        self.test_status['last_wait'] = duration
340        time.sleep(duration)
341
342    def _ParseEthTool_LinkModes(self, line):
343        """ Parses Ethtool Link Mode Entries.
344        Inputs:
345            line: Space separated string of link modes that have the format
346                  (\d+)baseT/(Half|Full) (eg. 100baseT/Full).
347
348        Outputs:
349            List of dictionaries where each dictionary has the format
350            { 'Speed': '<speed>', 'Duplex': '<duplex>' }
351        """
352        parameters = []
353
354        # QCA ESS EDMA driver doesn't report "Supported link modes:"
355        if 'Not reported' in line:
356            return parameters
357
358        for speed_to_parse in line.split():
359            speed_duplex = speed_to_parse.split('/')
360            parameters.append({
361                    'Speed':
362                    re.search('(\d*)', speed_duplex[0]).groups()[0],
363                    'Duplex':
364                    speed_duplex[1],
365            })
366        return parameters
367
368    def ParseEthTool(self):
369        """
370        Parses the output of Ethtools into a dictionary and returns
371        the dictionary with some cleanup in the below areas:
372            Speed: Remove the unit of speed.
373            Supported link modes: Construct a list of dictionaries.
374                                  The list is ordered (relying on ethtool)
375                                  and each of the dictionaries contains a Speed
376                                  kvp and a Duplex kvp.
377            Advertised link modes: Same as 'Supported link modes'.
378
379        Sample Ethtool Output:
380            Supported ports: [ TP MII ]
381            Supported link modes:   10baseT/Half 10baseT/Full
382                                    100baseT/Half 100baseT/Full
383                                    1000baseT/Half 1000baseT/Full
384            Supports auto-negotiation: Yes
385            Advertised link modes:  10baseT/Half 10baseT/Full
386                                    100baseT/Half 100baseT/Full
387                                    1000baseT/Full
388            Advertised auto-negotiation: Yes
389            Speed: 1000Mb/s
390            Duplex: Full
391            Port: MII
392            PHYAD: 2
393            Transceiver: internal
394            Auto-negotiation: on
395            Supports Wake-on: pg
396            Wake-on: d
397            Current message level: 0x00000007 (7)
398            Link detected: yes
399
400        Returns:
401          A dictionary representation of the above ethtool output, or an empty
402          dictionary if no ethernet dongle is present.
403          Eg.
404            {
405              'Supported ports': '[ TP MII ]',
406              'Supported link modes': [{'Speed': '10', 'Duplex': 'Half'},
407                                       {...},
408                                       {'Speed': '1000', 'Duplex': 'Full'}],
409              'Supports auto-negotiation: 'Yes',
410              'Advertised link modes': [{'Speed': '10', 'Duplex': 'Half'},
411                                        {...},
412                                        {'Speed': '1000', 'Duplex': 'Full'}],
413              'Advertised auto-negotiation': 'Yes'
414              'Speed': '1000',
415              'Duplex': 'Full',
416              'Port': 'MII',
417              'PHYAD': '2',
418              'Transceiver': 'internal',
419              'Auto-negotiation': 'on',
420              'Supports Wake-on': 'pg',
421              'Wake-on': 'd',
422              'Current message level': '0x00000007 (7)',
423              'Link detected': 'yes',
424            }
425        """
426        parameters = {}
427        ethtool_out = os.popen('ethtool %s' %
428                               self.interface).read().split('\n')
429        if 'No data available' in ethtool_out:
430            return parameters
431
432        # bridged interfaces only have two lines of ethtool output.
433        if len(ethtool_out) < 3:
434            return parameters
435
436        # For multiline entries, keep track of the key they belong to.
437        current_key = ''
438        for line in ethtool_out:
439            current_line = line.strip().partition(':')
440            if current_line[1] == ':':
441                current_key = current_line[0]
442
443                # Assumes speed does not span more than one line.
444                # Also assigns empty string if speed field
445                # is not available.
446                if current_key == 'Speed':
447                    speed = re.search('^\s*(\d*)', current_line[2])
448                    parameters[current_key] = ''
449                    if speed:
450                        parameters[current_key] = speed.groups()[0]
451                elif (current_key == 'Supported link modes'
452                      or current_key == 'Advertised link modes'):
453                    parameters[current_key] = []
454                    parameters[current_key] += \
455                        self._ParseEthTool_LinkModes(current_line[2])
456                else:
457                    parameters[current_key] = current_line[2].strip()
458            else:
459                if (current_key == 'Supported link modes'
460                            or current_key == 'Advertised link modes'):
461                    parameters[current_key] += \
462                        self._ParseEthTool_LinkModes(current_line[0])
463                else:
464                    parameters[current_key] += current_line[0].strip()
465
466        return parameters
467
468    def GetDongle(self):
469        """ Returns the ethernet dongle object associated with what's connected.
470
471        Dongle uniqueness is retrieved from the 'product' file that is
472        associated with each usb dongle in
473        /sys/devices/pci.*/0000.*/usb.*/.*-.*/product.  The correct
474        dongle object is determined and returned.
475
476        Returns:
477          Object of type EthernetDongle.
478
479        Raises:
480          error.TestFail if ethernet dongle is not found.
481        """
482        ethtool_dict = self.ParseEthTool()
483
484        if not ethtool_dict:
485            raise error.TestFail('Unable to parse ethtool output for %s.',
486                                 self.interface)
487
488        # Ethtool output is ordered in terms of speed so this obtains the
489        # fastest speed supported by dongle.
490        # QCA ESS EDMA driver doesn't report "Supported link modes".
491        max_link = ethtool_dict['Advertised link modes'][-1]
492
493        return EthernetDongle(expect_speed=max_link['Speed'],
494                              expect_duplex=max_link['Duplex'])
495
496    def run_once(self, num_iterations=1):
497        try:
498            self.dongle = self.GetDongle()
499
500            #Sleep for a random duration between .5 and 2 seconds
501            #for unplug and plug scenarios.
502            for i in range(num_iterations):
503                logging.debug('Iteration: %d start', i)
504                linkdown_time = self.TestPowerEthernet(power=0)
505                linkdown_wait = self.test_status['last_wait']
506                if linkdown_time > self.secs_before_warning:
507                    self.warning_count += 1
508
509                self.RandSleep(500, 2000)
510
511                linkup_time = self.TestPowerEthernet(power=1)
512                linkup_wait = self.test_status['last_wait']
513
514                if linkup_time > self.secs_before_warning:
515                    self.warning_count += 1
516
517                self.RandSleep(500, 2000)
518                logging.debug('Iteration: %d end (down:%f/%d up:%f/%d)', i,
519                              linkdown_wait, linkdown_time, linkup_wait,
520                              linkup_time)
521
522                if self.warning_count > num_iterations * self.warning_threshold:
523                    raise error.TestFail(
524                            'ERROR: %.2f%% of total runs (%d) '
525                            'took longer than %d seconds for '
526                            'ethernet to come up.',
527                            self.warning_threshold * 100, num_iterations,
528                            self.secs_before_warning)
529
530            # Link speed failures are secondary.
531            # Report after all iterations complete.
532            if self.link_speed_failures > 1:
533                raise error.TestFail('ERROR: %s : Link Renegotiated %d times',
534                                     self.interface, self.link_speed_failures)
535
536        except Exception as e:
537            exc_info = sys.exc_info()
538            self._PowerEthernet(1)
539            raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
540