Source code for taf.plugins.pytest_reportingserver

# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``pytest_reportingserver.py``

`XML-RPC reporting server plugin`

"""

import os
import sys
import time
import errno
from subprocess import Popen
from re import sub as re_sub
from contextlib import suppress
from socket import error as socket_error
from xml.sax.saxutils import escape as xml_escape
from xmlrpc.client import ProtocolError as XmlrpcProtocolError
from abc import ABCMeta, abstractmethod

import pytest

from .pytest_helpers import get_tcname, get_suite_name, get_steps, get_brief
from testlib import loggers
from testlib.xmlrpc_proxy import TimeoutServerProxy as XMLRPCProxy


# Registry of the modules imported by imp_plugins(), keyed by "<subfolder>.<module name>".
MODULES = {}


def imp_plugins(dest):
    """Import all py modules from <dest> subfolder.

    Files whose name starts with an underscore or does not end with ``.py`` are skipped.
    Every imported module is stored in ``MODULES`` under the key ``"<dest>.<module name>"``.

    Args:
        dest(str): subfolder name, resolved relative to the folder of this file.

    """
    here = os.path.dirname(__file__)
    plugins_dir = os.path.join(here, dest)

    names = []
    for entry in os.listdir(plugins_dir):
        if entry.startswith("_"):
            continue
        if not entry.endswith(".py"):
            continue
        names.append(os.path.splitext(entry)[0])

    # Modules are imported by their bare name, so both folders are put in front of sys.path.
    sys.path.insert(0, plugins_dir)
    sys.path.insert(0, here)

    for name in names:
        key = "{0}.{1}".format(dest, name)
        MODULES[key] = __import__(name)
# Import the configuration modules from the "reports_conf" subfolder as soon as this plugin module is imported.
imp_plugins("reports_conf")
class ReportingServerConfigBase(metaclass=ABCMeta):
    """Abstract base of Reporting Server configuration classes.

    Cannot be instantiated until ``_additional_option`` is overridden.

    """

    @abstractmethod
    def _additional_option(self):
        """Define options for the Reporting Server.

        """
        return None
def pytest_addoption(parser):
    """Plugin specific options.

    Every imported ``reports_conf`` module registers its own options first, then the options of
    the reporting server plugin itself are added to the "Reporting server" group.

    Args:
        parser: pytest command line parser; ``getgroup`` is called on it.

    """
    # Plain loop: the calls are made only for their side effects, so no throw-away list is built.
    for _var in MODULES:
        if "reports_conf." in _var:
            MODULES[_var].ReportingServerConfig._additional_option(parser)  # pylint: disable=protected-access

    group = parser.getgroup("Reporting server", "plugin: reporting server")
    group.addoption("--tc_duration", action="store_true",
                    help="Use to show only duration only Test Case execution. Default = %default")
    group.addoption("--rs_port", action="store", default=None,
                    help="Bind to the already launched instance of reporting server listenning on port. %default by default.")
def pytest_configure(config):
    """Registering plugin.

    The plugin is registered only when at least one ``reports_conf`` module returns a true value
    from ``_configure``. Without ``--rs_port`` a new reporting server is launched, otherwise the
    already running one is checked.

    Args:
        config: pytest config object.

    Raises:
        Exception: not able to connect to the reporting server.

    """
    # Every module is asked before the results are combined (no short-circuit between modules).
    verdicts = []
    for mod_name, module in MODULES.items():
        if "reports_conf." in mod_name:
            verdicts.append(module.ReportingServerConfig._configure(config))  # pylint: disable=protected-access
    if not any(verdicts):
        return

    server = ReportingServer(config.option)
    config.reportingserver = server
    config.pluginmanager.register(server, "reportingserver")

    if config.option.rs_port is None:
        server.launch_server()
        return
    if not server.check_server(3):
        raise Exception("Cannot connect to reporting server on given port localhost:{0}".
                        format(config.option.rs_port))
def pytest_unconfigure(config):
    """Unregistering plugin.

    Does nothing when no reporting server object is attached to ``config``.

    Args:
        config: pytest config object.

    """
    server = getattr(config, "reportingserver", None)
    if not server:
        return
    # Time sleep for assurance that server add close command to queue and going to dump xml report
    time.sleep(1)
    server.shutdown_server()
    delattr(config, "reportingserver")
    config.pluginmanager.unregister(server)
class ReportingServer(object):
    """Logging xmlrpc server class.

    The ``pytest_*`` methods are pytest hooks; they forward test case events to the reporting
    server through the XML-RPC proxy stored in ``xmlproxy``.

    """

    # File name of the reporting server script started by launch_server.
    REPORTINGSRV_PATH = "reporting_server.py"
    # Build name used when the environment properties do not provide one.
    UNDEFINED_BUILD = "Undefined"
    # Number of one-second attempts launch_server makes to read the server port file.
    REPORTINGSRV_TIMEOUT = 30
    class_logger = loggers.ClassLogger()

    def __init__(self, opts):
        """Initialize ReportingServer class.

        Args:
            opts: parsed pytest options. ``rs_port`` is read here; ``markexpr``, ``logdir``,
                ``loglevel`` and ``tc_duration`` are read by other methods.

        """
        self._opts = opts
        self.rs_port = opts.rs_port
        self._tc2id_map = {}
        self.xmlproxy = XMLRPCProxy("http://localhost:{0}".format(self.rs_port), allow_none=True)
        # _sessionstart launch status flag
        # _sessionstart should be launched only once, at the first runtest_call
        self._init_session = False
        # Build name cache, filled by the first buildname() call
        self._buildname = None
        self.post_queue = []
        # duration of Test Case since pytest_setup to longrepr
        self.detailed_duration = dict()

        self.platform = 'undetermined'
        self.build = 'undetermined'

        # Get os username: SUDO_USER wins over USER; fall back to a placeholder instead of
        # raising KeyError when neither variable is set.
        self.os_username = os.environ.get('SUDO_USER', os.environ.get('USER', 'unknown'))

        self.self_name = "py.test-{0}-{1}".format(self.os_username, os.getpid())

    def buildname(self, env_prop=None):
        """Return buildname for current session.

        The value is computed once and cached. When ``env_prop`` lacks ``chipName`` or
        ``switchppVersion`` (or is not subscriptable) the build name becomes ``UNDEFINED_BUILD``.

        Args:
            env_prop(dict): environment information e.g. build, name, etc.

        Returns:
            str: buildname

        """
        if self._buildname is not None:
            return self._buildname

        try:
            platform, build = env_prop['chipName'], env_prop['switchppVersion']
        except (KeyError, TypeError):
            message = 'Cannot determine build name'
            self.class_logger.warning(message)
            self._buildname = self.UNDEFINED_BUILD
        else:
            self.platform = platform
            self.build = build

            name_iter = (MODULES[_var].ReportingServerConfig._get_build_name(self._opts)  # pylint: disable=protected-access
                         for _var in MODULES if 'reports_conf.' in _var)
            # The first non-None name from the configuration modules wins;
            # otherwise the build name from env_prop is retained.
            build = next((name for name in name_iter if name is not None), build)
            self._buildname = '{0}-{1}'.format(build, platform)

        # WORKAROUND to add 'sanity' suffix to buildname
        if 'sanity' in self._opts.markexpr and self._buildname is not None:
            self._buildname += "-sanity"
        # WORKAROUND END
        return self._buildname

    def shutdown_server(self):
        """Send xmlrpc request to shutdown xmlrpc server.

        Errors are only logged: socket and protocol errors are reported as a completed shutdown.

        """
        try:
            ans = self.xmlproxy.shutdown()
        except (socket_error, XmlrpcProtocolError) as err:
            self.class_logger.info("xmlrpc shutdown complete. (DEBUG: {0})".format(err))
        except Exception as err:
            self.class_logger.info("xmlrpc shutdown expected error: {0} - {1}".format(type(err), err))
        else:
            self.class_logger.info("xmlrpc shutdown query answer: %s" % (ans, ))

    def launch_server(self, port=None):
        """Launch xmlrpc server.

        Starts the reporting server script, waits for the launcher process to exit, determines the
        port (read from ``/tmp/<launcher pid>.pid`` when not given) and waits until the server
        answers.

        Args:
            port(int): port to launch xmlrpc server

        Raises:
            Exception: launcher returned non-zero code, port cannot be determined or the server
                does not respond.

        """
        def wait_rc(popen, timeout=30):
            """Wait until popen finish execution.

            Returns:
                int: return code, or None if the process is still running after timeout.

            """
            end_time = time.time() + timeout
            while True:
                rc = popen.poll()
                if rc is not None or time.time() > end_time:
                    return rc
                time.sleep(0.5)

        logdir = self._opts.logdir if self._opts.logdir is not None else os.curdir
        server_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../reporting', self.REPORTINGSRV_PATH))
        cmd = [
            sys.executable,
            server_path,
            "--logdir={0}".format(logdir),
            "--loglevel={0}".format(self._opts.loglevel),
            "--silent",
            "--logprefix={0}".format(os.path.splitext(self.REPORTINGSRV_PATH)[0]),
        ]
        if port:
            cmd.append("--port={0}".format(port))
        popen = Popen(cmd)
        self.class_logger.debug("reporting server parent process PID - {0}".format(popen.pid))

        rc = wait_rc(popen)
        if rc != 0:
            message = "Failed to start XMLRPC server with command: {0}. Return code: {1}".format(" ".join(cmd), rc)
            self.class_logger.error(message)
            raise Exception(message)

        retry = 0
        _vr_path = os.path.join("/tmp", "{0}.pid".format(popen.pid))
        while port is None and retry < self.REPORTINGSRV_TIMEOUT:
            try:
                with open(_vr_path, "r") as _vr:
                    # Read server port number from file
                    port = int(_vr.read())
            except (IOError, ValueError) as err:
                # IOError: the file cannot be opened (yet). ValueError: the file exists but its
                # content is not a number (e.g. still empty). Both are retried, and the counter
                # always advances so the loop is guaranteed to end.
                if isinstance(err, IOError) and err.errno == errno.ENOENT:
                    self.class_logger.warning("Sleeping until %s exists", _vr_path)
                else:
                    self.class_logger.warning("Cannot read port from %s yet: %s", _vr_path, err)
                retry += 1
                time.sleep(1)
            except Exception as err:
                self.class_logger.error("Cannot determinate reporting server port.")
                self.class_logger.error("Failed to open pid/port file {0}. Error:\n{1}".format(_vr_path, err))
                raise

        if port is None:
            # Fail right away instead of building a proxy for "localhost:None".
            message = "Cannot determinate reporting server port."
            self.class_logger.error(message)
            raise Exception(message)

        self.class_logger.info("Reporting server is listening on port {0}".format(port))
        self.rs_port = port
        self.xmlproxy = XMLRPCProxy("http://localhost:{0}".format(self.rs_port), allow_none=True)

        # Wait until xmlrpc server starts processing requests
        if not self.check_server(timeout=60):
            message = "XMLRPC Server does not respond."
            self.class_logger.error(message)
            raise Exception(message)

    def check_server(self, timeout=1):
        """Check if xmlrpc server is alive.

        Args:
            timeout(int): timeout for server verification

        Returns:
            bool: True if the server answered the ping as a reporting server, False on timeout.

        Raises:
            Exception: something else than the reporting server answered the ping.

        """
        end_time = time.time() + timeout
        while time.time() <= end_time:
            try:
                ans = self.xmlproxy.ping()
            except socket_error:
                time.sleep(1)
            except Exception as err:
                self.class_logger.warning("Unexpected type of error while checking xmlrpc server - {0} - {1}".format(type(err), err))
                time.sleep(1)
            else:
                if ans == "XMLReportingServer":
                    return True
                # Report the port that was actually queried, not a hard-coded one.
                message = "Unknown xmlrpc server is running on localhost:{0}".format(self.rs_port)
                self.class_logger.error(message)
                raise Exception(message)
        return False

    def server_cmd(self, cmd, args, retry=3):
        """Send XML query to server with retry and exception handling.

        Failures are logged, never raised to the caller.

        Args:
            cmd(str): command name
            args(list): command arguments
            retry(int): retry count

        """
        success = False
        _i = 1
        try:
            while not success and _i <= retry:
                # The counter is advanced before the call, so the logged number and the
                # sleep time below are based on the already incremented value.
                _i += 1
                try:
                    getattr(self.xmlproxy, cmd)(*args)
                except socket_error:
                    self.class_logger.info("XMLRPC query {0}({1}) failed on {2} retry.".format(cmd, str(args), _i))
                    time.sleep(2 * _i)
                except KeyboardInterrupt:
                    self.class_logger.debug("Caught KeyboardInterrupt on server_cmd.")
                    self.pytest_sessionfinish(session=self)
                else:
                    success = True
        except Exception as err:
            self.class_logger.error("XMLRPC query {0}({1}) failed on {2} retry. Unexpected error type {3}: {4}".
                                    format(cmd, str(args), _i, type(err), err))
        if not success:
            self.class_logger.warning("XMLRPC query {0}({1}) failed.".format(cmd, str(args)))

    def _send_post_request(self, item):
        """Send post request to reporting server or add it to queue.

        Queued requests, if any, are flushed before the "Run" request of this item is sent.

        Args:
            item(pytest.Item): test case item

        """
        tc_name = get_tcname(item)
        try:
            env_prop = item.config.env.env_prop
        except AttributeError:
            buildname = self.UNDEFINED_BUILD
        else:
            buildname = self.buildname(env_prop)
        suite_name = get_suite_name(item.nodeid)
        info = {"brief": get_brief(item, tc_name), "description": get_steps(item, tc_name)}
        if self.post_queue:
            self._send_post_queue(item, buildname)
        self.server_cmd("post", [self.self_name, buildname, suite_name, tc_name, "Run", "", info, self._get_build_info(item)])

    def pytest_runtest_setup(self, item):
        """Add info about test case start time.

        Args:
            item(pytest.Item): test case item

        """
        if not item.config.option.tc_duration:
            self.detailed_duration[item.nodeid] = {'setup': time.time()}
        if self._buildname is not None:
            self._send_post_request(item)

    @pytest.mark.tryfirst
    def pytest_runtest_call(self, item):
        """Create TC instance and send it to the Reporting Server.

        Args:
            item(pytest.Item): test case item

        """
        if not item.config.option.tc_duration:
            self.detailed_duration[item.nodeid]['call'] = time.time()

        if self._buildname is None:
            self.buildname(item.config.env.env_prop)
            if self._buildname is not None and not self._init_session:
                self._sessionstart(item)
                self._init_session = True
            self._send_post_request(item)

    def _send_post_queue(self, item=None, buildname=None, sanity=False):
        """Send info about test execution to the Reporting Server.

        Args:
            item(pytest.Item): test case item
            buildname(str): buildname
            sanity(bool): True if sanity test

        """
        if buildname is None:
            buildname = 'undetermined'
            if sanity:
                buildname += "-sanity"
        self.server_cmd("post", [self.self_name, buildname, "", "", "Run", "", "", self._get_build_info(item)])
        for post_req in self.post_queue:
            post_req[1] = buildname
            # Add empty description and brief. In synapsert new TC won't create
            post_req.append(('', ''))
            post_req.append(self._get_build_info(item))
            self.server_cmd("post", post_req)
        self.post_queue[:] = []

    def _get_build_info(self, item=None):
        """Get info about build.

        Platform and build are taken from the environment properties of ``item`` only while the
        platform is still 'undetermined'. Objects without ``config`` or ``env`` are tolerated.

        Args:
            item(pytest.Item): test case item

        Returns:
            dict{"platform": str, "build": str}: build info

        """
        # getattr with default: the caller may pass None or an object that has no config at all.
        config = getattr(item, 'config', None)
        env = getattr(config, 'env', None)
        if config and env and self.platform == 'undetermined':
            env_prop = env.env_prop
            if "chipName" in env_prop and "switchppVersion" in env_prop:
                self.platform = env_prop["chipName"]
                self.build = env_prop["switchppVersion"]
        return {'platform': self.platform, 'build': self.build}

    def pytest_runtest_logreport(self, report):
        """Send update TC run status to the Reporting Server.

        Passed setup/teardown reports without a ``monitor`` attribute are ignored.

        Args:
            report(pytest.BaseReport): pytest report

        """
        status = None
        if report.passed:
            # ignore setup/teardown
            if report.when == "call":
                status = "Passed"
        elif report.failed:
            status = "Error" if report.when in ["setup", "teardown"] else "Failed"
        elif report.skipped:
            status = "Skipped"
        if not status and hasattr(report, 'monitor'):
            status = "Monitor"
        if status is None:
            return

        _report = {'longrepr': "", 'when': report.when}
        if hasattr(report, "longrepr"):
            # Remove all bash escape sequences
            _report['longrepr'] = xml_escape(re_sub(r"\x1b.*?m", "", str(report.longrepr)))
        if hasattr(report, "keywords") and "xfail" in report.keywords:
            # TODO: check xfail in keywords because now it's number
            _report['keywords'] = {'xfail': report.keywords['xfail']}
        if hasattr(report, "sections"):
            # Only plain string sections are forwarded.
            _report['sections'] = [xml_escape(section) for section in report.sections if isinstance(section, str)]
        if hasattr(report, "duration"):
            timings = self.detailed_duration.get(report.nodeid)
            if not self._opts.tc_duration and timings and timings.get('call'):
                _report['detailed_duration'] = {
                    'setup': timings.get('call') - timings.get('setup'),
                    'longrepr': time.time() - timings.get('call') - report.duration,
                }
            _report['duration'] = report.duration
        if hasattr(report, "retval"):
            _report['retval'] = report.retval
        if hasattr(report, "monitor"):
            _report['monitor'] = report.monitor

        tc_name = get_tcname(report)
        suite_name = get_suite_name(report.nodeid)
        # buildname() caches its result, so a single call is enough.
        buildname = self.buildname()
        if buildname is not None:
            self.server_cmd("post", [self.self_name, buildname, suite_name, tc_name, status, _report, "", self._get_build_info()])
        else:
            self.post_queue.append([self.self_name, buildname, suite_name, tc_name, status, _report])

    def _sessionstart(self, item):
        """Tell to XMLRPC Server that we are going to interact with it.

        Sends "open" and then every command returned by the ``reports_conf`` modules.

        Args:
            item(pytest.Item): test case item

        """
        self.class_logger.info("Configuring reporting server...")
        self.server_cmd("open", [self.self_name])
        for _var in MODULES:
            if "reports_conf." in _var:
                commands = MODULES[_var].ReportingServerConfig._sessionstart(  # pylint: disable=protected-access
                    self.class_logger, item, self.self_name, self.buildname(item.config.env.env_prop))
                for comm in commands:
                    self.server_cmd(*comm)
        # Order TM reporting to server.
        # Order and configure XML report to server.

    def pytest_sessionfinish(self, session):
        """Tell to XMLRPC Server that we have finished interaction.

        Args:
            session(pytest.Session): test session

        """
        _buildname = self.buildname()
        if _buildname is None:
            if not self._init_session:
                self._sessionstart(session)
            if self.post_queue:
                self._send_post_queue(session, sanity='sanity' in self._opts.markexpr)
            self.class_logger.warning("Cannot determinate buildname. Probably test setup is failed. Skipping report.close step.")
        self.server_cmd("close", [self.self_name])

    def pytest_keyboard_interrupt(self, excinfo):
        """Handle KeyboardInterrupt.

        Args:
            excinfo(py.code.ExceptionInfo): exception info

        """
        self.class_logger.debug("Caught KeyboardInterrupt on pytest_hook.")
        self.pytest_sessionfinish(session=self)