Source code for taf.testlib.clinns

# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``clinns.py``

`Module contains classes for managing device using SSH connection or SSH connection emulation for Linux Network`

"""

from subprocess import Popen, PIPE

import pexpect

from . import loggers
from .cli_template import CLIGenericMixin
from .custom_exceptions import CLINNSException


[docs]def ip_net_namespace_mode(function): """Decorator: get clissh_instance for class methods. """ def wrapper(*args, **kwargs): """Add "ip netns exec " prefix to all commands to forward them into network namespace. """ if "command" in kwargs: command_line = kwargs.pop("command") command_list = command_line.split() else: command_list = args[1].split() if command_list[0] == "sudo": command_list.pop(0) use_sudo = True else: use_sudo = False cmd_prefix = ["ip", "netns", "exec", args[0].netns_name] if use_sudo: cmd_prefix.insert(0, "sudo") command_list = cmd_prefix + command_list result = ' '.join(command_list) _args = list(args) _args[1] = result args = tuple(_args) return function(*args, **kwargs) return wrapper
[docs]class CLISSHNetNS(CLIGenericMixin): """Class for executing command inside of namespace. Unused parameters added to support the same interface for other CLI classes. Examples:: client = CLISSHNetNS("some_nns", timeout=10) client.exec_command("some_command") """ class_logger = loggers.ClassLogger()
[docs] def __init__(self, nsname, port=None, username=None, password=None, page_break=None, prompt=None, pass_prompt=None, sudo_prompt=None, login_prompt=None, page_break_lines=None, exit_cmd=None, timeout=10, quiet=False): """Initialize CLISSHNetNS class. Args: nsname(str): NNS name. port(int): Host port. username(str): Host user password(str): Host password page_break(str): Page brake marker. prompt(str): Shell prompt. pass_prompt(str): Login password prompt. sudo_prompt(str): Sudo password prompt. login_prompt(str): Login prompt. page_break_lines(int): Number of page brake lines. exit_cmd(str): Command to perform telnet exit. timeout(int): Default timeout for commands. quiet(bool): Flag for return code verification. """ super(CLISSHNetNS, self).__init__() self.netns_name = nsname self.page_break = page_break self.prompt = prompt self.sudoprompt = sudo_prompt self.timeout = timeout self.login_status = False self.password = None self.child = None # Default action: raise an exception if the command's exit code isn't 0 or not. self.quiet = quiet
[docs] def login(self, *args, **kwargs): """Shell is always opened for Popen\\Pexpect. """ self.login_status = True
[docs] def open_shell(self): """Shell is always opened for Popen\\Pexpect. """ pass
[docs] def close_shell(self): """Shell is always opened for Popen\\Pexpect. """ pass
[docs] def check_shell(self): """Shell is always existed for Popen\\Pexpect. """ pass
[docs] def close(self): """Popen\\Pexpect object is always opened and doesn't have 'close' method. """ pass
[docs] def shell_read(self, timeout=0, interval=0.1): """Read data from output buffer. Args: timeout(int): Increases time to read data from output buffer. interval(int): Time delay between attempts to read data from output buffer. """ data = "" if self.child: data = self.child.read() else: self.class_logger.warning("Child object isn't created, reading is skipped") return data
[docs] @ip_net_namespace_mode def exec_command(self, command, timeout=None): """Execute command without shell (tty). Args: command(str): Command to be executed. timeout(int): Timeout for command execution. Returns: tuple(str, str, int): tuple of stdout, stderr, rc """ self.class_logger.debug(command) command = command.split(" ") process = Popen(command, stdout=PIPE, stderr=PIPE) process.wait() so = process.stdout.read() se = process.stderr.read() self.class_logger.debug(self.cmd_output_log(so, se)) return so, se
# TODO: Implement bash creation for shell_command
[docs] @ip_net_namespace_mode def shell_command(self, command, alternatives=None, timeout=None, sudo=False, ret_code=True, expected_rc="0", quiet=None, raw_output=False, interval=0.1): """Run interactive command on previously created shell (tty). Args: command(str): Command to be executed. alternatives(tuple): Tuples of ("expected line", "action if line is found", <Exit execution? (bool)>, <Use ones? (bool)>). action can be: - str - in case this is just command; - function - callable object to execute without parameters; timeout(int): Expecting timeout. sudo(bool): Flag if sudo should be added to the list of alternatives. ret_code(bool): Flag if return code should be added to the list of alternatives. expected_rc(int): Sets return code and verifies if return code of executed command the same as expected return code (int or str). quiet(bool): Flag to verify if expected return equals expected. raw_output(bool): Flag whether to return 'pure' output. interval(int | float): Interval between read data cycles. Raises: CLINNSException: unexpected return code. """ self.class_logger.debug("Command {0}".format(command)) if timeout is None: timeout = self.timeout if quiet is None: quiet = self.quiet if isinstance(expected_rc, int): expected_rc = str(expected_rc) data = "" return_code = None command, alternatives, end_pattern = self.prepare_alter(command, alternatives, sudo=True) self.child = pexpect.spawn('bash', ['-c', command], maxread=1) self.child = self.prepare_pexpect_obj(self.child) data = self.action_on_expect(self.child, alternatives, timeout, interval) if not raw_output: data, return_code = self.normalize_output(data, command, ret_code, end_pattern) self.class_logger.debug("Command output:\n{0}".format(data)) if ret_code and not quiet: if return_code != expected_rc: raise CLINNSException("Command returned the unexpected code: {0}".format(return_code)) return data, return_code
[docs] @ip_net_namespace_mode def send_command(self, command): """Run command without waiting response. Args: command(str): Command to be executed. """ self.class_logger.debug(command) command = command.split(" ") Popen(command, stdout=PIPE, stderr=PIPE)
[docs] def put_file(self, src, dst): """Transfer file from/to remote host. Args: src(str): File's source. dst(str): File's destination. """ if isinstance(src, str): src = [src, ] dst = [dst, ] for _src, _dst in zip(src, dst): command = "sudo cp {0} {1}".format(_src, _dst) so, se, rc = self.native_cmd(command, verbose=False) if rc != "0": self.class_logger.warning("Error when copy {0} to {1} : \n{2}".format(_src, _dst, se))
[docs] def get_file(self, src, dst): """Put file to remote host. Args: src(str): File's source. dst(str): File's destination. """ self.put_file(src, dst)
[docs] def native_cmd(self, command, shell=False, wait=True, verbose=True): """Execute open command. Args: command(str): Command to be executed. shell(bool): Flag specifies whether to use the shell as the program to execute. wait(bool): Flag specifies whether to wait for command output. verbose(bool): Flag specifies whether to print command output. """ if verbose: self.class_logger.debug(command) command = command.split(" ") stderr = PIPE stdout = PIPE process = Popen(command, stdout=stdout, stderr=stderr, shell=shell) if wait: rc = process.wait() so = process.stdout.read() se = process.stderr.read() if verbose: self.class_logger.debug(self.cmd_output_log(so, se)) return so, se, str(rc)