# Source code for taf.testlib.clicmd_iss

# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""``clicmd_iss.py``

`Module for CLI specific functionality`

"""

import re
import os
import traceback
import time

import pytest
from . import clitelnet

from . import loggers
from .custom_exceptions import CLICMDException


def get_table_title(line):
    """Get table name.

    Args:
        line(str):  a single line of output

    Returns:
        str:  text that precedes the first ":" in the line, or None when the line contains no ":"

    """
    title, colon, _ = line.partition(":")
    return title if colon else None
def get_column_ranges(line):
    """Get column ranges.

    Every run of non-whitespace characters in the delimiter line is treated as one column.
    The real character offsets of each run are used, so leading whitespace and gaps wider
    than a single space between columns yield correct ranges.

    Args:
        line(str):  string with "--" column delimiters

    Returns:
        list[list[int]]:  column_indexes - list of ``[start, end]`` offsets (end exclusive),
        one pair per column, or None when the line contains no "--"

    """
    if "--" not in line:
        return None
    return [[match.start(), match.end()] for match in re.finditer(r"\S+", line)]
def get_column_names(table_data, column_ranges):
    """Get column name.

    Each line of ``table_data`` is cut into cells using ``column_ranges``. When the header
    spans several lines, the cells of one column are joined top to bottom with single
    spaces and trailing whitespace is removed.

    Args:
        table_data(str):  String with the header line(s) holding column names
        column_ranges(list):  List of ``[start, end]`` index pairs used to extract column names

    Returns:
        list[str]:  column_names_list - List of strings with columns names

    """
    header_rows = []
    for text_line in table_data.split('\n'):
        header_rows.append([text_line[bounds[0]: bounds[1]].strip() for bounds in column_ranges])

    # A single header line needs no joining.
    if len(header_rows) <= 1:
        return header_rows[0]

    # Transpose rows into columns and glue the pieces of every column name together.
    return [" ".join(cells).rstrip() for cells in zip(*header_rows)]
def get_dotted_table(table_lines):
    """Get table data.

    Rows look like ``name ..... value``: a name, a run of dots surrounded by single spaces
    and a value. Lines without "...." are ignored. Lines that contain "...." but cannot be
    split into a name and a value (dots not surrounded by spaces) are skipped as well
    instead of raising IndexError.

    Args:
        table_lines(list):  list of table rows

    Returns:
        list[list]:  table_list - list of ``[name, value]`` lists

    """
    # Compile once; the same separator is applied to every row.
    separator = re.compile(r' \.{2,} ')
    table_list = []
    for line in table_lines:
        if "...." not in line:
            continue
        parts = separator.split(line)
        if len(parts) < 2:
            # Dots are present but not as a " .... " separator - not a name/value row.
            continue
        # Only the first two fields are kept, anything after a second separator is dropped.
        table_list.append(parts[:2])
    return table_list
def get_table_value(table_data, identifier=None, checker=None):
    """Gets necessary field value from the table.

    The output is cut into blocks separated by blank lines. A block whose first line ends
    with ":" is treated as a title and skipped, a block whose first line contains ".." is
    parsed with get_dotted_table(), any other block is parsed as a column table using the
    first line that contains "--" as the column delimiter.

    Args:
        table_data(str):  console output data (string),
        identifier(list[]):  Column name and row value ['column', 'value'],
        checker(str):  column name to check value (string).

    Raises:
        CLICMDException:  invalid row length

    Returns:
        str:  Field value, or the string "CLIException: Specified table row has not been found"
        when no matching value was found.

    """
    table_value = ''

    # === parse received data for table header and type of table ==================
    # The first and the last lines are discarded (presumably the echoed command and the
    # trailing prompt - TODO confirm). Slicing is used so that output shorter than two
    # lines yields an empty list instead of IndexError.
    raw_lines_list = [line.replace("\r", "") for line in table_data.split("\n")][1:-1]

    # Group consecutive non-blank lines into blocks.
    # NOTE(review): lines that follow the last blank line are never stored as a block,
    # so a table must be terminated by a blank line to be parsed.
    raw_blocks = []
    table_lines_list = []
    for line in raw_lines_list:
        if line.strip():
            table_lines_list.append(line)
        elif table_lines_list:
            raw_blocks.append(table_lines_list)
            table_lines_list = []

    # Parsed tables are numbered consecutively from 0, in order of appearance. The merge
    # step below addresses tables by position, so skipped blocks must not leave gaps.
    tables_dict = {}
    for block in raw_blocks:
        first_line = block[0]
        if first_line.rstrip().endswith(":"):
            # Title block.
            # NOTE(review): a table that directly follows its title without a blank
            # line is part of this block and is therefore not parsed.
            continue
        if ".." in first_line:
            tables_dict[len(tables_dict)] = get_dotted_table(block)
            continue
        for delimiter_idx, line in enumerate(block):
            if '--' not in line:
                continue
            column_ranges = get_column_ranges(line)
            # Everything above the delimiter line is the (possibly multi-line) header.
            col_names_text = ''.join('%s\n' % (header_line, ) for header_line in block[:delimiter_idx])
            column_names_list = get_column_names(col_names_text, column_ranges)
            data_rows = block[delimiter_idx + 1:]
            # Tables are stored column-wise: [column_name, value_row1, value_row2, ...]
            tables_dict[len(tables_dict)] = [
                [column_name] + [row[col_range[0]: col_range[1]].strip() for row in data_rows]
                for column_name, col_range in zip(column_names_list, column_ranges)
            ]
            # Only the first delimiter line describes the layout; a later line containing
            # "--" is table data and must not restart parsing.
            break

    # ===== getting value from table ==================================================
    # in case we have multiple tables lets find out do they have similar columns
    row_index = None
    if len(tables_dict) > 1:
        flags_dict = {}
        # in case tables have equal length
        if len(tables_dict[0]) == len(tables_dict[1]):
            table_count = len(tables_dict)
            # take columns from first to last from first table from dictionary
            for col_idx in range(len(tables_dict[0])):
                for table_key in range(1, table_count):
                    # and compare it with according column from next tables in tables dictionary
                    if tables_dict[0][col_idx][0] == tables_dict[table_key][col_idx][0]:
                        flags_dict[table_key] = 1
                        # append values to first table
                        tables_dict[0][col_idx].extend(tables_dict[table_key][col_idx][1:])
                    else:
                        flags_dict[table_key] = False
            # The flag reflects the comparison of the last column only.
            for table_key in range(1, table_count):
                if flags_dict.get(table_key) == 1:
                    tables_dict.pop(table_key)

    # check if transfered symbols are present, check if empty element are present.
    trans_flag = False
    for key in tables_dict:
        for element_list in tables_dict[key]:
            for element in element_list:
                if not element:
                    trans_flag = True

    # lead all tables in one format (model will be first row)
    if trans_flag:
        for key in tables_dict:
            lead_row = tables_dict[key][0]
            # check len
            for row in tables_dict[key][1:]:
                if len(lead_row) != len(row):
                    message = "Row length is invalid: {0} != {1}".format(lead_row, row)
                    raise CLICMDException(message)
                else:
                    for lead_elem, row_elem in zip(lead_row, row):
                        if bool(lead_elem) != bool(row_elem):
                            # transform row_elem in lead_format: glue the element to the
                            # previous one and blank its own cell
                            rowidx = tables_dict[key].index(row)
                            row_elemidx = tables_dict[key][rowidx].index(row_elem)
                            new_value = "{0}{1}".format(tables_dict[key][rowidx][row_elemidx - 1],
                                                        tables_dict[key][rowidx][row_elemidx])
                            tables_dict[key][rowidx][row_elemidx - 1] = new_value
                            tables_dict[key][rowidx][row_elemidx] = ""

    # Find the row where column identifier[0] holds identifier[1] and read the value of
    # the checker column in the same row.
    for key in tables_dict:
        for column in tables_dict[key]:
            if column[0] == identifier[0]:
                for value in column:
                    if value == identifier[1]:
                        row_index = column.index(value)
                        for column1 in tables_dict[key]:
                            if column1[0] == checker:
                                table_value = column1[row_index]
                                break
                        if table_value == '':
                            row_index = None
                        break
            if table_value != '':
                break

    if table_value == '':
        table_value = "CLIException: Specified table row has not been found"
    return table_value
class CLICmd(object):
    """Class for CLI specific functionality.

    Args:
        ipaddr:  address passed to clitelnet.TelnetCMD.
        port:  stored as ``self.port``.
        login:  user name passed to clitelnet.TelnetCMD.
        passw:  password passed to clitelnet.TelnetCMD.
        prompt:  default CLI prompt.
        devtype:  stored as ``self.devtype``.
        delay:  when truthy, assigned to ``self.conn.delay``.
        build_path:  stored as ``self.build_path``.
        img_path:  stored as ``self.img_path``.
        page_break:  page break marker passed to clitelnet.TelnetCMD.
        xmlrpcport:  stored as ``self.xmlrpcport``.

    """

    suite_logger = loggers.ClassLogger()

    # TODO: add wait_until_value_is_changed method for CLI

    def __init__(self, ipaddr, port, login, passw, prompt, devtype, delay=None, build_path=None, img_path=None,
                 page_break="<?> - help.", xmlrpcport=None):
        """Initialize CLICmd class.

        """
        self.timeout = 9
        self.ipaddr = ipaddr
        self.port = port
        self.xmlrpcport = xmlrpcport
        self.login = login
        self.passw = passw
        self.prompt = prompt
        self.devtype = devtype
        self.build_path = build_path
        self.img_path = img_path
        self.is_shell = False
        self.page_break = page_break
        # Connection object; the connection itself is opened by _connect_to_switch().
        self.conn = clitelnet.TelnetCMD(self.ipaddr, username=self.login, password=self.passw,
                                        page_break=page_break, prompt=self.prompt)
        if delay:
            self.conn.delay = delay

    def _connect_to_switch(self, prompt, timeout=20):
        """Connect to switch and open a shell.

        Args:
            prompt(str):  expected CLI prompt (used for logging only).
            timeout(int):  connection timeout (currently not used).

        Returns:
            None

        Examples::

            self._connect_to_switch(prompt="Switch ")

        """
        self.suite_logger.debug("Login on switch with login: {0} and expected prompt is: {1}".format(self.login, prompt))
        self.conn.connect()
        self.suite_logger.debug("Create Shell")
        self.conn.open_shell(raw_output=True)

    def cli_get(self, arguments_list, prompt=None, show=True, timeout=25):
        """Getting values by CLI.

        Every item of ``arguments_list`` is a sequence of three strings:
        ``[table, identifier, checker]``. ``identifier`` is "<column name> <value>"; use
        " @" between the name and the value when they cannot be told apart by the last
        space. When ``checker`` is 'none' the ``table`` string is executed as is.

        Args:
            arguments_list(list):  list of arguments to get values.
            prompt(str):  expected prompt, defaults to ``self.prompt``.
            show(bool):  execute command with show prefix
            timeout(int):  command execution timeout

        Raises:
            Exception:  error on command execution

        Returns:
            list:  List of CLI-GET command results.

        Examples::

            env.switch[1].cli.cli_get([['enable', 'none 0', 'none'], ['statistics', 'Port 1', 'RX Discards']])

        """
        if not prompt:
            prompt = self.prompt
        result = []
        try:
            self._connect_to_switch(prompt)
            if arguments_list != [["readOnly"]]:
                for args in arguments_list:
                    table = args[0].strip()
                    id_parts = args[1].strip().split(' ')
                    if len(id_parts) > 2:
                        # " @" - delimiter between column name and value.
                        if "@" in args[1]:
                            identifier = args[1].split(" @")
                        else:
                            # The last word is the value, everything before it is the column name.
                            identifier = [" ".join(id_parts[:-1]), id_parts[-1]]
                    else:
                        identifier = id_parts
                    checker = args[2].strip()
                    if checker != 'none' and show:
                        command = 'show ' + table
                    else:
                        command = table
                    # Run cli command and get output "<?> - help."
                    alternatives = [("<?> - help.", " ", False, False), ]
                    data, _ = self.conn.shell_command(command, alternatives=alternatives, timeout=timeout,
                                                      ret_code=False, quiet=True, raw_output=True)
                    # Data validation
                    output_lines = data.split('\n')
                    if len(output_lines) == 5 and "....." not in output_lines[-3]:
                        result.append([output_lines[-3].strip()])
                    elif len(output_lines) == 2:
                        result.append([output_lines[-1].strip()])
                    else:
                        # Remove page break from data
                        data = data.replace("<?> - help.", "")
                        value = get_table_value(data, identifier=identifier, checker=checker)
                        result.append([value])
            else:
                result = [["readOnly"]]
        except Exception:
            self.suite_logger.debug("Cli_get. Exception traceback data: {0}".format(traceback.format_exc()))
            raise
        finally:
            # Close connection
            if self.conn:
                self.conn.close()
        return result

    def cli_get_all(self, arguments_list, prompt=None, timeout=25, interval=0.1):
        """Getting raw output of commands by CLI.

        The connection is not opened or closed here. A command may carry an answer for a
        ('yes'/'no') question after "::::", e.g. "reload::::yes". The items of
        ``arguments_list`` are not modified.

        Args:
            arguments_list(list):  list of commands, the command text is the first element of every item.
            prompt(str):  not used.
            timeout(int):  command execution timeout
            interval(int):  time interval between read attempts

        Raises:
            Exception:  error on command execution

        Returns:
            list:  List of command outputs with page breaks and the command text removed.

        """
        result = []
        # NOTE(review): the list is shared by all commands and grows on every iteration,
        # so an answer given for one command stays registered for the following ones.
        alternatives = []
        try:
            for command in arguments_list:
                # Work on a local copy so the caller's list keeps the original command text.
                cmd_text = command[0]
                # Run cli command and get output "<?> - help."
                alternatives.append(("<?> - help.", " ", False, False))
                if "'" in cmd_text:
                    cmd_text = cmd_text.replace("'", '"')
                if "::::" in cmd_text:
                    cmd_text, answer = cmd_text.split("::::")
                    alternatives.append(("('yes'/'no'):", answer, False, False))
                data, _ = self.conn.shell_command(cmd_text, alternatives=alternatives, timeout=timeout,
                                                  ret_code=False, quiet=True, raw_output=True, interval=interval)
                data = data.replace("<?> - help.", "")
                result.append(data.replace(cmd_text, "").strip())
        except Exception:
            self.suite_logger.debug("Cli_get_all. Exception traceback data: {0}".format(traceback.format_exc()))
            raise
        return result

    def cli_set(self, commands_list, timeout=5, prompt=None, connect=True):
        """Setting values by CLI.

        Every item of ``commands_list`` is ``[command]`` or ``[command, option]``. An int
        option overrides the timeout, a string option containing a tab character selects
        the tabulation output processing. A command may carry an answer for a
        ('yes'/'no') question after "::::".

        Args:
            commands_list(list):  list of commands.
            timeout(int):  command execution timeout
            prompt(str):  expected prompt, defaults to ``self.prompt``.
            connect(bool):  Flag if connection should be established before and closed after execution.

        Raises:
            Exception:  error on command execution

        Returns:
            list:  List of CLI-SET command results.

        Examples::

            env.switch[1].cli.cli_set([["enable"], ["vlan-database"], ["vlan 10"]])

        """
        error_markers = ("Error!", "CLIException", "Invalid command has been entered",
                         "Incomplete command has been", "Notice")
        # NOTE(review): alternatives, tabulation and an overridden timeout are not reset
        # between commands; once set they also apply to all following commands.
        alternatives = []
        tabulation = False
        if not prompt:
            prompt = self.prompt
        result = []
        try:
            if connect:
                self._connect_to_switch(prompt)
                self.suite_logger.debug("Connection to switch established.")
            if commands_list != [["readOnly"]]:
                for commands in commands_list:
                    command = commands[0]
                    if len(commands) == 2:
                        option = commands[1]
                        if isinstance(option, int):
                            timeout = option
                        elif isinstance(option, str) and "\t" in option:
                            tabulation = option
                    # Replace ' by ", for cli support
                    if "'" in command:
                        command = command.replace("'", '"')
                    if "::::" in command:
                        command, answer = command.split("::::")
                        alternatives = [("('yes'/'no'):", answer, False, False), ]
                    # Run CLI commands and get answer
                    data, _ = self.conn.shell_command(command, alternatives=alternatives, timeout=timeout,
                                                      ret_code=False, quiet=True, raw_output=True)
                    output_lines = data.split('\n')
                    # Return error message if it present in output data
                    if len(output_lines) > 2 and any(marker in data for marker in error_markers):
                        data = data.replace("<?> - help.", "")
                        # Split multi error message if exist in one row
                        result.append(["".join(data.split('\n')[2:-1]).strip().replace("\r", " ")])
                    # tabulation support processing:
                    elif tabulation == "\t":
                        # one tab case: every space separated element of the inner lines
                        result.append([elem.strip() for line in output_lines[1:-1] for elem in line.split(' ')])
                    elif tabulation == "\t\t":
                        # double tab case: lines between the first occurrence of the last
                        # line and the last line itself
                        split_data = [line.strip() for line in output_lines]
                        split_point = split_data[:-1].index(split_data[-1])
                        result.append([line.strip() for line in split_data[split_point + 1:-1]])
                    elif tabulation == "\t\t\t":
                        result.append([line.strip() for line in output_lines[1:-1]])
                    else:
                        result.append([output_lines[-1].strip()])
            else:
                result = [["readOnly"]]
        except Exception as err:
            self.suite_logger.error("Cli_set error: %s" % (err, ))
            self.suite_logger.error("Cli_set. Exception traceback data:\n%s" % (traceback.format_exc(), ))
            raise
        finally:
            if connect and self.conn:
                self.conn.close()
        return result

    def cli_connect(self, prompt=None):
        """Connect to switch and open a shell.

        Args:
            prompt(str):  expected CLI prompt, defaults to ``self.prompt``.

        Returns:
            None

        Examples::

            env.switch[1].cli.cli_connect(prompt="Switch ")

        """
        if not prompt:
            prompt = self.prompt
        self._connect_to_switch(prompt)

    def cli_disconnect(self):
        """Close connection to switch.

        Raises:
            CLICMDException:  error on disconnect

        Returns:
            None

        Examples::

            env.switch[1].cli.cli_disconnect()

        """
        try:
            if self.conn:
                self.conn.close()
        except Exception as err:
            # Keep the original error as the explicit cause of the raised exception.
            raise CLICMDException(err) from err

    def update_prompt(self, prompt):
        """Updating prompt in both this object and its connection object.

        Args:
            prompt(str):  Prompt to be updated

        """
        self.prompt = prompt
        self.conn.prompt = prompt