Source code for taf.testlib.clitelnet

# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``clitelnet.py``

`Basic telnet class with command oriented functionality`

"""

import telnetlib
import time
import re

from .cli_template import CLIGenericMixin
from .cli_template import CmdStatus
from .custom_exceptions import CLITelnetException
from . import loggers


# TODO: cmd retry
# TODO: Add logout/exit command for proper closing telnet connections
[docs]class TelnetCMD(CLIGenericMixin): """HighLevel telnet command oriented class. Unused parameters added to support the same interface for other CLI classes. Examples:: client = TelnetCMD("1.1.1.1", 22) client.login("username", "paSSword") """ class_logger = loggers.ClassLogger()
[docs] def __init__(self, host=None, port=23, username=None, password=None, page_break="--More--", prompt=None, pass_prompt="Password: ", sudo_prompt=None, timeout=10, login_prompt="login: ", page_break_lines=3, exit_cmd=None, quiet=False): """Initialize TelnetCMD class. Args: host(str): Target host IP address. port(int): SSH port. username(str): SSH login user. password(str): SSH user password. page_break(str): Page brake marker. prompt(str, list[str]): Shell prompt or list of shell prompts. pass_prompt(str): Login password prompt. sudo_prompt(str): Sudo password prompt. timeout(int): Default timeout for commands. login_prompt(str): Login prompt. page_break_lines(int): Number of page brake lines. exit_cmd(str): Command to perform telnet exit (str). quiet(bool): Flag for return code verification. """ super(TelnetCMD, self).__init__() self.host = host self.port = port self.user = username.encode('utf-8') self.password = password.encode('utf-8') self.prompt = prompt.encode('utf-8') self.pass_prompt = pass_prompt.encode('utf-8') self.login_prompt = login_prompt.encode('utf-8') self.page_break = page_break self.page_break_lines = page_break_lines self.timeout = timeout self.telnet_obj = None self.prompt_stack = [] self.exit_cmd = exit_cmd # Prompt for sudo password. self.sudo_pass_prompt = None # Global alternatives for commands. self.alternatives_global = [] self.login_status = False self.connect_status = False self.sudoprompt = sudo_prompt # Default action: raise an exception if command's exit code isn't 0 or not. self.quiet = quiet
[docs] def connect(self, with_login=True, wait_login=5, alternatives=None, wait_prompt=True, socket_closed=False): """Create telnet connection and do login if necessary. Args: with_login(bool): Perform login procedure or not. If param isn't set try automatically determine login necessity. (True|False|None) wait_login(int): time to wait login before sending <Enter>. <Enter> is necessary if login is already appeared. alternatives(tuple): Tuples of ("expected line", "action if line is found", <Exit execution? (bool)>, <Use ones? (bool)>). action can be: - str - in case this is just command; - function - callable object to execute without parameters; wait_prompt(bool): Wait for prompt message or not. socket_closed(bool): Determines if socket has been closed or not. Raises: Exception: self.host is None; self.prompt is None Returns: str: telnet stdout """ if not self.host: raise Exception("Remote host was not defined.") telnet_output = [] if socket_closed: self.telnet_obj.open(self.host, self.port) else: self.telnet_obj = telnetlib.Telnet(self.host, self.port, self.timeout) if with_login is None: prompt_re = re.compile(re.escape(self.prompt)) login_prompt_re = re.compile(re.escape(self.login_prompt)) _output = (-1, None, None) if wait_login > 0: _output = self.telnet_obj.expect([prompt_re, login_prompt_re], timeout=wait_login) if _output[0] == -1: self.telnet_obj.write(b"\n") _output = self.telnet_obj.expect([prompt_re, login_prompt_re], timeout=self.timeout) if _output[0] == 1: with_login = True else: with_login = False if with_login: _output = self.login(wait_login=wait_login, alternatives=alternatives, connect=False) telnet_output += _output else: if wait_prompt: if not self.prompt: raise Exception("Prompt isn't defined. Please set the prompt.") # Send <Enter> to ensure that prompt is appeared self.telnet_obj.write(b"\n") _output = self.telnet_obj.read_until(self.prompt, self.timeout) telnet_output += _output else: # Skip reading output. Just connect. pass self.connect_status = True self.telnet_obj_prep = self.prepare_telnet_obj(self.telnet_obj) # pylint: disable=attribute-defined-outside-init return telnet_output
[docs] def _check_telnet_obj(self): """Check if telnet object exists (connection is established). Raises: Exception: telnet connection is not established """ if not (self.telnet_obj and self._check_telnet_obj_connection()): raise Exception("Connection to host has not been established yet.")
[docs] def check_shell(self): """Check if CLI connection is alive. Raises: CLITelnetException: telnet connection is not established; user is not logged in """ # Why are we raising execptions, return False if not self.telnet_obj or not self._check_telnet_obj_connection(): return False else: if not self.login_status: return False return True
[docs] def close(self): """Close CLI object connection. """ self.telnet_obj.close()
[docs] def close_shell(self): """Close interactive CLI shell on existing connection. """ self.close()
[docs] def shell_read(self, timeout=0, interval=0.1): """Read data from output buffer. Args: timeout(int): Increases time to read data from output buffer. interval(int): Time delay between attempts to read data from output buffer. """ self.check_shell() data = b"" # The following loop has to be executed at least one time. end_time = time.time() + timeout end_flag = False self.telnet_obj.write(b"\n") while not end_flag: data += self.telnet_obj.read_very_eager() if time.time() >= end_time: end_flag = True else: time.sleep(interval) return data.decode('utf-8', errors='ignore')
[docs] def send_command(self, command): """Run command without waiting response. Args: command(str): Command to be executed. """ self.check_shell() self.class_logger.debug("{0}@{1}: {2}".format(self.user, self.host, command)) self.telnet_obj.write(command.encode('utf-8') + b"\n")
[docs] def open_shell(self, raw_output=False): """Call login method. Added to support other CLI objects interface. Args: raw_output(bool): Flag whether to read output buffer. Raises: CLITelnetException: telnet connection is not established Returns: str: telnet output """ data = "" if self._check_telnet_obj_connection(): if self.check_shell(): self.login_status = True if not raw_output: data = self.shell_read(2) return data else: self.login() if not raw_output: data = self.shell_read(2) return data else: raise CLITelnetException("Telnet object has no connection")
[docs] def _check_telnet_obj_connection(self): """Verify if telnet connection exists. Returns: bool: True if telnet connection exists """ flag = True try: if not self.telnet_obj.sock: return False if isinstance(self.telnet_obj.sock, int): return False self.telnet_obj.sock.sendall(telnetlib.IAC + telnetlib.DONT + telnetlib.NOP) except Exception as err: self.class_logger.error("Telnet connection failure: %s", err) flag = False return flag
[docs] def login(self, username=None, password=None, timeout=None, wait_login=0, alternatives=None, connect=True): """Do CLI object login procedure. Args: username(str): Host login (string). password(str): Host password(string). timeout(int): Time to execute login procedure (integer). wait_login(int): time to wait login prompt before sending <Enter>. <Enter> is necessary if login prompt has been already appeared before connection is established. alternatives(list of tuples): list of alternative prompts and actions. connect(bool): Flag if connection should be established before login procedure (bool). Returns: None Raises: Exception: username is not defined CLITelnetException: login timeout exceeded, unexpected login prompt """ if not self.user and not username: raise Exception("User login name was not defined.") if not self.connect_status and connect: self.connect(with_login=False, wait_prompt=False) if self.telnet_obj: if self.telnet_obj.sock == 0: self.connect(with_login=False, wait_prompt=False, socket_closed=True) self._check_telnet_obj() telnet_output = [] err = False alternatives = alternatives or [] expect_list = [re.compile(re.escape(self.login_prompt)), ] action_list = [(self.user if not username else username, False if self.password or password else True), ] if self.password or password: expect_list.append(re.compile(re.escape(self.pass_prompt))) action_list.append((self.password if self.password else password, True)) for alter_item in alternatives: expect_list.append(re.compile(alter_item[0])) action_list.append((alter_item[1].encode('utf-8'), alter_item[2])) for alter_item in self.alternatives_global: expect_list.append(re.compile(alter_item[0])) action_list.append((alter_item[1].encode('utf-8'), alter_item[2])) enter_sent = False stop_flag = False while not stop_flag: _output = self.telnet_obj.expect(expect_list, wait_login or self.timeout) telnet_output.append(_output[1]) self.class_logger.debug("Output: %s" % (_output[1], )) if _output[0] == -1: # Reset wait_login to set default timeout for next iteration. if wait_login > 0: wait_login = 0 if not enter_sent: # Send <Enter> to ensure that login prompt appeared. self.telnet_obj.write(b"\n") enter_sent = True else: # If <Enter> is already sent and still no response - set error flag. err = True stop_flag = True continue self.class_logger.debug("Action: %s" % (action_list[_output[0]], )) self.telnet_obj.write(action_list[_output[0]][0] + b"\n") stop_flag = action_list[_output[0]][1] if err: message = "Telnet timeout exceeded. Expected prompt did not appear during %s seconds.\nTelnet output: %s" %\ (self.timeout, "\n".join(telnet_output)) self.class_logger.error(message) raise CLITelnetException(message) elif self.prompt: _output = self.telnet_obj.read_until(self.prompt, self.timeout) telnet_output.append(_output) # Verification that login is successful if prompt isn't defined else: # time.sleep is used to wait until data appear in buffer time.sleep(2) # _output = self.telnet_obj.read_very_eager() telnet_output.append(_output) _output_1 = self.telnet_obj.read_very_eager() self.prompt = _output_1.split("\n")[-1] self.telnet_obj.write("\n") # time.sleep is used to wait until data appear in buffer time.sleep(2) _output = self.telnet_obj.read_very_eager() last_line = _output.splitlines()[-1] if self.prompt != last_line: self.class_logger.error(telnet_output) raise CLITelnetException("Login prompt is not as expected: {0} != {1}".format(self.prompt, last_line)) self.login_status = True return telnet_output
[docs] def exit(self, wait_close=True): """Do telnet exit/logout procedure. Wait until connection closed. Args: wait_close(bool): Flag specifies whether to verify successful exit and to return output data. """ self._check_telnet_obj() telnet_output = [] # Send <Enter> to ensure that login prompt is appeared self.telnet_obj.write("\n") _output = self.telnet_obj.read_until(self.prompt, self.timeout) telnet_output.append(_output) self.telnet_obj.write("%s\n" % (self.exit_cmd, )) if not wait_close: self.telnet_obj.read_eager() return None end_time = time.time() + self.timeout while time.time() <= end_time: try: _output = self.telnet_obj.read_eager() telnet_output.append(_output) time.sleep(0.3) except EOFError: return telnet_output return None
[docs] def disconnect(self, with_exit=True): """Do disconnect. Args: with_exit(bool): Flag specifies whether perform exit procedure. """ if self.telnet_obj: try: if with_exit and self.exit_cmd is not None: # print "TELNETCMD: Try to do exit()" self.exit() finally: self.telnet_obj.close() self.class_logger.debug("Connection to %s:%s is closed." % (self.host, self.port))
[docs] def shell_command(self, command, alternatives=None, timeout=None, sudo=False, ret_code=True, new_prompt=None, expected_rc="0", quiet=None, raw_output=False, interval=0.1): """Run interactive command on previously created shell (tty). Args: command(str): Command to be executed. alternatives(tuple): Tuples of ("expected line", "action if line is found", <Exit execution? (bool)>, <Use ones? (bool)>). action can be: - str - in case this is just command; - function - callable object to execute without parameters; timeout(int): Expecting timeout. sudo(bool): Flag if sudo should be added to the list of alternatives. ret_code(bool): Flag if return code should be added to the list of alternatives. expected_rc(int): Sets return code and verifies if return code of executed command the same as expected return code (int or str). quiet(bool): Flag to verify if expected return equals expected. raw_output(bool): Flag whether to return 'pure' output. interval(int | float): Interval between read data cycles. new_prompt(str): Prompt which will replace current prompt after successful mode changing. Raises: CLITelnetException: unexpected return code """ self._check_telnet_obj() self.class_logger.debug("{0}@{1}: {2}".format(self.user, self.host, command)) if timeout is None: timeout = self.timeout if quiet is None: quiet = self.quiet if isinstance(expected_rc, int): expected_rc = str(expected_rc) data = "" return_code = None command, alternatives, end_pattern = self.prepare_alter(command, alternatives, sudo=sudo, ret_code=ret_code) # Expand alternatives for telnet specific login. alternatives.append((self.sudoprompt, str(self.password), False, False)) if new_prompt: alternatives.append((new_prompt, None, True, False)) self.telnet_obj.write(command.encode('utf-8') + b"\n") data = self.action_on_expect(self.telnet_obj_prep, alternatives, timeout, interval) if not raw_output: data, return_code = self.normalize_output(data, command, ret_code, end_pattern) self.class_logger.debug("Command output:\n{0}".format(data)) if ret_code and not quiet: if return_code != expected_rc: raise CLITelnetException("Command return unexpected return code: {0}".format(return_code)) return data, return_code
[docs] def exec_command(self, command, timeout=None, sudo=False, ret_code=False): """Execute command without shell (tty). Args: command(str): Command to be executed. timeout(int): Timeout for command execution. sudo(bool): Flag if sudo should be added to the list of alternatives. ret_code(bool): Flag if return code should be added to the list of alternatives (bool). Returns: tuple(str, str, int): output, "", return code """ output, rc = self.shell_command(command, timeout=timeout, sudo=sudo, ret_code=ret_code) # no stderr so use empty string return CmdStatus(output, "", rc)
[docs] def enter_mode(self, cmd=None, new_prompt=None): """Enter config/priv or other mode with specific prompt. Args: cmd(str): Command to change mode. new_prompt(str): Prompt which will replace current prompt after successful mode changing. Raises: Exception: undefined prompt, unexpected new prompt Note: After success execution current prompt will be replaced with new propmt and saved in prompt_stack. """ if not self.prompt: message = "Prompt isn't defined. Please set the prompt." raise Exception(message) self._check_telnet_obj() telnet_output, error = self.shell_command(cmd, ret_code=False, new_prompt=new_prompt) if error: message = "Cannot enter priv mode. Command: %s. Expected prompt: %s. Error: %s. Last output: %s" % (cmd, new_prompt, error, telnet_output, ) self.class_logger.log(loggers.levels['ERROR'], message) raise Exception(message) else: self.prompt_stack.append(self.prompt) self.prompt = new_prompt return self.prompt
[docs] def exit_mode(self, exit_cmd=None): """Exit config/priv or other mode with specific prompt. Args: exit_cmd(str): Command to exit from current mode. Raises: Exception: undefined prompt, unexpected new prompt """ if not self.prompt: message = "Prompt isn't defined. Please set the prompt." raise Exception(message) self._check_telnet_obj() prev_prompt = self.prompt_stack[-1] telnet_output, error = self.shell_command(exit_cmd, ret_code=False, new_prompt=prev_prompt) if error: message = 'Cannot exit priv mode. Command: %s. Expected prompt: %s. Error: %s. Last output: %s' % (exit_cmd, prev_prompt, error, telnet_output) self.class_logger.log(loggers.levels['ERROR'], message) raise Exception(message) else: self.prompt = prev_prompt self.prompt_stack.pop() return self.prompt
[docs] def _normalize_output(self, output=None, cmd=None, prompt=None): """Remove everything from the response except the actual command output. Args: output(str): Output data to be normalized. cmd(str): Command which was used for program execution. prompt(str): Prompt which will be removed form output data. """ lines = output.splitlines(True) # If command was echoed back, remove it from the output if cmd in lines[0]: lines.pop(0) # Remove the last element, which is the prompt or page break marker being displayed again if len(lines) > 0 and lines[-1] == prompt: lines.pop() if len(lines) > 0 and lines[-1] == self.page_break: for _ in range(self.page_break_lines): lines.pop() # Restore output return "".join(lines)
[docs] def put_file(self): """This method isn't supported by telnetlib. Raises: CLITelnetException: unsupported """ raise CLITelnetException("cli_telnet object doesn't support put_file method ")
[docs] def get_file(self): """This method isn't supported by telnetlib. Raises: CLITelnetException: unsupported """ raise CLITelnetException("cli_telnet object doesn't support get_file method ")