1#!/usr/bin/env python3.4
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import itertools
19
20import pyvisa
21import time
22from acts import logger
23from acts import asserts as acts_asserts
24from acts_contrib.test_utils.cellular.performance import cellular_performance_test_utils as cputils
25
# Wait durations. Those passed to time.sleep() below are in seconds.
SHORT_SLEEP = 1  # default polling interval of wait_for_cell_status
VERY_SHORT_SLEEP = 0.1  # pause between setting DL power and applying it
MEDIUM_SLEEP = 5  # not referenced in the visible part of this file
SUBFRAME_DURATION = 0.001  # presumably seconds per subframe -- TODO confirm
# Used both as the pyvisa query_delay and as the pause after each command.
VISA_QUERY_DELAY = 0.01
31
32
33class Keysight5GTestApp(object):
34    """Controller for the Keysight 5G NR Test Application.
35
36    This controller enables interacting with a Keysight Test Application
37    running on a remote test PC and implements many of the configuration
38    parameters supported in test app.
39    """
40
41    VISA_LOCATION = '/opt/keysight/iolibs/libktvisa32.so'
42
43    def __init__(self, config):
44        self.config = config
45        self.test_app_settings = {
46            'lte_cell_count': 0,
47            'nr_cell_count': 0,
48            'lte_cell_configs': [],
49            'nr_cell_configs': []
50        }
51        self.log = logger.create_tagged_trace_logger("{}{}".format(
52            self.config['brand'], self.config['model']))
53        self.resource_manager = pyvisa.ResourceManager(self.VISA_LOCATION)
54        self.test_app = self.resource_manager.open_resource(
55            'TCPIP0::{}::{}::INSTR'.format(self.config['ip_address'],
56                                           self.config['hislip_interface']))
57        self.test_app.timeout = 200000
58        self.test_app.write_termination = '\n'
59        self.test_app.read_termination = '\n'
60        self.test_app.query_delay = VISA_QUERY_DELAY
61        self.last_loaded_scpi = None
62
63        inst_id = self.send_cmd('*IDN?', 1)
64        if 'Keysight' not in inst_id[0]:
65            self.log.error(
66                'Failed to connect to Keysight Test App: {}'.format(inst_id))
67        else:
68            self.log.info("Test App ID: {}".format(inst_id))
69
70        self.test_app_settings['lte_cell_count'] = self.get_cell_count('LTE')
71        self.test_app_settings['nr_cell_count'] = self.get_cell_count('NR5G')
72
73    def destroy(self):
74        self.test_app.close()
75
76    ### Programming Utilities
77    @staticmethod
78    def _format_cells(cells):
79        "Helper function to format list of cells."
80        if isinstance(cells, int):
81            return 'CELL{}'.format(cells)
82        elif isinstance(cells, str):
83            return cells
84        elif isinstance(cells, list):
85            cell_list = [
86                Keysight5GTestApp._format_cells(cell) for cell in cells
87            ]
88            cell_list = ','.join(cell_list)
89            return cell_list
90
91    @staticmethod
92    def _format_response(response):
93        "Helper function to format test app response."
94
95        def _format_response_entry(entry):
96            try:
97                formatted_entry = float(entry)
98            except:
99                formatted_entry = entry
100            return formatted_entry
101
102        if ',' not in response:
103            return _format_response_entry(response)
104        response = response.split(',')
105        formatted_response = [
106            _format_response_entry(entry) for entry in response
107        ]
108        return formatted_response
109
110    def send_cmd(self, command, read_response=0, check_errors=1):
111        "Helper function to write to or query test app."
112        if read_response:
113            try:
114                response = Keysight5GTestApp._format_response(
115                    self.test_app.query(command))
116                time.sleep(VISA_QUERY_DELAY)
117                if check_errors:
118                    error = self.test_app.query('SYSTem:ERRor?')
119                    time.sleep(VISA_QUERY_DELAY)
120                    if 'No error' not in error:
121                        self.log.warning("Command: {}. Error: {}".format(
122                            command, error))
123                return response
124            except:
125                raise RuntimeError('Lost connection to test app.')
126        else:
127            try:
128                self.test_app.write(command)
129                time.sleep(VISA_QUERY_DELAY)
130                if check_errors:
131                    error = self.test_app.query('SYSTem:ERRor?')
132                    if 'No error' not in error:
133                        self.log.warning("Command: {}. Error: {}".format(
134                            command, error))
135                self.send_cmd('*OPC?', 1, check_errors)
136                time.sleep(VISA_QUERY_DELAY)
137            except:
138                raise RuntimeError('Lost connection to test app.')
139            return None
140
141    def check_error(self):
142        error = self.test_app.query('SYSTem:ERRor?')
143        if 'No error' not in error:
144            self.log.warning("Error: {}".format(error))
145            return True
146        else:
147            return False
148
149    def import_scpi_file(self, file_name, check_last_loaded=0):
150        """Function to import SCPI file specified in file_name.
151
152        Args:
153            file_name: name of SCPI file to run
154            check_last_loaded: flag to check last loaded scpi and
155            only load if different.
156        """
157        if file_name == self.last_loaded_scpi and check_last_loaded:
158            self.log.info('Skipping SCPI import.')
159        self.send_cmd("SYSTem:SCPI:IMPort '{}'".format(file_name))
160        while int(self.send_cmd('SYSTem:SCPI:IMPort:STATus?', 1)):
161            self.send_cmd('*OPC?', 1)
162        self.log.info('Done with SCPI import')
163
164    ### Configure Cells
165    def assert_cell_off_decorator(func):
166        "Decorator function that ensures cells or off when configuring them"
167
168        def inner(self, *args, **kwargs):
169            if "nr" in func.__name__:
170                cell_type = 'NR5G'
171            else:
172                cell_type = kwargs.get('cell_type', args[0])
173            cell = kwargs.get('cell', args[1])
174            cell_state = self.get_cell_state(cell_type, cell)
175            if cell_state:
176                self.log.error('Cell must be off when calling {}'.format(
177                    func.__name__))
178            return (func(self, *args, **kwargs))
179
180        return inner
181
182    ### Configure Cells
183    def skip_config_if_none_decorator(func):
184        "Decorator function that skips the config function if any args are none"
185
186        def inner(self, *args, **kwargs):
187            none_arg = False
188            for arg in args:
189                if arg is None:
190                    none_arg = True
191            for key, value in kwargs.items():
192                if value is None:
193                    none_arg = True
194            if none_arg:
195                self.log.warning(
196                    'Skipping {}. Received incomplete arguments.'.format(
197                        func.__name__))
198                return
199            return (func(self, *args, **kwargs))
200
201        return inner
202
203    def assert_cell_off(self, cell_type, cell):
204        cell_state = self.get_cell_state(cell_type, cell)
205        if cell_state:
206            self.log.error('Cell must be off')
207
208    def select_cell(self, cell_type, cell):
209        """Function to select active cell.
210
211        Args:
212            cell_type: LTE or NR5G cell
213            cell: cell/carrier number
214        """
215        self.send_cmd('BSE:SELected:CELL {},{}'.format(
216            cell_type, Keysight5GTestApp._format_cells(cell)))
217
218    def select_display_tab(self, cell_type, cell, tab, subtab):
219        """Function to select display tab.
220
221        Args:
222            cell_type: LTE or NR5G cell
223            cell: cell/carrier number
224            tab: tab to display for the selected cell
225        """
226        supported_tabs = {
227            'PHY': [
228                'BWP', 'HARQ', 'PDSCH', 'PDCCH', 'PRACH', 'PUSCH', 'PUCCH',
229                'SRSC'
230            ],
231            'BTHR': ['SUMMARY', 'OTAGRAPH', 'ULOTA', 'DLOTA'],
232            'CSI': []
233        }
234        if (tab not in supported_tabs) or (subtab not in supported_tabs[tab]):
235            return
236        self.select_cell(cell_type, cell)
237        self.send_cmd('DISPlay:{} {},{}'.format(cell_type, tab, subtab))
238
239    def get_cell_count(self, cell_type):
240        """Function to get cell count
241
242        Args:
243            cell_type: LTE or NR5G cell
244        Returns:
245            cell_count: number of cells of cell_type supported.
246        """
247        cell_count = int(
248            self.send_cmd('BSE:CONFig:{}:CELL:COUNt?'.format(cell_type), 1))
249        return cell_count
250
251    def get_cell_state(self, cell_type, cell):
252        """Function to get cell on/off state.
253
254        Args:
255            cell_type: LTE or NR5G cell
256            cell: cell/carrier number
257        Returns:
258            cell_state: boolean. True if cell on
259        """
260        cell_state = int(
261            self.send_cmd(
262                'BSE:CONFig:{}:{}:ACTive:STATe?'.format(
263                    cell_type, Keysight5GTestApp._format_cells(cell)), 1))
264        return cell_state
265
266    def wait_for_cell_status(self,
267                             cell_type,
268                             cell,
269                             states,
270                             timeout,
271                             polling_interval=SHORT_SLEEP):
272        """Function to wait for a specific cell status
273
274        Args:
275            cell_type: LTE or NR5G cell
276            cell: cell/carrier number
277            states: list of acceptable states (ON, CONN, AGG, ACT, etc)
278            timeout: amount of time to wait for requested status
279        Returns:
280            True if one of the listed states is achieved
281            False if timed out waiting for acceptable state.
282        """
283        states = [states] if isinstance(states, str) else states
284        for i in range(int(timeout / polling_interval)):
285            current_state = self.send_cmd(
286                'BSE:STATus:{}:{}?'.format(
287                    cell_type, Keysight5GTestApp._format_cells(cell)), 1)
288            if current_state in states:
289                return True
290            time.sleep(polling_interval)
291        self.log.warning('Timeout waiting for {} {} {}'.format(
292            cell_type, Keysight5GTestApp._format_cells(cell), states))
293        return False
294
295    def set_cell_state(self, cell_type, cell, state):
296        """Function to set cell state
297
298        Args:
299            cell_type: LTE or NR5G cell
300            cell: cell/carrier number
301            state: requested state
302        """
303        self.send_cmd('BSE:CONFig:{}:{}:ACTive:STATe {}'.format(
304            cell_type, Keysight5GTestApp._format_cells(cell), state))
305
306    def turn_all_cells_off(self):
307        for cell in range(self.test_app_settings['lte_cell_count']):
308            self.set_cell_state('LTE', cell + 1, 0)
309        for cell in range(self.test_app_settings['nr_cell_count']):
310            self.set_cell_state('NR5G', cell + 1, 0)
311
312    def set_nr_cell_type(self, cell_type, cell, nr_cell_type):
313        """Function to set cell duplex mode
314
315        Args:
316            cell_type: LTE or NR5G cell
317            cell: cell/carrier number
318            nr_cell_type: SA or NSA
319        """
320        self.assert_cell_off(cell_type, cell)
321        self.send_cmd('BSE:CONFig:{}:{}:TYPE {}'.format(
322            cell_type, Keysight5GTestApp._format_cells(cell), nr_cell_type))
323
324    def set_cell_duplex_mode(self, cell_type, cell, duplex_mode):
325        """Function to set cell duplex mode
326
327        Args:
328            cell_type: LTE or NR5G cell
329            cell: cell/carrier number
330            duplex_mode: TDD or FDD
331        """
332        self.assert_cell_off(cell_type, cell)
333        self.send_cmd('BSE:CONFig:{}:{}:DUPLEX:MODe {}'.format(
334            cell_type, Keysight5GTestApp._format_cells(cell), duplex_mode))
335
336    def set_cell_band(self, cell_type, cell, band):
337        """Function to set cell band
338
339        Args:
340            cell_type: LTE or NR5G cell
341            cell: cell/carrier number
342            band: LTE or NR band (e.g. 1,3,N260, N77)
343        """
344        self.assert_cell_off(cell_type, cell)
345        self.send_cmd('BSE:CONFig:{}:{}:BAND {}'.format(
346            cell_type, Keysight5GTestApp._format_cells(cell), band))
347
348    def set_cell_channel(self, cell_type, cell, channel, arfcn=1):
349        """Function to set cell frequency/channel
350
351        Args:
352            cell_type: LTE or NR5G cell
353            cell: cell/carrier number
354            channel: requested channel (ARFCN) or frequency in MHz
355        """
356        self.assert_cell_off(cell_type, cell)
357        if cell_type == 'NR5G' and isinstance(
358                channel, str) and channel.lower() in ['low', 'mid', 'high']:
359            self.send_cmd('BSE:CONFig:{}:{}:TESTChanLoc {}'.format(
360                cell_type, Keysight5GTestApp._format_cells(cell),
361                channel.upper()))
362        elif arfcn == 1:
363            self.send_cmd('BSE:CONFig:{}:{}:DL:CHANnel {}'.format(
364                cell_type, Keysight5GTestApp._format_cells(cell), channel))
365        else:
366            self.send_cmd('BSE:CONFig:{}:{}:DL:FREQuency:MAIN {}'.format(
367                cell_type, Keysight5GTestApp._format_cells(cell),
368                channel * 1e6))
369
370    def toggle_contiguous_nr_channels(self, force_contiguous):
371        self.assert_cell_off('NR5G', 1)
372        self.log.warning(
373            'Forcing contiguous NR channels overrides channel config.')
374        self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 0')
375        if force_contiguous:
376            self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 1')
377
378    def configure_contiguous_nr_channels(self, cell, band, channel):
379        """Function to set cell frequency/channel
380
381        Args:
382            cell: cell/carrier number
383            band: band to set channel in (only required for preset)
384            channel_preset: frequency in MHz or preset in [low, mid, or high]
385        """
386        self.assert_cell_off('NR5G', cell)
387        self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 0')
388        if channel.lower() in ['low', 'mid', 'high']:
389            pcc_arfcn = cputils.PCC_PRESET_MAPPING[band][channel]
390            self.set_cell_channel('NR5G', cell, pcc_arfcn, 1)
391        else:
392            self.set_cell_channel('NR5G', cell, channel, 0)
393        self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 1')
394
395    def configure_noncontiguous_nr_channels(self, cells, band, channels):
396        """Function to set cell frequency/channel
397
398        Args:
399            cell: cell/carrier number
400            band: band number
401            channel: frequency in MHz
402        """
403        for cell in cells:
404            self.assert_cell_off('NR5G', cell)
405        self.send_cmd('BSE:CONFig:NR5G:PHY:OPTimize:CONTiguous:STATe 0')
406        for cell, channel in zip(cells, channels):
407            self.set_cell_channel('NR5G', cell, channel, arfcn=0)
408
409    def set_cell_bandwidth(self, cell_type, cell, bandwidth):
410        """Function to set cell bandwidth
411
412        Args:
413            cell_type: LTE or NR5G cell
414            cell: cell/carrier number
415            bandwidth: requested bandwidth
416        """
417        self.assert_cell_off(cell_type, cell)
418        self.send_cmd('BSE:CONFig:{}:{}:DL:BW {}'.format(
419            cell_type, Keysight5GTestApp._format_cells(cell), bandwidth))
420
421    def set_nr_subcarrier_spacing(self, cell, subcarrier_spacing):
422        """Function to set cell bandwidth
423
424        Args:
425            cell: cell/carrier number
426            subcarrier_spacing: requested SCS
427        """
428        self.assert_cell_off('NR5G', cell)
429        self.send_cmd('BSE:CONFig:NR5G:{}:SUBCarrier:SPACing:COMMon {}'.format(
430            Keysight5GTestApp._format_cells(cell), subcarrier_spacing))
431
432    def set_cell_mimo_config(self, cell_type, cell, link, mimo_config):
433        """Function to set cell mimo config.
434
435        Args:
436            cell_type: LTE or NR5G cell
437            cell: cell/carrier number
438            link: uplink or downlink
439            mimo_config: requested mimo configuration (refer to SCPI
440                         documentation for allowed range of values)
441        """
442        self.assert_cell_off(cell_type, cell)
443        if cell_type == 'NR5G':
444            self.send_cmd('BSE:CONFig:{}:{}:{}:MIMO:CONFig {}'.format(
445                cell_type, Keysight5GTestApp._format_cells(cell), link,
446                mimo_config))
447        else:
448            self.send_cmd('BSE:CONFig:{}:{}:PHY:DL:ANTenna:CONFig {}'.format(
449                cell_type, Keysight5GTestApp._format_cells(cell), mimo_config))
450
451    def set_lte_cell_transmission_mode(self, cell, transmission_mode):
452        """Function to set LTE cell transmission mode.
453
454        Args:
455            cell: cell/carrier number
456            transmission_mode: one of TM1, TM2, TM3, TM4 ...
457        """
458
459        self.assert_cell_off('LTE', cell)
460        self.send_cmd('BSE:CONFig:LTE:{}:RRC:TMODe {}'.format(
461            Keysight5GTestApp._format_cells(cell), transmission_mode))
462
463    @skip_config_if_none_decorator
464    def set_lte_cell_num_layers(self, cell, num_layers):
465        """Function to set LTE cell number of layers."""
466
467        self.assert_cell_off('LTE', cell)
468        self.send_cmd('BSE:CONFig:LTE:{}:PHY:DL:NUMLayers {}'.format(
469            Keysight5GTestApp._format_cells(cell), num_layers))
470
471    @skip_config_if_none_decorator
472    def set_lte_cell_num_codewords(self, cell, num_codewords):
473        """Function to set LTE number of codewords."""
474
475        self.assert_cell_off('LTE', cell)
476        self.send_cmd('BSE:CONFig:LTE:{}:PHY:DL:NUMCodewords {}'.format(
477            Keysight5GTestApp._format_cells(cell), num_codewords))
478
479    @skip_config_if_none_decorator
480    def set_lte_cell_dl_subframe_allocation(self,
481                                            cell,
482                                            dl_subframe_allocation=[1] * 10):
483        """Function to set LTE downlink subrframe allocation.
484
485        Args:
486            cell: cell/carrier number
487            dl_subframe_allocation: string or bool list indicating allocation
488            (1 enabled, 0 disabled)
489        """
490        if isinstance(dl_subframe_allocation, list):
491            dl_subframe_allocation = str(dl_subframe_allocation)[1:-1].replace(
492                '\'', '')
493        self.assert_cell_off('LTE', cell)
494        self.send_cmd(
495            'BSE:CONFig:LTE:{}:PHY:DL:SFRame:ALLocation:ALL {}'.format(
496                Keysight5GTestApp._format_cells(cell), dl_subframe_allocation))
497
498    @skip_config_if_none_decorator
499    def set_lte_cell_tdd_frame_config(self, cell, frame_config=1, ssf_config=1):
500        self.send_cmd(
501            'BSE:CONFig:LTE:{}:PHY:TDD:ULDL:CONFig {}'.format(
502                Keysight5GTestApp._format_cells(cell), frame_config))
503        self.send_cmd(
504            'BSE:CONFig:LTE:{}:PHY:TDD:SSFRame:CONFig {}'.format(
505                Keysight5GTestApp._format_cells(cell), ssf_config))
506
507
508    def set_cell_dl_power(self, cell_type, cell, power, full_bw):
509        """Function to set cell power
510
511        Args:
512            cell_type: LTE or NR5G cell
513            cell: cell/carrier number
514            power: requested power
515            full_bw: boolean controlling if requested power is per channel
516                     or subcarrier
517        """
518        if full_bw:
519            self.send_cmd('BSE:CONFig:{}:{}:DL:POWer:CHANnel {}'.format(
520                cell_type, Keysight5GTestApp._format_cells(cell), power))
521        else:
522            self.send_cmd('BSE:CONFig:{}:{}:DL:POWer:EPRE {}'.format(
523                cell_type, Keysight5GTestApp._format_cells(cell), power))
524        time.sleep(VERY_SHORT_SLEEP)
525        self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type))
526
527    def set_cell_ul_power_control(self, cell_type, cell, mode, target_power=0):
528        """Function configure UL power control
529
530        Args:
531            cell_type: LTE or NR5G cell
532            cell: cell/carrier number
533            mode: UL power control mode. One of [TARget | MANual | UP | DOWN | DISabled]
534            target_power: target power for PUSCH
535        """
536        self.send_cmd('BSE:CONFig:{}:{}:UL:PUSCh:CLPControl:MODE {}'.format(
537            cell_type, Keysight5GTestApp._format_cells(cell), mode))
538        if cell_type == 'NR5G' and mode == 'TARget':
539            self.send_cmd(
540                'BSE:CONFig:{}:{}:UL:PUSCh:CLPControl:TARGet:POWer {}'.format(
541                    cell_type, Keysight5GTestApp._format_cells(cell),
542                    target_power))
543        elif cell_type == 'LTE' and mode == 'TARget':
544            self.send_cmd(
545                'BSE:CONFig:{}:{}:UL:CLPControl:TARGet:POWer:PUSCH {}'.format(
546                    cell_type, Keysight5GTestApp._format_cells(cell),
547                    target_power))
548        self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type))
549
550    def set_cell_input_power(self, cell_type, cell, power):
551        """Function to set cell input power
552
553        Args:
554            cell_type: LTE or NR5G cell
555            cell: cell/carrier number
556            power: expected input power
557        """
558        if power == "AUTO" and cell_type == "LTE":
559            self.send_cmd('BSE:CONFig:{}:{}:CONTrol:POWer:AUTO ON'.format(
560                cell_type, Keysight5GTestApp._format_cells(cell)))
561        elif cell_type == "LTE":
562            self.send_cmd('BSE:CONFig:{}:{}:CONTrol:POWer:AUTO OFF'.format(
563                cell_type, Keysight5GTestApp._format_cells(cell)))
564            self.send_cmd('BSE:CONFig:{}:{}:MANual:POWer {}'.format(
565                cell_type, Keysight5GTestApp._format_cells(cell), power))
566        if power == "AUTO" and cell_type == "NR5G":
567            self.send_cmd('BSE:CONFig:{}:UL:EIP:AUTO ON'.format(cell_type))
568        elif cell_type == "NR5G":
569            self.send_cmd('BSE:CONFig:{}:UL:EIP:AUTO OFF'.format(cell_type))
570            self.send_cmd('BSE:CONFig:{}:{}:MANual:POWer {}'.format(
571                cell_type, Keysight5GTestApp._format_cells(cell), power))
572        self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type))
573
574    def set_cell_duplex_mode(self, cell_type, cell, duplex_mode):
575        """Function to set cell power
576
577        Args:
578            cell_type: LTE or NR5G cell
579            cell: cell/carrier number
580            duplex mode: TDD or FDD
581        """
582        self.assert_cell_off(cell_type, cell)
583        self.send_cmd('BSE:CONFig:{}:{}:DUPLEX:MODe {}'.format(
584            cell_type, Keysight5GTestApp._format_cells(cell), duplex_mode))
585
586    def set_dl_carriers(self, cells):
587        """Function to set aggregated DL NR5G carriers
588
589        Args:
590            cells: list of DL cells/carriers to aggregate with LTE (e.g. [1,2])
591        """
592        self.send_cmd('BSE:CONFig:NR5G:CELL1:CAGGregation:NRCC:DL {}'.format(
593            Keysight5GTestApp._format_cells(cells)))
594
595    def set_ul_carriers(self, cells):
596        """Function to set aggregated UL NR5G carriers
597
598        Args:
599            cells: list of DL cells/carriers to aggregate with LTE (e.g. [1,2])
600        """
601        self.send_cmd('BSE:CONFig:NR5G:CELL1:CAGGregation:NRCC:UL {}'.format(
602            Keysight5GTestApp._format_cells(cells)))
603
604    def set_nr_cell_schedule_scenario(self, cell, scenario):
605        """Function to set NR schedule to one of predefince quick configs.
606
607        Args:
608            cell: cell number to address. schedule will apply to all cells
609            scenario: one of the predefined test app schedlue quick configs
610                      (e.g. FULL_TPUT, BASIC).
611        """
612        self.assert_cell_off('NR5G', cell)
613        self.send_cmd(
614            'BSE:CONFig:NR5G:{}:SCHeduling:QCONFig:SCENario {}'.format(
615                Keysight5GTestApp._format_cells(cell), scenario))
616        self.send_cmd('BSE:CONFig:NR5G:SCHeduling:QCONFig:APPLy:ALL')
617
618    def set_nr_schedule_slot_ratio(self, cell, slot_ratio):
619        """Function to set NR schedule to one of predefince quick configs.
620
621        Args:
622            cell: cell number to address. schedule will apply to all cells
623            slot_ratio: downlink slot ratio
624        """
625        self.assert_cell_off('NR5G', cell)
626        self.send_cmd('BSE:CONFig:NR5G:{}:SCHeduling:QCONFig:RATIo {}'.format(
627            Keysight5GTestApp._format_cells(cell), slot_ratio))
628        self.send_cmd('BSE:CONFig:NR5G:SCHeduling:QCONFig:APPLy:ALL')
629
630    def set_nr_schedule_tdd_pattern(self, cell, tdd_pattern):
631        """Function to set NR schedule to one of predefince quick configs.
632
633        Args:
634            cell: cell number to address. schedule will apply to all cells
635            tdd_pattern: 0 for disabled, 1/enabled, or current
636        """
637        tdd_pattern_mapping = {
638            0: 'DISabled',
639            1: 'ENABled',
640            'current': 'CURRent'
641        }
642        self.assert_cell_off('NR5G', cell)
643        self.send_cmd(
644            'BSE:CONFig:NR5G:{}:SCHeduling:QCONFig:TDD:PATTern {}'.format(
645                Keysight5GTestApp._format_cells(cell),
646                tdd_pattern_mapping[tdd_pattern]))
647        self.send_cmd('BSE:CONFig:NR5G:SCHeduling:QCONFig:APPLy:ALL')
648
649    def set_nr_cell_mcs(self, cell, dl_mcs_table, dl_mcs, ul_mcs_table, ul_mcs):
650        """Function to set NR cell DL & UL MCS
651
652        Args:
653            cell: cell number to address. MCS will apply to all cells
654            dl_mcs: mcs index to use on DL
655            ul_mcs: mcs index to use on UL
656        """
657        self.assert_cell_off('NR5G', cell)
658        frame_config_count = 5
659        slot_config_count = 8
660        self.send_cmd(
661            'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL", "DL:MCS:TABle", "{}"'
662            .format(dl_mcs_table))
663        self.send_cmd(
664            'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL", "UL:MCS:TABle", "{}"'
665            .format(ul_mcs_table))
666        if isinstance(dl_mcs, dict):
667            self.configure_nr_link_adaptation(cell, link_config=dl_mcs)
668        else:
669            for frame, slot in itertools.product(range(frame_config_count),
670                                                 range(slot_config_count)):
671                self.send_cmd(
672                    'BSE:CONFig:NR5G:{}:SCHeduling:BWP0:FC{}:SC{}:DL:RRESource:APOLicy FIXed'
673                    .format(Keysight5GTestApp._format_cells(cell), frame,
674                            slot))
675            self.send_cmd(
676                'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL:FCALL:SCALL", "DL:IMCS", "{}"'
677                .format(dl_mcs))
678        self.send_cmd(
679            'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL:FCALL:SCALL", "UL:IMCS", "{}"'
680            .format(ul_mcs))
681
682    def configure_nr_link_adaptation(self, cell, link_config):
683        frame_config_count = 5
684        slot_config_count = 8
685        for frame, slot in itertools.product(range(frame_config_count),
686                                             range(slot_config_count)):
687            self.send_cmd(
688                'BSE:CONFig:NR5G:{}:SCHeduling:BWP0:FC{}:SC{}:DL:RRESource:APOLicy {}'
689                .format(Keysight5GTestApp._format_cells(cell), frame, slot,
690                        link_config['link_policy']))
691            self.send_cmd(
692                'BSE:CONFig:NR5G:{}:SCHeduling:BWP0:FC{}:SC{}:DL:IMCS {}'.
693                format(Keysight5GTestApp._format_cells(cell), frame, slot,
694                       link_config['initial_mcs']))
695            self.send_cmd(
696                'BSE:CONFig:NR5G:{}:SCHeduling:BWP0:FC{}:SC{}:DL:MAXimum:IMCS {}'
697                .format(Keysight5GTestApp._format_cells(cell), frame, slot,
698                        link_config['maximum_mcs']))
699        self.send_cmd(
700            'BSE:CONFig:NR5G:{}:MAC:LADaptation:NTX:BEValuation {}'.format(
701                Keysight5GTestApp._format_cells(cell),
702                link_config.get('adaptation_interval', 10000)))
703        self.send_cmd(
704            'BSE:CONFig:NR5G:{}:MAC:LADaptation:TARGet:NACK:COUNt {}'.format(
705                Keysight5GTestApp._format_cells(cell),
706                link_config.get('target_nack_count', 1000)))
707        self.send_cmd(
708            'BSE:CONFig:NR5G:{}:MAC:LADaptation:TARGet:NACK:MARGin {}'.format(
709                Keysight5GTestApp._format_cells(cell),
710                link_config.get('target_nack_margin', 100)))
711        self.send_cmd(
712            'BSE:CONFig:NR5G:{}:MAC:DL:LADaptation:MCS:INCRement {}'.format(
713                Keysight5GTestApp._format_cells(cell),
714                link_config.get('mcs_step', 1)))
715
716    def set_lte_cell_mcs(
717        self,
718        cell,
719        dl_mcs_table,
720        dl_mcs,
721        ul_mcs_table,
722        ul_mcs,
723    ):
724        """Function to set NR cell DL & UL MCS
725
726        Args:
727            cell: cell number to address. MCS will apply to all cells
728            dl_mcs: mcs index to use on DL
729            ul_mcs: mcs index to use on UL
730        """
731        if dl_mcs_table == 'QAM256':
732            dl_mcs_table_formatted = 'ASUBframe'
733        elif dl_mcs_table == 'QAM1024':
734            dl_mcs_table_formatted = 'ASUB1024'
735        elif dl_mcs_table == 'QAM64':
736            dl_mcs_table_formatted = 'DISabled'
737        self.assert_cell_off('LTE', cell)
738        self.send_cmd(
739            'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL", "DL:MCS:TABle", "{}"'
740            .format(dl_mcs_table_formatted))
741        self.configure_lte_periodic_csi_reporting(cell, 1)
742
743        if isinstance(dl_mcs, dict) and dl_mcs['link_policy'] == 'WCQI':
744            self.configure_lte_link_adaptation(cell, dl_mcs)
745        elif isinstance(dl_mcs, int):
746            self.send_cmd('BSE:CONFig:LTE:{}:PHY:DL:IMCS:MODE EXPLicit'.format(
747                Keysight5GTestApp._format_cells(cell)))
748            self.send_cmd(
749                'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL:SFALL:CWALL", "DL:IMCS", "{}"'
750                .format(dl_mcs))
751        else:
752            self.log.error('Invalid LTE MCS setting.')
753        self.send_cmd(
754            'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL", "UL:MCS:TABle", "{}"'
755            .format(ul_mcs_table))
756        self.send_cmd(
757            'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL:SFALL", "UL:IMCS", "{}"'
758            .format(ul_mcs))
759
760    def configure_lte_link_adaptation(self, cell, link_config):
761        self.send_cmd('BSE:CONFig:LTE:{}:PHY:DL:IMCS:MODE WCQI'.format(
762            Keysight5GTestApp._format_cells(cell)))
763        self.send_cmd('BSE:CONFig:LTE:{}:SCHeduling:DL:ALGorithm:BLER:STATE {}'.format(
764            Keysight5GTestApp._format_cells(cell), link_config['bler_adaptation']))
765        if int(link_config['bler_adaptation']):
766            full_link_config = {'max_mcs_offset': 20,  'min_mcs_offset': -20, 'bler_high_threshold': 15,
767                                   'bler_low_threshold': 5, 'bler_adaptation_window': 100, 'bler_min_ack_nack': 10,
768                                   'rank_adjustment': 0, 'rank_adjustment_mcs': 5, 'rank_adjustment_mcs_offset': 10,
769                                   'rank_adjustment_min_ack_nack': 50}
770            full_link_config.update(link_config)
771            self.log.info(full_link_config)
772
773            self.send_cmd('BSE:CONFig:LTE:{}:SCHeduling:DL:ALGorithm:BLER:MAX:MCS:OFFSet {}'.format(
774                Keysight5GTestApp._format_cells(cell), full_link_config['max_mcs_offset']))
775            self.send_cmd('BSE:CONFig:LTE:{}:SCHeduling:DL:ALGorithm:BLER:MCS:MIN:OFFSet {}'.format(
776                Keysight5GTestApp._format_cells(cell), full_link_config['min_mcs_offset']))
777            self.send_cmd('BSE:CONFig:LTE:{}:SCHeduling:DL:ALGorithm:BLER:HIGH:THReshold {}'.format(
778                Keysight5GTestApp._format_cells(cell), full_link_config['bler_high_threshold']))
779            self.send_cmd('BSE:CONFig:LTE:{}:SCHeduling:DL:ALGorithm:BLER:LOW:THReshold {}'.format(
780                Keysight5GTestApp._format_cells(cell), full_link_config['bler_low_threshold']))
781            self.send_cmd('BSE:CONFig:LTE:{}:SCHeduling:DL:ALGorithm:BLER:ACK:NACK:WINDow:TTI {}'.format(
782                Keysight5GTestApp._format_cells(cell), full_link_config['bler_adaptation_window']))
783            self.send_cmd('BSE:CONFig:LTE:{}:SCHeduling:DL:ALGorithm:BLER:NUMBer:MINimum:ACK:NACK {}'.format(
784                Keysight5GTestApp._format_cells(cell), full_link_config['bler_min_ack_nack']))
785            self.send_cmd('BSE:CONFig:LTE:{}:SCHeduling:DL:ALGorithm:BLER:RANK:ADJust:STATE {}'.format(
786                Keysight5GTestApp._format_cells(cell), full_link_config['rank_adjustment']))
787            if int(full_link_config['rank_adjustment']):
788                self.send_cmd('BSE:CONFig:LTE:{}:SCHeduling:DL:ALGorithm:BLER:RANK:ADJust:MCS:ADJust {}'.format(
789                    Keysight5GTestApp._format_cells(cell), full_link_config['rank_adjustment_mcs']))
790                self.send_cmd('BSE:CONFig:LTE:{}:SCHeduling:DL:ALGorithm:BLER:RANK:ADJust:MCS:OFFSet {}'.format(
791                    Keysight5GTestApp._format_cells(cell), full_link_config['rank_adjustment_mcs_offset']))
792                self.send_cmd('BSE:CONFig:LTE:{}:SCHeduling:DL:ALGorithm:BLER:RANK:ADJust:MINimum:NUMBer:ACK:NACK {}'.format(
793                    Keysight5GTestApp._format_cells(cell), full_link_config['rank_adjustment_min_ack_nack']))
794
795
796    def configure_lte_periodic_csi_reporting(self, cell, enable):
797        """Function to enable/disable LTE CSI reporting."""
798
799        self.send_cmd('BSE:CONFig:LTE:{}:PHY:CSI:PERiodic:STATe {}'.format(
800            Keysight5GTestApp._format_cells(cell), enable))
801
802    def set_lte_control_region_size(self, cell, num_symbols):
803        self.assert_cell_off('LTE', cell)
804        self.send_cmd('BSE:CONFig:LTE:{}:PHY:PCFich:CFI {}'.format(
805            Keysight5GTestApp._format_cells(cell), num_symbols))
806
807    def set_lte_ul_mac_padding(self, mac_padding):
808        self.assert_cell_off('LTE', 'CELL1')
809        padding_str = 'TRUE' if mac_padding else 'FALSE'
810        self.send_cmd(
811            'BSE:CONFig:LTE:SCHeduling:SETParameter "CELLALL", "UL:MAC:PADDING", "{}"'
812            .format(padding_str))
813
814    def set_nr_ul_dft_precoding(self, cell, precoding):
815        """Function to configure DFT-precoding on uplink.
816
817        Args:
818            cell: cell number to address. MCS will apply to all cells
819            precoding: 0/1 to disable/enable precoding
820        """
821        self.assert_cell_off('NR5G', cell)
822        precoding_str = "ENABled" if precoding else "DISabled"
823        self.send_cmd(
824            'BSE:CONFig:NR5G:{}:SCHeduling:QCONFig:UL:TRANsform:PRECoding {}'.
825            format(Keysight5GTestApp._format_cells(cell), precoding_str))
826        precoding_str = "True" if precoding else "False"
827        self.send_cmd(
828            'BSE:CONFig:NR5G:SCHeduling:SETParameter "CELLALL:BWPALL", "UL:TPEnabled", "{}"'
829            .format(precoding_str))
830
831    def configure_ul_clpc(self, channel, mode, target):
832        """Function to configure UL power control on all cells/carriers
833
834        Args:
835            channel: physical channel must be PUSCh or PUCCh
836            mode: mode supported by test app (all up/down bits, target, etc)
837            target: target power if mode is set to target
838        """
839        self.send_cmd('BSE:CONFig:NR5G:UL:{}:CLPControl:MODE:ALL {}'.format(
840            channel, mode))
841        if "tar" in mode.lower():
842            self.send_cmd(
843                'BSE:CONFig:NR5G:UL:{}:CLPControl:TARGet:POWer:ALL {}'.format(
844                    channel, target))
845
846    def configure_channel_emulator(self, cell_type, cell, fading_model):
847        if cell_type == 'LTE':
848            self.send_cmd('BSE:CONFig:{}:{}:CMODel {}'.format(
849                cell_type, Keysight5GTestApp._format_cells(cell),
850                fading_model['channel_model']))
851            self.send_cmd('BSE:CONFig:{}:{}:CMATrix {}'.format(
852                cell_type, Keysight5GTestApp._format_cells(cell),
853                fading_model['correlation_matrix']))
854            self.send_cmd('BSE:CONFig:{}:{}:MDSHift {}'.format(
855                cell_type, Keysight5GTestApp._format_cells(cell),
856                fading_model['max_doppler']))
857        elif cell_type == 'NR5G':
858            #TODO: check that this is FR1
859            self.send_cmd('BSE:CONFig:{}:{}:FRANge1:CMODel {}'.format(
860                cell_type, Keysight5GTestApp._format_cells(cell),
861                fading_model['channel_model']))
862            self.send_cmd('BSE:CONFig:{}:{}:FRANge1:CMATrix {}'.format(
863                cell_type, Keysight5GTestApp._format_cells(cell),
864                fading_model['correlation_matrix']))
865            self.send_cmd('BSE:CONFig:{}:{}:FRANge1:MDSHift {}'.format(
866                cell_type, Keysight5GTestApp._format_cells(cell),
867                fading_model['max_doppler']))
868
869    def set_channel_emulator_state(self, state):
870        self.send_cmd('BSE:CONFig:FADing:ENABle {}'.format(int(state)))
871
872    def enable_awgn_noise(self, cell_type, cell, enable, noise_level=-100):
873        self.send_cmd('BSE:CONFig:{}:{}:IMPairments:AWGN:POWer {}'.format(cell_type,
874                                                                          Keysight5GTestApp._format_cells(cell),
875                                                                          noise_level))
876        self.send_cmd('BSE:CONFig:{}:{}:IMPairments:AWGN:STATe {}'.format(cell_type,
877                                                                          Keysight5GTestApp._format_cells(cell),
878                                                                          enable))
879        self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type))
880
881    def apply_lte_carrier_agg(self, cells):
882        """Function to start LTE carrier aggregation on already configured cells"""
883        if self.wait_for_cell_status('LTE', 'CELL1', 'CONN', 60):
884            self.send_cmd(
885                'BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:SCC {}'.format(
886                    Keysight5GTestApp._format_cells(cells)))
887            self.send_cmd(
888                'BSE:CONFig:LTE:CELL1:CAGGregation:ACTivate:SCC {}'.format(
889                    Keysight5GTestApp._format_cells(cells)))
890
891    def apply_carrier_agg(self):
892        """Function to start carrier aggregation on already configured cells"""
893        if not self.wait_for_cell_status('LTE', 'CELL1', 'CONN', 60):
894            raise RuntimeError('LTE must be connected to start aggregation.')
895        # Continue if LTE connected
896        self.send_cmd('BSE:CONFig:LTE:CELL1:CAGGregation:AGGRegate:NRCC:APPly',
897                      0, 0)
898        time.sleep(MEDIUM_SLEEP)
899        error = self.check_error()
900        if error:
901            acts_asserts.fail('Failed to apply NR carrier aggregation.')
902
903    def get_ip_throughput(self, cell_type):
904        """Function to query IP layer throughput on LTE or NR
905
906        Args:
907            cell_type: LTE or NR5G
908        Returns:
909            dict containing DL and UL IP-layer throughput
910        """
911        #Tester reply format
912        #{ report-count, total-bytes, current transfer-rate, average transfer-rate, peak transfer-rate }
913        dl_tput = self.send_cmd(
914            'BSE:MEASure:{}:BTHRoughput:DL:THRoughput:IP?'.format(cell_type),
915            1)
916        ul_tput = self.send_cmd(
917            'BSE:MEASure:{}:BTHRoughput:UL:THRoughput:IP?'.format(cell_type),
918            1)
919        return {'dl_tput': dl_tput, 'ul_tput': ul_tput}
920
921    def _get_throughput(self, cell_type, link, cell):
922        """Helper function to get PHY layer throughput on single cell"""
923        if cell_type == 'LTE':
924            tput_response = self.send_cmd(
925                'BSE:MEASure:LTE:BTHRoughput:{}:THRoughput:OTA:{}?'.format(
926                    link, Keysight5GTestApp._format_cells(cell)), 1)
927        elif cell_type == 'NR5G':
928            # Tester reply format
929            #progress-count, ack-count, ack-ratio, nack-count, nack-ratio,  statdtx-count,  statdtx-ratio,  pdschBlerCount,  pdschBlerRatio,  pdschTputRatio.
930            tput_response = self.send_cmd(
931                'BSE:MEASure:NR5G:BTHRoughput:{}:THRoughput:OTA:{}?'.format(
932                    link, Keysight5GTestApp._format_cells(cell)), 1)
933        tput_result = {
934            'frame_count': tput_response[0] / 1e6,
935            'current_tput': tput_response[1] / 1e6,
936            'min_tput': tput_response[2] / 1e6,
937            'max_tput': tput_response[3] / 1e6,
938            'average_tput': tput_response[4] / 1e6,
939            'theoretical_tput': tput_response[5] / 1e6,
940        }
941        return tput_result
942
943    def get_throughput(self, cell_type, dl_cells, ul_cells):
944        """Function to get PHY layer throughput on on or more cells
945
946        This function returns the throughput data on the requested cells
947        during the last BLER test run, i.e., throughpt data must be fetch at
948        the end/after a BLE test run on the Keysight Test App.
949
950        Args:
951            cell_type: LTE or NR5G
952            cells: list of cells to query for throughput data
953        Returns:
954            tput_result: dict containing all throughput statistics in Mbps
955        """
956        if not isinstance(dl_cells, list):
957            dl_cells = [dl_cells]
958        if not isinstance(ul_cells, list):
959            ul_cells = [ul_cells]
960        tput_result = collections.OrderedDict()
961        for cell in dl_cells:
962            tput_result.setdefault(cell, {})
963            tput_result[cell]['DL'] = self._get_throughput(
964                cell_type, 'DL', cell)
965            frame_count = tput_result[cell]['DL']['frame_count']
966        for cell in ul_cells:
967            tput_result.setdefault(cell, {})
968            tput_result[cell]['UL'] = self._get_throughput(
969                cell_type, 'UL', cell)
970        agg_tput = {
971            'DL': {
972                'frame_count': frame_count,
973                'current_tput': 0,
974                'min_tput': 0,
975                'max_tput': 0,
976                'average_tput': 0,
977                'theoretical_tput': 0
978            },
979            'UL': {
980                'frame_count': frame_count,
981                'current_tput': 0,
982                'min_tput': 0,
983                'max_tput': 0,
984                'average_tput': 0,
985                'theoretical_tput': 0
986            }
987        }
988        for cell, cell_tput in tput_result.items():
989            for link, link_tput in cell_tput.items():
990                for key, value in link_tput.items():
991                    if 'tput' in key:
992                        agg_tput[link][key] = agg_tput[link][key] + value
993        tput_result['total'] = agg_tput
994        return tput_result
995
996    def _clear_bler_measurement(self, cell_type):
997        """Helper function to clear BLER results."""
998        if cell_type == 'LTE':
999            self.send_cmd('BSE:MEASure:LTE:CELL1:BTHRoughput:CLEar')
1000        elif cell_type == 'NR5G':
1001            self.send_cmd('BSE:MEASure:NR5G:BTHRoughput:CLEar')
1002
1003    def _configure_bler_measurement(self, cell_type, continuous, length):
1004        """Helper function to configure BLER results."""
1005        if continuous:
1006            if cell_type == 'LTE':
1007                self.send_cmd('BSE:MEASure:LTE:CELL1:BTHRoughput:CONTinuous 1')
1008            elif cell_type == 'NR5G':
1009                self.send_cmd('BSE:MEASure:NR5G:BTHRoughput:CONTinuous 1')
1010        elif length > 1:
1011            if cell_type == 'LTE':
1012                self.send_cmd(
1013                    'BSE:MEASure:LTE:CELL1:BTHRoughput:LENGth {}'.format(
1014                        length))
1015                self.send_cmd('BSE:MEASure:LTE:CELL1:BTHRoughput:CONTinuous 0')
1016            elif cell_type == 'NR5G':
1017                self.send_cmd(
1018                    'BSE:MEASure:NR5G:BTHRoughput:LENGth {}'.format(length))
1019                self.send_cmd('BSE:MEASure:NR5G:BTHRoughput:CONTinuous 0')
1020
1021    def _set_bler_measurement_state(self, cell_type, state):
1022        """Helper function to start or stop BLER measurement."""
1023        if cell_type == 'LTE':
1024            self.send_cmd(
1025                'BSE:MEASure:LTE:CELL1:BTHRoughput:STATe {}'.format(state))
1026        elif cell_type == 'NR5G':
1027            self.send_cmd(
1028                'BSE:MEASure:NR5G:BTHRoughput:STATe {}'.format(state))
1029
1030    def start_bler_measurement(self, cell_type, cells, length):
1031        """Function to kick off a BLER measurement
1032
1033        Args:
1034            cell_type: LTE or NR5G
1035            length: integer length of BLER measurements in subframes
1036        """
1037        self._clear_bler_measurement(cell_type)
1038        self._set_bler_measurement_state(cell_type, 0)
1039        self._configure_bler_measurement(cell_type, 0, length)
1040        self._set_bler_measurement_state(cell_type, 1)
1041        time.sleep(0.1)
1042        bler_check = self.get_bler_result(cell_type, [1],[1], length, 0)
1043        if bler_check['total']['DL']['frame_count'] == 0:
1044            self.log.warning('BLER measurement did not start. Retrying')
1045            self._set_bler_measurement_state(cell_type, 0)
1046            self._set_bler_measurement_state(cell_type, 1)
1047
1048    def _get_bler(self, cell_type, link, cell):
1049        """Helper function to get single-cell BLER measurement results."""
1050        if cell_type == 'LTE':
1051            bler_response = self.send_cmd(
1052                'BSE:MEASure:LTE:BTHRoughput:{}:BLER:{}?'.format(
1053                    link, Keysight5GTestApp._format_cells(cell)), 1)
1054            bler_items = [
1055                'frame_count', 'ack_count', 'ack_ratio', 'nack_count',
1056                'nack_ratio', 'statDtx_count', 'statDtx_ratio',
1057                'nackStatDtx_count', 'nackStatDtx_ratio', 'pdschBler_count',
1058                'pdschBler_ratio', 'any_count', 'any_ratio'
1059            ]
1060            bler_result = {
1061                bler_items[x]: bler_response[x]
1062                for x in range(len(bler_response))
1063            }
1064        elif cell_type == 'NR5G':
1065            bler_response = self.send_cmd(
1066                'BSE:MEASure:NR5G:BTHRoughput:{}:BLER:{}?'.format(
1067                    link, Keysight5GTestApp._format_cells(cell)), 1)
1068            bler_items = [
1069                'frame_count', 'ack_count', 'ack_ratio', 'nack_count',
1070                'nack_ratio', 'statDtx_count', 'statDtx_ratio',
1071                'pdschBler_count', 'pdschBler_ratio', 'pdschTputRatio'
1072            ]
1073
1074            bler_result = {
1075                bler_items[x]: bler_response[x]
1076                for x in range(len(bler_response))
1077            }
1078        return bler_result
1079
1080    def get_bler_result(self,
1081                        cell_type,
1082                        dl_cells,
1083                        ul_cells,
1084                        length,
1085                        wait_for_length=1,
1086                        polling_interval=SHORT_SLEEP):
1087        """Function to get BLER results.
1088
1089        This function gets the BLER measurements results on one or more
1090        requested cells. The function can either return BLER statistics
1091        immediately or wait until a certain number of subframes have been
1092        counted (e.g. if the BLER measurement is done)
1093
1094        Args:
1095            cell_type: LTE or NR5G
1096            cells: list of cells for which to get BLER
1097            length: number of subframes to wait for (typically set to the
1098                    configured length of the BLER measurements)
1099            wait_for_length: boolean to block/wait till length subframes have
1100            been counted.
1101        Returns:
1102            bler_result: dict containing per-cell and aggregate BLER results
1103        """
1104        if not isinstance(dl_cells, list):
1105            dl_cells = [dl_cells]
1106        if not isinstance(ul_cells, list):
1107            ul_cells = [ul_cells]
1108        start_time = time.time()
1109        while wait_for_length:
1110            dl_bler = self._get_bler(cell_type, 'DL', dl_cells[0])
1111            elapsed_subframes = (time.time() - start_time)/SUBFRAME_DURATION
1112            if dl_bler['frame_count'] < length and elapsed_subframes < 2*length:
1113                time.sleep(polling_interval)
1114            else:
1115                break
1116
1117        bler_result = collections.OrderedDict()
1118        for cell in dl_cells:
1119            bler_result.setdefault(cell, {})
1120            bler_result[cell]['DL'] = self._get_bler(cell_type, 'DL', cell)
1121        for cell in ul_cells:
1122            bler_result.setdefault(cell, {})
1123            bler_result[cell]['UL'] = self._get_bler(cell_type, 'UL', cell)
1124        agg_bler = {
1125            'DL': {
1126                'frame_count': length,
1127                'ack_count': 0,
1128                'ack_ratio': 0,
1129                'nack_count': 0,
1130                'nack_ratio': 0
1131            },
1132            'UL': {
1133                'frame_count': length,
1134                'ack_count': 0,
1135                'ack_ratio': 0,
1136                'nack_count': 0,
1137                'nack_ratio': 0
1138            }
1139        }
1140        for cell, cell_bler in bler_result.items():
1141            for link, link_bler in cell_bler.items():
1142                for key, value in link_bler.items():
1143                    if 'ack_count' in key:
1144                        agg_bler[link][key] = agg_bler[link][key] + value
1145        dl_ack_nack = agg_bler['DL']['ack_count'] + agg_bler['DL']['nack_count']
1146        ul_ack_nack = agg_bler['UL']['ack_count'] + agg_bler['UL']['nack_count']
1147        try:
1148            agg_bler['DL'][
1149                'ack_ratio'] = agg_bler['DL']['ack_count'] / dl_ack_nack
1150            agg_bler['DL'][
1151                'nack_ratio'] = agg_bler['DL']['nack_count'] / dl_ack_nack
1152            agg_bler['UL'][
1153                'ack_ratio'] = agg_bler['UL']['ack_count'] / ul_ack_nack
1154            agg_bler['UL'][
1155                'nack_ratio'] = agg_bler['UL']['nack_count'] / ul_ack_nack
1156        except:
1157            self.log.debug(bler_result)
1158            agg_bler['DL']['ack_ratio'] = 0
1159            agg_bler['DL']['nack_ratio'] = 1
1160            agg_bler['UL']['ack_ratio'] = 0
1161            agg_bler['UL']['nack_ratio'] = 1
1162        bler_result['total'] = agg_bler
1163        return bler_result
1164
1165    def measure_bler(self, cell_type, cells, length):
1166        """Function to start and wait for BLER results.
1167
1168        This function starts a BLER test on a number of cells and waits for the
1169        test to complete before returning the BLER measurements.
1170
1171        Args:
1172            cell_type: LTE or NR5G
1173            cells: list of cells for which to get BLER
1174            length: number of subframes to wait for (typically set to the
1175                    configured length of the BLER measurements)
1176        Returns:
1177            bler_result: dict containing per-cell and aggregate BLER results
1178        """
1179        self.start_bler_measurement(cell_type, cells, length)
1180        time.sleep(length * SUBFRAME_DURATION)
1181        bler_result = self.get_bler_result(cell_type, cells, length, 1)
1182        return bler_result
1183
1184    def start_nr_rsrp_measurement(self, cells, length):
1185        """Function to start 5G NR RSRP measurement.
1186
1187        Args:
1188            cells: list of NR cells to get RSRP on
1189            length: length of RSRP measurement in milliseconds
1190        Returns:
1191            rsrp_result: dict containing per-cell and aggregate BLER results
1192        """
1193        for cell in cells:
1194            self.send_cmd('BSE:MEASure:NR5G:{}:L1:RSRPower:STOP'.format(
1195                Keysight5GTestApp._format_cells(cell)))
1196        for cell in cells:
1197            self.send_cmd('BSE:MEASure:NR5G:{}:L1:RSRPower:LENGth {}'.format(
1198                Keysight5GTestApp._format_cells(cell), length))
1199        for cell in cells:
1200            self.send_cmd('BSE:MEASure:NR5G:{}:L1:RSRPower:STARt'.format(
1201                Keysight5GTestApp._format_cells(cell)))
1202
1203    def get_nr_rsrp_measurement_state(self, cells):
1204        for cell in cells:
1205            self.log.info(
1206                self.send_cmd(
1207                    'BSE:MEASure:NR5G:{}:L1:RSRPower:STATe?'.format(
1208                        Keysight5GTestApp._format_cells(cell)), 1))
1209
1210    def get_nr_rsrp_measurement_results(self, cells):
1211        for cell in cells:
1212            self.log.info(
1213                self.send_cmd(
1214                    'BSE:MEASure:NR5G:{}:L1:RSRPower:REPorts:JSON?'.format(
1215                        Keysight5GTestApp._format_cells(cell)), 1))
1216
1217    def release_rrc_connection(self, cell_type, cell):
1218        if cell_type == 'LTE':
1219            self.send_cmd('BSE:FUNCtion:LTE:{}:RELease:SEND'.format(
1220                Keysight5GTestApp._format_cells(cell)))
1221        elif cell_type == 'NR5G':
1222            self.send_cmd(
1223                'BSE:CONFig:NR5G:{}:RCONtrol:RRC:STARt RRELease'.format(
1224                    Keysight5GTestApp._format_cells(cell)))
1225
1226    def send_rrc_paging(self, cell_type, cell):
1227        if cell_type == 'LTE':
1228            self.send_cmd('BSE:FUNCtion:LTE:{}:PAGing:PAGE'.format(
1229                Keysight5GTestApp._format_cells(cell)))
1230        elif cell_type == 'NR5G':
1231            self.send_cmd(
1232                'BSE:CONFig:NR5G:{}:RCONtrol:RRC:STARt PAGing'.format(
1233                    Keysight5GTestApp._format_cells(cell)))
1234
1235    def enable_rach(self, cell_type, cell, enabled):
1236        self.send_cmd('BSE:CONFig:{}:{}:MAC:RACH:IGNore {}'.format(
1237            cell_type, Keysight5GTestApp._format_cells(cell), int(enabled)))
1238        self.send_cmd('BSE:CONFig:{}:APPLY'.format(cell_type))
1239
1240    def enable_preamble_report(self, cell_type, enable):
1241        self.send_cmd('BSE:CONFig:{}:PReamble:REPort:STATe {}'.format(
1242            cell_type, int(enable)))
1243
1244    def fetch_preamble_report(self, cell_type, cell, num_reports=10):
1245        report = self.send_cmd(
1246            'BSE:CONFig:{}:{}:PReamble:REPort:FETCh:JSON? {}'.format(
1247                cell_type, Keysight5GTestApp._format_cells(cell), num_reports),
1248            read_response=1)
1249        self.send_cmd('BSE:CONFig:{}:PReamble:REPort:CLEAr'.format(cell_type))
1250        if 'No data available' in report:
1251            report = None
1252        return report
1253