Source code for taf.testlib.cli_template

# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``cli_template.py``

`Abstract class for any CLI classes`

"""

import io
import random
import time
import re
from abc import ABCMeta, abstractmethod
from collections import namedtuple
import collections

from .custom_exceptions import CLIException
from . import loggers


[docs]class Raw(str): """This class represents raw commands for cli object. """ pass
CmdStatus = namedtuple("CmdStatus", "stdout, stderr, rc")
[docs]class CLIGenericMixin(object, metaclass=ABCMeta): """Base class for CLI configuration. """ Raw = Raw class_logger = loggers.ClassLogger()
[docs] def __init__(self): """Entry __init__ method defines class variable. """ self.CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" self.prompt = None self.page_break = None self.sudoprompt = None self.password = None
[docs] def randstr(self, length): """Return random string with required length. Args: length(int): Required length """ return "".join(random.choice(self.CHARS) for x in range(length))
[docs] def expect(self, obj, expect_list, timeout=60, interval=0.1, remove_cmd=True, is_shell=False): """Expecting prompts and return prompt index in expect_list. Args: obj(Channel): CLI obj of different types. expect_list(list): List of compiled re objects to find prompt in data. timeout(int): Expecting timeout. interval(int): Interval between read data cycles. remove_cmd(bool): Flag whether to remove command from output during searching prompt in data. is_shell(bool): Indicates shell command Returns: tuple: Found position from expect_list and full command output """ full_out = "" temp_data = "" end_time = time.time() + timeout while True: time.sleep(interval) # Read data from IO. data = obj.read() # Remove command from searched data if data: if remove_cmd and not is_shell: if "END_COMMAND" in data and len(data.split("\n")) <= 2: temp_data = data else: counter = 0 while len(data.split("\n")) < 2 and counter < 30: data += obj.read() counter += 1 time.sleep(0.1) if "\n" in data: temp_data = " ".join(data.split("\n")[1:]) else: temp_data = "" remove_cmd = False else: temp_data = data full_out += data # Update action and exit_flag for position, expect_re in enumerate(expect_list): if expect_re.search(temp_data) is not None: return position, full_out # When timeout ends exit from loop and return None if time.time() >= end_time: return -1, full_out
[docs] def action_on_expect(self, obj, alternatives, timeout=60, interval=0.1, command_timeout=600, is_shell=False): """Performs actions on found prompts. Returns command output data. Args: obj(Channel): CLI object of different types. alternatives(tuple | list): Tuples of ("expected line", "action if line is found", <Exit execution? (bool)>, <Use ones? (bool)>). action can be: - str - in case this is just command; - function - callable object to execute without parameters; timeout(int): Expecting timeout. interval(int | float): Interval between read data cycles. command_timeout(int): Command execution timeout. is_shell(bool): Indicates shell command Raises: CLIException: timeout exceeded for command execution Returns: str: Full output """ # Flag whether to remove command from output during searching prompt in data remove_cmd = True # Update alternatives. expect_list = [] for alter_item in alternatives: if alter_item[0]: expect_list.append((re.compile(re.escape(alter_item[0])), alter_item[1], alter_item[2], alter_item[3])) full_out = "" exit_flag = None end_time = time.time() + timeout + command_timeout while not exit_flag: if time.time() >= end_time: # class_logger is defined in subclasses self.class_logger.error(full_out + "\n") # pylint: disable=no-member raise CLIException("Command execution is not finished: timeout %s" % (timeout + command_timeout)) prompt_list = [] for expect_item in expect_list: prompt_list.append(expect_item[0]) _output = None _output = obj.expect(prompt_list, timeout=timeout, interval=interval, remove_cmd=remove_cmd, is_shell=is_shell) # command has to be removed only from the 1st output part. remove_cmd = False if _output[0] == -1: err = "Timeout exceeded. Expected prompt didn't appear in %s seconds.\n" % timeout full_out += _output[1] # class_logger is defined in subclasses self.class_logger.error(full_out) # pylint: disable=no-member raise CLIException(err) full_out += _output[1] _action = expect_list[_output[0]][1] if _action: if isinstance(_action, Raw): obj.write(_action) elif isinstance(_action, str): obj.write(str(_action) + "\n") elif isinstance(_action, collections.Callable): _action() exit_flag = expect_list[_output[0]][2] # Remove alternative from list if it's requested if expect_list[_output[0]][3]: expect_list.pop(_output[0]) return full_out
[docs] def action_on_connect(self, obj, alternatives, timeout=60, interval=0.1, command_timeout=600, is_shell=False): """Performs actions on found prompts. Returns command output data. This is only for CLI connect. See action_on_expect for details. Args: obj(Channel): CLI object of different types. alternatives(tuple): Tuples of ("expected line", "action if line is found", <Exit execution? (bool)>, <Use ones? (bool)>). action can be: - str - in case this is just command; - function - callable object to execute without parameters; timeout(int): Expecting timeout. interval(int): Interval between read data cycles. command_timeout(int): Command execution timeout. is_shell(bool): Indicates shell command Raises: CLIException: sudoprompt is not defined Returns: str: Full output """ # Update alternatives. expect_list = [] for alter_item in alternatives: if alter_item[0]: expect_list.append((re.compile(re.escape(alter_item[0])), alter_item[1], alter_item[2], alter_item[3])) full_out = "" prompt_list = [] for expect_item in expect_list: prompt_list.append(expect_item[0]) _output = None _output = obj.expect(prompt_list, timeout=timeout, interval=interval, remove_cmd=True, is_shell=is_shell) full_out += _output[1] _action = expect_list[_output[0]][1] if _action: if isinstance(_action, Raw): obj.write(_action) elif isinstance(_action, str): obj.write(str(_action) + "\n") elif isinstance(_action, collections.Callable): _action() # Remove alternative from list if it's requested if expect_list[_output[0]][3]: expect_list.pop(_output[0]) return full_out
[docs] def prepare_ssh_shell_obj(self, shell): """Add read(), write() and expect() methods to emulate object IO methods. Args: shell(paramiko.Channel): paramiko.Channel object. Returns: paramiko.Channel: paramiko.Channel object. """ # use a closure def read(): """ Non blocking read implementation. """ data = "" while shell.recv_ready(): # += for strings is optimized, don't worry. # we still have to decode here since there can be unicode data += shell.recv(200000).decode('utf-8') return data def expect(alternatives=None, timeout=60, interval=0.1, remove_cmd=True, is_shell=False): return self.expect(shell, alternatives, timeout, interval, remove_cmd, is_shell) shell.write = shell.sendall shell.read = read shell.expect = expect return shell
[docs] def normalize_output(self, data, command, ret_code, end_pattern=None): """Removes command and command's end flag from output data. Extracts return code of the command. Args: data(str): Output data of command. command(str): Executed command. ret_code(bool): Flag which shows if return code command was added to main command. end_pattern(str): pattern which is used to find end command flag. Returns: tuple: data and return code """ # Remove command itself from output data_list = data.split("\n") updated_data_list = [] # Prepare output data(it removes \r in case if long command is used) for item in data_list: data_l = [] temp_data_list = item.split(" ") for data_string in temp_data_list: data_l.append(data_string.lstrip('\r').rstrip('\r')) item = " ".join(data_l) updated_data_list.append(item) # Remove command from output count = 0 indexes_list = [] for data_item in updated_data_list: if command.split(";")[0] in data_item: indexes_list.append(count) count += 1 count = 0 for index in indexes_list: data_list.pop(index - count) count += 1 data = "\n".join(data_list) # Removes prompt from output data if self.prompt: for single_prompt in self.prompt if isinstance(self.prompt, list) else [self.prompt]: str_prompt = single_prompt.decode('utf-8') if str_prompt in data.split("\n")[-1]: temp_data_list = data.split("\n") if temp_data_list[-1].endswith(str_prompt): data_string = temp_data_list[-1].rsplit(str_prompt, 1)[0] temp_data_list[-1] = data_string data = "\n".join(temp_data_list) break return_code = "" # Remove end_command flag. if ret_code: end_pattern += r"=\[(-*\d{,3})" ret_code_re = re.compile(end_pattern) re_s = ret_code_re.search(data) if re_s is not None: _start, _ = re_s.regs[0] return_code = re_s.groups()[-1] data = data[:_start] else: return_code = None return data, return_code
[docs] def prepare_pexpect_obj(self, pexp_obj): """Add read(), write() and expect() methods to emulate object IO methods. Args: pexp_obj(pexpect): pexpect object. Returns: pexpect: pexpect object. """ pexp_obj.mod_expect = pexp_obj.expect def wrap_exp(obj, prompt_list, timeout, interval, remove_cmd, is_shell): """Add expect() method to emulate object IO methods. """ rc, data = self.expect(obj, prompt_list, timeout, interval, remove_cmd, is_shell) return rc, data def wrap_read(pexp_obj): """Add read() method to emulate object IO methods. """ try: data = pexp_obj.read_nonblocking(200000, 1) except Exception: data = "" return data pexp_obj.expect = lambda prompt, timeout, interval, remove_cmd=True, is_shell=False: wrap_exp(pexp_obj, prompt, timeout, interval, remove_cmd, is_shell) pexp_obj.write = lambda cmd: pexp_obj.sendline(cmd.replace("\n", "")) pexp_obj.read = lambda: wrap_read(pexp_obj) return pexp_obj
[docs] def prepare_telnet_obj(self, telnet_obj): """Add read() and expect() methods to object to emulate object IO methods. Args: telnet_obj(telnetlib): telnetlib object. Returns: telnetlib: telnetlib object """ telnet_obj.mod_expect = telnet_obj.expect def wrap_exp(obj, prompt_list, timeout, interval, remove_cmd, is_shell): """Add expect() method to emulate object IO methods. """ rc, data = self.expect(obj, prompt_list, timeout, interval, remove_cmd, is_shell) return rc, data def wrap_read(telnet_obj): """Add read() method to emulate object IO methods. """ try: data = telnet_obj.read_very_eager().decode('utf-8') except Exception: data = "" return data # telnet_obj.expect = lambda prompt, timeout, interval, remove_cmd=True, is_shell=False: wrap_exp(telnet_obj, prompt, timeout, interval, # remove_cmd, is_shell) telnet_obj.expect = lambda alternatives=None, timeout=60, interval=0.1, remove_cmd=True, is_shell=False: self.expect(telnet_obj, alternatives, timeout, interval, remove_cmd, is_shell) telnet_obj.read = lambda: wrap_read(telnet_obj) # Clear output buffer from previous programs telnet_obj.read_very_eager() return telnet_obj
[docs] def cmd_output_log(self, so, se): """log message normalizer. Args: so(str): StdOut se(str): StdErr Returns: str: Normalized output """ message = "Command output:" if so: message += " StdOut\n{0}".format(so) else: message += " StdOut: <empty>" if se: message += " StdErr:\n{0}".format(se) else: message += " StdErr: <empty>" return message
[docs] def prepare_alter(self, command, alternatives=None, sudo=False, ret_code=True, page_break=None): """Adds specified alternatives to list, updates command with end command and sudo if needed. Args: command(str): Command to be executed. alternatives(tuple): Tuples of ("expected line", "action if line is found", <Exit execution? (bool)>, <Use ones? (bool)>). action can be: - str - in case this is just command; - function - callable object to execute without parameters; sudo(bool): Flag if sudo should be added to the list of alternatives . ret_code(bool): Flag if return code should be added to the list of alternatives. page_break(bool): Flag if page break should be added to the list of alternatives. Returns: tuple: command, alternatives, end_pattern """ if alternatives is None: alternatives = [] # Add one or few expected prompt(s) and action(s) to alternatives list if self.prompt: if isinstance(self.prompt, list): for single_prompt in self.prompt: alternatives.append((single_prompt.decode('utf-8'), None, True, False)) else: alternatives.append((self.prompt.decode('utf-8'), None, True, False)) # Append page_break tuple with exit_flag = False. if self.page_break: alternatives.append((self.page_break, Raw(" "), False, False)) elif page_break: alternatives.append((page_break, Raw(" "), False, False)) # FYI: shell.recv_exit_status() returns exit status of shell process, and any attempts # to read it before shell is closed invokes "dead" lock. end_pattern = None if ret_code: # '' is required to exclude false end command detection on command send. end_flag = "END_COM''MAND_{0}_{1}".format(id(self), self.randstr(10)) # Remove '' for expect pattern. end_pattern = end_flag[:7] + end_flag[9:] command += "; echo {0}=[$?]".format(end_flag) alternatives.append((end_pattern, None, True, False)) if sudo: command = "sudo " + command if self.sudoprompt is None: raise CLIException("sudo prompt is not defined. Cannot execute command with sudo.") alternatives.append((self.sudoprompt, self.password, False, True)) return command, alternatives, end_pattern
[docs] @abstractmethod def login(self): """Do CLI object login procedure. Args: username(str): Host login (string). password(str): Host password(string). timeout(int): Time to execute login procedure (integer). wait_login(int): time to wait login prompt before sending <Enter>. <Enter> is necessary if login prompt has been already appeared before connection is established. alternatives(list of tuples): list of alternative prompts and actions. connect(bool): Flag if connection should be established before login procedure (bool). Returns: None """ pass
[docs] @abstractmethod def close(self): """Close CLI object connection. """ pass
[docs] @abstractmethod def open_shell(self): """Create interactive CLI shell on existing connection. """ pass
[docs] @abstractmethod def close_shell(self): """Close interactive CLI shell on existing connection. """ pass
[docs] @abstractmethod def check_shell(self): """Check if CLI connection is alive. """ pass
[docs] @abstractmethod def shell_read(self): """Read data from output buffer. Args: timeout(int): Increases time to read data from output buffer. interval(int): Time delay between attempts to read data from output buffer. """ pass
[docs] @abstractmethod def send_command(self): """Run command without waiting response. Args: command(str): Command to be executed. """ pass
[docs] @abstractmethod def exec_command(self): """Execute command without shell (tty). Args: command(str): Command to be executed. timeout(int): Timeout for command execution. Returns: tuple(str, str, int): tuple of stdout, stderr, rc """ pass
[docs] @abstractmethod def shell_command(self): """Run interactive command on previously created shell (tty). Args: command(str): Command to be executed. alternatives(tuple): Tuples of ("expected line", "action if line is found", <Exit execution? (bool)>, <Use ones? (bool)>). action can be: - str - in case this is just command; - function - callable object to execute without parameters; timeout(int): Expecting timeout. sudo(bool): Flag if sudo should be added to the list of alternatives. ret_code(bool): Flag if return code should be added to the list of alternatives. expected_rc(int): Sets return code and verifies if return code of executed command the same as expected return code (int or str). quiet(bool): Flag to verify if expected return equals expected. raw_output(bool): Flag whether to return 'pure' output. """ pass
[docs] @abstractmethod def put_file(self): """Transfer file from/to remote host. Args: src(str): File's source. dst(str): File's destination. """ pass
[docs] @abstractmethod def get_file(self): """Put file to remote host. Args: src(str): File's source. dst(str): File's destination. """ pass