1# Lint as: python2, python3 2# Copyright 2015 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6from __future__ import absolute_import 7from __future__ import division 8from __future__ import print_function 9 10import collections 11import dbus 12import logging 13import pipes 14import re 15import shlex 16import six 17 18from autotest_lib.client.bin import utils 19from autotest_lib.client.common_lib import error 20 21 22# Represents the result of a dbus-send call. |sender| refers to the temporary 23# bus name of dbus-send, |responder| to the remote process, and |response| 24# contains the parsed response. 25DBusSendResult = collections.namedtuple('DBusSendResult', ['sender', 26 'responder', 27 'response']) 28# Used internally. 29DictEntry = collections.namedtuple('DictEntry', ['key', 'value']) 30 31 32def _build_token_stream(headerless_dbus_send_output): 33 """A tokenizer for dbus-send output. 34 35 The output is basically just like splitting on whitespace, except that 36 strings are kept together by " characters. 37 38 @param headerless_dbus_send_output: list of lines of dbus-send output 39 without the meta-information prefix. 40 @return list of tokens in dbus-send output. 41 """ 42 return shlex.split(' '.join(headerless_dbus_send_output)) 43 44 45def _parse_value(token_stream): 46 """Turn a stream of tokens from dbus-send output into builtin python types. 47 48 @param token_stream: output from _build_token_stream() above. 49 50 """ 51 if len(token_stream) == 0: 52 # Return None for dbus-send output with no return values. 53 return None 54 # Assumes properly tokenized output (strings with spaces handled). 55 # Assumes tokens are pre-stripped 56 token_type = token_stream.pop(0) 57 if token_type == 'variant': 58 token_type = token_stream.pop(0) 59 if token_type == 'object': 60 token_type = token_stream.pop(0) # Should be 'path' 61 token_value = token_stream.pop(0) 62 INT_TYPES = ('int16', 'uint16', 'int32', 'uint32', 63 'int64', 'uint64', 'byte') 64 if token_type in INT_TYPES: 65 return int(token_value) 66 if token_type == 'string' or token_type == 'path': 67 return token_value # shlex removed surrounding " chars. 68 if token_type == 'boolean': 69 return token_value == 'true' 70 if token_type == 'double': 71 return float(token_value) 72 if token_type == 'array': 73 values = [] 74 while token_stream[0] != ']': 75 values.append(_parse_value(token_stream)) 76 token_stream.pop(0) 77 if values and all([isinstance(x, DictEntry) for x in values]): 78 values = dict(values) 79 return values 80 if token_type == 'dict': 81 assert token_value == 'entry(' 82 key = _parse_value(token_stream) 83 value = _parse_value(token_stream) 84 assert token_stream.pop(0) == ')' 85 return DictEntry(key=key, value=value) 86 raise error.TestError('Unhandled DBus type found: %s' % token_type) 87 88 89def _parse_dbus_send_output(dbus_send_stdout): 90 """Turn dbus-send output into usable Python types. 91 92 This looks like: 93 94 localhost ~ # dbus-send --system --dest=org.chromium.flimflam \ 95 --print-reply --reply-timeout=2000 / \ 96 org.chromium.flimflam.Manager.GetProperties 97 method return time=1490931987.170070 sender=org.chromium.flimflam -> \ 98 destination=:1.37 serial=6 reply_serial=2 99 array [ 100 dict entry( 101 string "ActiveProfile" 102 variant string "/profile/default" 103 ) 104 dict entry( 105 string "ArpGateway" 106 variant boolean true 107 ) 108 ... 109 ] 110 111 @param dbus_send_output: string stdout from dbus-send 112 @return a DBusSendResult. 113 114 """ 115 lines = dbus_send_stdout.strip().splitlines() 116 # The first line contains meta-information about the response 117 header = lines[0] 118 lines = lines[1:] 119 dbus_address_pattern = r'[:\d\\.]+|[a-zA-Z.]+' 120 # The header may or may not have a time= field. 121 match = re.match(r'method return (time=[\d\\.]+ )?sender=(%s) -> ' 122 r'destination=(%s) serial=\d+ reply_serial=\d+' % 123 (dbus_address_pattern, dbus_address_pattern), header) 124 125 if match is None: 126 raise error.TestError('Could not parse dbus-send header: %s' % header) 127 128 sender = match.group(2) 129 responder = match.group(3) 130 token_stream = _build_token_stream(lines) 131 ret_val = _parse_value(token_stream) 132 # Note that DBus permits multiple response values, and this is not handled. 133 logging.debug('Got DBus response: %r', ret_val) 134 return DBusSendResult(sender=sender, responder=responder, response=ret_val) 135 136 137def _dbus2string(raw_arg): 138 """Turn a dbus.* type object into a string that dbus-send expects. 139 140 @param raw_dbus dbus.* type object to stringify. 141 @return string suitable for dbus-send. 142 143 """ 144 int_map = { 145 dbus.Int16: 'int16:', 146 dbus.Int32: 'int32:', 147 dbus.Int64: 'int64:', 148 dbus.UInt16: 'uint16:', 149 dbus.UInt32: 'uint32:', 150 dbus.UInt64: 'uint64:', 151 dbus.Double: 'double:', 152 dbus.Byte: 'byte:', 153 } 154 155 if isinstance(raw_arg, dbus.String): 156 return pipes.quote('string:%s' % raw_arg.replace('"', r'\"')) 157 158 if isinstance(raw_arg, dbus.Boolean): 159 if raw_arg: 160 return 'boolean:true' 161 else: 162 return 'boolean:false' 163 164 for prim_type, prefix in six.iteritems(int_map): 165 if isinstance(raw_arg, prim_type): 166 return prefix + str(raw_arg) 167 168 raise error.TestError('No support for serializing %r' % raw_arg) 169 170 171def _build_arg_string(raw_args): 172 """Construct a string of arguments to a DBus method as dbus-send expects. 173 174 @param raw_args list of dbus.* type objects to seriallize. 175 @return string suitable for dbus-send. 176 177 """ 178 return ' '.join([_dbus2string(arg) for arg in raw_args]) 179 180 181def dbus_send(bus_name, interface, object_path, method_name, args=None, 182 host=None, timeout_seconds=2, tolerate_failures=False, user=None): 183 """Call dbus-send without arguments. 184 185 @param bus_name: string identifier of DBus connection to send a message to. 186 @param interface: string DBus interface of object to call method on. 187 @param object_path: string DBus path of remote object to call method on. 188 @param method_name: string name of method to call. 189 @param args: optional list of arguments. Arguments must be of types 190 from the python dbus module. 191 @param host: An optional host object if running against a remote host. 192 @param timeout_seconds: number of seconds to wait for a response. 193 @param tolerate_failures: boolean True to ignore problems receiving a 194 response. 195 @param user: An option argument to run dbus-send as a given user. 196 197 """ 198 run = utils.run if host is None else host.run 199 cmd = ('dbus-send --system --print-reply --reply-timeout=%d --dest=%s ' 200 '%s %s.%s' % (int(timeout_seconds * 1000), bus_name, 201 object_path, interface, method_name)) 202 203 if user is not None: 204 cmd = ('sudo -u %s %s' % (user, cmd)) 205 if args is not None: 206 cmd = cmd + ' ' + _build_arg_string(args) 207 result = run(cmd, ignore_status=tolerate_failures) 208 if result.exit_status != 0: 209 logging.debug('%r', result.stdout) 210 return None 211 return _parse_dbus_send_output(result.stdout) 212 213 214def get_property(bus_name, interface, object_path, property_name, host=None): 215 """A helpful wrapper that extracts the value of a DBus property. 216 217 @param bus_name: string identifier of DBus connection to send a message to. 218 @param interface: string DBus interface exposing the property. 219 @param object_path: string DBus path of remote object to call method on. 220 @param property_name: string name of property to get. 221 @param host: An optional host object if running against a remote host. 222 223 """ 224 return dbus_send(bus_name, dbus.PROPERTIES_IFACE, object_path, 'Get', 225 args=[dbus.String(interface), dbus.String(property_name)], 226 host=host) 227