# Copyright 2021 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import common import numpy as np import time from autotest_lib.server.cros.cellular import cellular_simulator from enum import Enum class BaseSimulation(object): """ Base class for cellular connectivity simulations. Classes that inherit from this base class implement different simulation setups. The base class contains methods that are common to all simulation configurations. """ NUM_UL_CAL_READS = 3 NUM_DL_CAL_READS = 5 MAX_BTS_INPUT_POWER = 30 MAX_PHONE_OUTPUT_POWER = 23 UL_MIN_POWER = -60.0 # Keys to obtain settings from the test_config dictionary. KEY_CALIBRATION = "calibration" KEY_ATTACH_RETRIES = "attach_retries" KEY_ATTACH_TIMEOUT = "attach_timeout" # Filepath to the config files stored in the Anritsu callbox. Needs to be # formatted to replace {} with either A or B depending on the model. CALLBOX_PATH_FORMAT_STR = 'C:\\Users\\MD8475{}\\Documents\\DAN_configs\\' # Time in seconds to wait for the phone to settle # after attaching to the base station. SETTLING_TIME = 10 # Default time in seconds to wait for the phone to attach to the basestation # after toggling airplane mode. This setting can be changed with the # KEY_ATTACH_TIMEOUT keyword in the test configuration file. DEFAULT_ATTACH_TIMEOUT = 120 # The default number of attach retries. This setting can be changed with # the KEY_ATTACH_RETRIES keyword in the test configuration file. DEFAULT_ATTACH_RETRIES = 3 # These two dictionaries allow to map from a string to a signal level and # have to be overridden by the simulations inheriting from this class. UPLINK_SIGNAL_LEVEL_DICTIONARY = {} DOWNLINK_SIGNAL_LEVEL_DICTIONARY = {} # Units for downlink signal level. This variable has to be overridden by # the simulations inheriting from this class. DOWNLINK_SIGNAL_LEVEL_UNITS = None class BtsConfig(object): """ Base station configuration class. This class is only a container for base station parameters and should not interact with the instrument controller. Attributes: output_power: a float indicating the required signal level at the instrument's output. input_level: a float indicating the required signal level at the instrument's input. """ def __init__(self): """ Initialize the base station config by setting all its parameters to None. """ self.output_power = None self.input_power = None self.band = None def incorporate(self, new_config): """ Incorporates a different configuration by replacing the current values with the new ones for all the parameters different to None. """ for attr, value in vars(new_config).items(): if value: setattr(self, attr, value) def __init__(self, simulator, log, dut, test_config, calibration_table): """ Initializes the Simulation object. Keeps a reference to the callbox, log and dut handlers and initializes the class attributes. Args: simulator: a cellular simulator controller log: a logger handle dut: a device handler implementing BaseCellularDut test_config: test configuration obtained from the config file calibration_table: a dictionary containing path losses for different bands. """ self.simulator = simulator self.log = log self.dut = dut self.calibration_table = calibration_table # Turn calibration on or off depending on the test config value. If the # key is not present, set to False by default if self.KEY_CALIBRATION not in test_config: self.log.warning('The {} key is not set in the testbed ' 'parameters. Setting to off by default. To ' 'turn calibration on, include the key with ' 'a true/false value.'.format( self.KEY_CALIBRATION)) self.calibration_required = test_config.get(self.KEY_CALIBRATION, False) # Obtain the allowed number of retries from the test configs if self.KEY_ATTACH_RETRIES not in test_config: self.log.warning('The {} key is not set in the testbed ' 'parameters. Setting to {} by default.'.format( self.KEY_ATTACH_RETRIES, self.DEFAULT_ATTACH_RETRIES)) self.attach_retries = test_config.get(self.KEY_ATTACH_RETRIES, self.DEFAULT_ATTACH_RETRIES) # Obtain the attach timeout from the test configs if self.KEY_ATTACH_TIMEOUT not in test_config: self.log.warning('The {} key is not set in the testbed ' 'parameters. Setting to {} by default.'.format( self.KEY_ATTACH_TIMEOUT, self.DEFAULT_ATTACH_TIMEOUT)) self.attach_timeout = test_config.get(self.KEY_ATTACH_TIMEOUT, self.DEFAULT_ATTACH_TIMEOUT) # Configuration object for the primary base station self.primary_config = self.BtsConfig() # Store the current calibrated band self.current_calibrated_band = None # Path loss measured during calibration self.dl_path_loss = None self.ul_path_loss = None # Target signal levels obtained during configuration self.sim_dl_power = None self.sim_ul_power = None # Stores RRC status change timer self.rrc_sc_timer = None # Set to default APN log.info("Configuring APN.") self.dut.set_apn('test', 'test') # Enable roaming on the phone self.dut.toggle_data_roaming(True) # Make sure airplane mode is on so the phone won't attach right away self.dut.toggle_airplane_mode(True) # Wait for airplane mode setting to propagate # TODO @latware b/186880504 change this to a poll_for_condition (Q3 21) time.sleep(2) # Prepare the simulator for this simulation setup self.setup_simulator() def setup_simulator(self): """ Do initial configuration in the simulator. """ raise NotImplementedError() def attach(self): """ Attach the phone to the basestation. Sets a good signal level, toggles airplane mode and waits for the phone to attach. Returns: True if the phone was able to attach, False if not. """ # Turn on airplane mode self.dut.toggle_airplane_mode(True) # Wait for airplane mode setting to propagate # TODO @latware b/186880504 change this to a poll_for_condition (Q3 21) time.sleep(2) # Provide a good signal power for the phone to attach easily new_config = self.BtsConfig() new_config.input_power = -10 new_config.output_power = -30 self.simulator.configure_bts(new_config) self.primary_config.incorporate(new_config) # Try to attach the phone. for i in range(self.attach_retries): try: # Turn off airplane mode self.dut.toggle_airplane_mode(False) # Wait for the phone to attach. self.simulator.wait_until_attached(timeout=self.attach_timeout) except cellular_simulator.CellularSimulatorError: # The phone failed to attach self.log.info( "UE failed to attach on attempt number {}.".format(i + 1)) # Turn airplane mode on to prepare the phone for a retry. self.dut.toggle_airplane_mode(True) # Wait for APM to propagate # TODO @latware b/186880504 change this to a poll_for_condition (Q3 21) time.sleep(3) # Retry if i < self.attach_retries - 1: continue else: return False else: # The phone attached successfully. # TODO @latware b/186880504 change this to a poll_for_condition (Q3 21) time.sleep(self.SETTLING_TIME) self.log.info("UE attached to the callbox.") break return True def detach(self): """ Detach the phone from the basestation. Turns airplane mode and resets basestation. """ # Set the DUT to airplane mode so it doesn't see the # cellular network going off self.dut.toggle_airplane_mode(True) # Wait for APM to propagate # TODO @latware b/186880504 change this to a poll_for_condition (Q3 21) time.sleep(2) # Power off basestation self.simulator.detach() def stop(self): """ Detach phone from the basestation by stopping the simulation. Stop the simulation and turn airplane mode on. """ # Set the DUT to airplane mode so it doesn't see the # cellular network going off self.dut.toggle_airplane_mode(True) # Wait for APM to propagate # TODO @latware b/186880504 change this to a poll_for_condition (Q3 21) time.sleep(2) # Stop the simulation self.simulator.stop() def start(self): """ Start the simulation by attaching the phone and setting the required DL and UL power. Note that this refers to starting the simulated testing environment and not to starting the signaling on the cellular instruments, which might have been done earlier depending on the cellular instrument controller implementation. """ if not self.attach(): raise RuntimeError('Could not attach to base station.') # Starts IP traffic while changing this setting to force the UE to be # in Communication state, as UL power cannot be set in Idle state self.start_traffic_for_calibration() self.simulator.wait_until_communication_state() # Set uplink power to a minimum before going to the actual desired # value. This avoid inconsistencies produced by the hysteresis in the # PA switching points. self.log.info('Setting UL power to -30 dBm before going to the ' 'requested value to avoid incosistencies caused by ' 'hysteresis.') self.set_uplink_tx_power(-30) # Set signal levels obtained from the test parameters self.set_downlink_rx_power(self.sim_dl_power) self.set_uplink_tx_power(self.sim_ul_power) # Verify signal level try: rx_power, tx_power = self.dut.get_rx_tx_power_levels() if not tx_power or not rx_power[0]: raise RuntimeError('The method return invalid Tx/Rx values.') self.log.info('Signal level reported by the DUT in dBm: Tx = {}, ' 'Rx = {}.'.format(tx_power, rx_power)) if abs(self.sim_ul_power - tx_power) > 1: self.log.warning('Tx power at the UE is off by more than 1 dB') except RuntimeError as e: self.log.error('Could not verify Rx / Tx levels: %s.' % e) # Stop IP traffic after setting the UL power level self.stop_traffic_for_calibration() def parse_parameters(self, parameters): """ Configures simulation using a list of parameters. Consumes parameters from a list. Children classes need to call this method first. Args: parameters: list of parameters """ raise NotImplementedError() def consume_parameter(self, parameters, parameter_name, num_values=0): """ Parses a parameter from a list. Allows to parse the parameter list. Will delete parameters from the list after consuming them to ensure that they are not used twice. Args: parameters: list of parameters parameter_name: keyword to look up in the list num_values: number of arguments following the parameter name in the list Returns: A list containing the parameter name and the following num_values arguments """ try: i = parameters.index(parameter_name) except ValueError: # parameter_name is not set return [] return_list = [] try: for j in range(num_values + 1): return_list.append(parameters.pop(i)) except IndexError: raise ValueError( "Parameter {} has to be followed by {} values.".format( parameter_name, num_values)) return return_list def set_uplink_tx_power(self, signal_level): """ Configure the uplink tx power level Args: signal_level: calibrated tx power in dBm """ new_config = self.BtsConfig() new_config.input_power = self.calibrated_uplink_tx_power( self.primary_config, signal_level) self.simulator.configure_bts(new_config) self.primary_config.incorporate(new_config) def set_downlink_rx_power(self, signal_level): """ Configure the downlink rx power level Args: signal_level: calibrated rx power in dBm """ new_config = self.BtsConfig() new_config.output_power = self.calibrated_downlink_rx_power( self.primary_config, signal_level) self.simulator.configure_bts(new_config) self.primary_config.incorporate(new_config) def get_uplink_power_from_parameters(self, parameters): """ Reads uplink power from a list of parameters. """ values = self.consume_parameter(parameters, self.PARAM_UL_PW, 1) if values: if values[1] in self.UPLINK_SIGNAL_LEVEL_DICTIONARY: return self.UPLINK_SIGNAL_LEVEL_DICTIONARY[values[1]] else: try: if values[1][0] == 'n': # Treat the 'n' character as a negative sign return -int(values[1][1:]) else: return int(values[1]) except ValueError: pass # If the method got to this point it is because PARAM_UL_PW was not # included in the test parameters or the provided value was invalid. raise ValueError( "The test name needs to include parameter {} followed by the " "desired uplink power expressed by an integer number in dBm " "or by one the following values: {}. To indicate negative " "values, use the letter n instead of - sign.".format( self.PARAM_UL_PW, list(self.UPLINK_SIGNAL_LEVEL_DICTIONARY.keys()))) def get_downlink_power_from_parameters(self, parameters): """ Reads downlink power from a list of parameters. """ values = self.consume_parameter(parameters, self.PARAM_DL_PW, 1) if values: if values[1] not in self.DOWNLINK_SIGNAL_LEVEL_DICTIONARY: raise ValueError("Invalid signal level value {}.".format( values[1])) else: return self.DOWNLINK_SIGNAL_LEVEL_DICTIONARY[values[1]] else: # Use default value power = self.DOWNLINK_SIGNAL_LEVEL_DICTIONARY['excellent'] self.log.info("No DL signal level value was indicated in the test " "parameters. Using default value of {} {}.".format( power, self.DOWNLINK_SIGNAL_LEVEL_UNITS)) return power def calibrated_downlink_rx_power(self, bts_config, signal_level): """ Calculates the power level at the instrument's output in order to obtain the required rx power level at the DUT's input. If calibration values are not available, returns the uncalibrated signal level. Args: bts_config: the current configuration at the base station. derived classes implementations can use this object to indicate power as spectral power density or in other units. signal_level: desired downlink received power, can be either a key value pair, an int or a float """ # Obtain power value if the provided signal_level is a key value pair if isinstance(signal_level, Enum): power = signal_level.value else: power = signal_level # Try to use measured path loss value. If this was not set, it will # throw an TypeError exception try: calibrated_power = round(power + self.dl_path_loss) if calibrated_power > self.simulator.MAX_DL_POWER: self.log.warning( "Cannot achieve phone DL Rx power of {} dBm. Requested TX " "power of {} dBm exceeds callbox limit!".format( power, calibrated_power)) calibrated_power = self.simulator.MAX_DL_POWER self.log.warning( "Setting callbox Tx power to max possible ({} dBm)". format(calibrated_power)) self.log.info( "Requested phone DL Rx power of {} dBm, setting callbox Tx " "power at {} dBm".format(power, calibrated_power)) # Power has to be a natural number so calibration wont be exact. # Inform the actual received power after rounding. self.log.info( "Phone downlink received power is {0:.2f} dBm".format( calibrated_power - self.dl_path_loss)) return calibrated_power except TypeError: self.log.info("Phone downlink received power set to {} (link is " "uncalibrated).".format(round(power))) return round(power) def calibrated_uplink_tx_power(self, bts_config, signal_level): """ Calculates the power level at the instrument's input in order to obtain the required tx power level at the DUT's output. If calibration values are not available, returns the uncalibrated signal level. Args: bts_config: the current configuration at the base station. derived classes implementations can use this object to indicate power as spectral power density or in other units. signal_level: desired uplink transmitted power, can be either a key value pair, an int or a float """ # Obtain power value if the provided signal_level is a key value pair if isinstance(signal_level, Enum): power = signal_level.value else: power = signal_level # Try to use measured path loss value. If this was not set, it will # throw an TypeError exception try: calibrated_power = round(power - self.ul_path_loss) if calibrated_power < self.UL_MIN_POWER: self.log.warning( "Cannot achieve phone UL Tx power of {} dBm. Requested UL " "power of {} dBm exceeds callbox limit!".format( power, calibrated_power)) calibrated_power = self.UL_MIN_POWER self.log.warning( "Setting UL Tx power to min possible ({} dBm)".format( calibrated_power)) self.log.info( "Requested phone UL Tx power of {} dBm, setting callbox Rx " "power at {} dBm".format(power, calibrated_power)) # Power has to be a natural number so calibration wont be exact. # Inform the actual transmitted power after rounding. self.log.info( "Phone uplink transmitted power is {0:.2f} dBm".format( calibrated_power + self.ul_path_loss)) return calibrated_power except TypeError: self.log.info("Phone uplink transmitted power set to {} (link is " "uncalibrated).".format(round(power))) return round(power) def calibrate(self, band): """ Calculates UL and DL path loss if it wasn't done before. The should be already set to the required band before calling this method. Args: band: the band that is currently being calibrated. """ if self.dl_path_loss and self.ul_path_loss: self.log.info("Measurements are already calibrated.") # Attach the phone to the base station if not self.attach(): self.log.info( "Skipping calibration because the phone failed to attach.") return # If downlink or uplink were not yet calibrated, do it now if not self.dl_path_loss: self.dl_path_loss = self.downlink_calibration() if not self.ul_path_loss: self.ul_path_loss = self.uplink_calibration() # Detach after calibrating self.detach() # TODO @latware b/186880504 change this to a poll_for_condition (Q3 21) time.sleep(2) def start_traffic_for_calibration(self): """ Starts UDP IP traffic before running calibration. Uses APN_1 configured in the phone. """ self.simulator.start_data_traffic() def stop_traffic_for_calibration(self): """ Stops IP traffic after calibration. """ self.simulator.stop_data_traffic() def downlink_calibration(self, rat=None, power_units_conversion_func=None): """ Computes downlink path loss and returns the calibration value The DUT needs to be attached to the base station before calling this method. Args: rat: desired RAT to calibrate (matching the label reported by the phone) power_units_conversion_func: a function to convert the units reported by the phone to dBm. needs to take two arguments: the reported signal level and bts. use None if no conversion is needed. Returns: Downlink calibration value and measured DL power. """ # Check if this parameter was set. Child classes may need to override # this class passing the necessary parameters. if not rat: raise ValueError( "The parameter 'rat' has to indicate the RAT being used as " "reported by the phone.") # Save initial output level to restore it after calibration restoration_config = self.BtsConfig() restoration_config.output_power = self.primary_config.output_power # Set BTS to a good output level to minimize measurement error new_config = self.BtsConfig() new_config.output_power = self.simulator.MAX_DL_POWER - 5 self.simulator.configure_bts(new_config) # Starting IP traffic self.start_traffic_for_calibration() down_power_measured = [] for i in range(0, self.NUM_DL_CAL_READS): # For some reason, the RSRP gets updated on Screen ON event signal_strength = self.dut.get_telephony_signal_strength() down_power_measured.append(signal_strength[rat]) # TODO @latware b/186880504 change this to a poll_for_condition (Q3 21) time.sleep(5) # Stop IP traffic self.stop_traffic_for_calibration() # Reset bts to original settings self.simulator.configure_bts(restoration_config) # TODO @latware b/186880504 change this to a poll_for_condition (Q3 21) time.sleep(2) # Calculate the mean of the measurements reported_asu_power = np.nanmean(down_power_measured) # Convert from RSRP to signal power if power_units_conversion_func: avg_down_power = power_units_conversion_func( reported_asu_power, self.primary_config) else: avg_down_power = reported_asu_power # Calculate Path Loss dl_target_power = self.simulator.MAX_DL_POWER - 5 down_call_path_loss = dl_target_power - avg_down_power # Validate the result if not 0 < down_call_path_loss < 100: raise RuntimeError( "Downlink calibration failed. The calculated path loss value " "was {} dBm.".format(down_call_path_loss)) self.log.info("Measured downlink path loss: {} dB".format( down_call_path_loss)) return down_call_path_loss def uplink_calibration(self): """ Computes uplink path loss and returns the calibration value The DUT needs to be attached to the base station before calling this method. Returns: Uplink calibration value and measured UL power """ # Save initial input level to restore it after calibration restoration_config = self.BtsConfig() restoration_config.input_power = self.primary_config.input_power # Set BTS1 to maximum input allowed in order to perform # uplink calibration target_power = self.MAX_PHONE_OUTPUT_POWER new_config = self.BtsConfig() new_config.input_power = self.MAX_BTS_INPUT_POWER self.simulator.configure_bts(new_config) # Start IP traffic self.start_traffic_for_calibration() up_power_per_chain = [] # Get the number of chains cmd = 'MONITOR? UL_PUSCH' uplink_meas_power = self.anritsu.send_query(cmd) str_power_chain = uplink_meas_power.split(',') num_chains = len(str_power_chain) for ichain in range(0, num_chains): up_power_per_chain.append([]) for i in range(0, self.NUM_UL_CAL_READS): uplink_meas_power = self.anritsu.send_query(cmd) str_power_chain = uplink_meas_power.split(',') for ichain in range(0, num_chains): if (str_power_chain[ichain] == 'DEACTIVE'): up_power_per_chain[ichain].append(float('nan')) else: up_power_per_chain[ichain].append( float(str_power_chain[ichain])) # TODO @latware b/186880504 change this to a poll_for_condition (Q3 21) time.sleep(3) # Stop IP traffic self.stop_traffic_for_calibration() # Reset bts to original settings self.simulator.configure_bts(restoration_config) # TODO @latware b/186880504 change this to a poll_for_condition (Q3 21) time.sleep(2) # Phone only supports 1x1 Uplink so always chain 0 avg_up_power = np.nanmean(up_power_per_chain[0]) if np.isnan(avg_up_power): raise RuntimeError( "Calibration failed because the callbox reported the chain to " "be deactive.") up_call_path_loss = target_power - avg_up_power # Validate the result if not 0 < up_call_path_loss < 100: raise RuntimeError( "Uplink calibration failed. The calculated path loss value " "was {} dBm.".format(up_call_path_loss)) self.log.info( "Measured uplink path loss: {} dB".format(up_call_path_loss)) return up_call_path_loss def load_pathloss_if_required(self): """ If calibration is required, try to obtain the pathloss values from the calibration table and measure them if they are not available. """ # Invalidate the previous values self.dl_path_loss = None self.ul_path_loss = None # Load the new ones if self.calibration_required: band = self.primary_config.band # Try loading the path loss values from the calibration table. If # they are not available, use the automated calibration procedure. try: self.dl_path_loss = self.calibration_table[band]["dl"] self.ul_path_loss = self.calibration_table[band]["ul"] except KeyError: self.calibrate(band) # Complete the calibration table with the new values to be used in # the next tests. if band not in self.calibration_table: self.calibration_table[band] = {} if "dl" not in self.calibration_table[band] and self.dl_path_loss: self.calibration_table[band]["dl"] = self.dl_path_loss if "ul" not in self.calibration_table[band] and self.ul_path_loss: self.calibration_table[band]["ul"] = self.ul_path_loss def maximum_downlink_throughput(self): """ Calculates maximum achievable downlink throughput in the current simulation state. Because thoughput is dependent on the RAT, this method needs to be implemented by children classes. Returns: Maximum throughput in mbps """ raise NotImplementedError() def maximum_uplink_throughput(self): """ Calculates maximum achievable downlink throughput in the current simulation state. Because thoughput is dependent on the RAT, this method needs to be implemented by children classes. Returns: Maximum throughput in mbps """ raise NotImplementedError() def send_sms(self, sms_message): """ Sends the set SMS message. """ raise NotImplementedError()