# Copyright (c) 2011 - 2017, Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""``powerboard.py``
`Functionality related to Power boards which support SNMP actions`
"""
import time
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.proto import rfc1902
from testlib.custom_exceptions import CustomException
[docs]class SnmpPowerControl(object):
[docs] def __init__(self, config):
"""Initialize SnmpPowerControl class.
"""
super(SnmpPowerControl, self).__init__()
self.pw_board = config.get("pwboard_host", "")
self.pw_status_oid = config.get("pw_status_oid", "")
self.pw_action_oid = config.get("pw_action_oid", "")
self.pw_on_cmd = str(config.get("pw_on_cmd", "1"))
self.pw_off_cmd = str(config.get("pw_off_cmd", "0"))
self.pw_port = config.get("pwboard_port", "")
self.powercycle_timeout = config.get('reboot_latency', 1)
self.pwboard_snmp_rw_community_string = config.get('pwboard_snmp_rw_community_string', 'private')
self.pw_snmp_service_port = config.get("pw_snmp_service_port", 161)
self.power_status_map = {self.pw_on_cmd: 'On',
self.pw_off_cmd: 'Off'}
[docs] def power_off(self):
"""Perform power Off of device"""
port_action_oid = tuple([int(x) for x in self.pw_action_oid.split('.') + [self.pw_port]])
self.snmpset(port_action_oid, self.pw_off_cmd)
[docs] def power_on(self):
"""Perform power On of device"""
port_action_oid = tuple([int(x) for x in self.pw_action_oid.split('.') + [self.pw_port]])
self.snmpset(port_action_oid, self.pw_on_cmd)
[docs] def power_cycle(self):
"""Perform power cycle of device"""
self.power_off()
time.sleep(self.powercycle_timeout)
self.power_on()
[docs] def get_power_status(self):
"""Get Power status of device on power board
Returns:
(str): 'On'|'Off'
"""
port_status_oid = tuple([int(x) for x in self.pw_status_oid.split('.') + [self.pw_port]])
port_status = self.snmpget(port_status_oid)
return self.power_status_map[port_status]
[docs] def snmpget(self, snmp_get_oid):
"""Returns snmpget result connected to specified port on specified host via SNMP ()
Args:
snmp_get_oid(tuple): SNMP OID
Returns:
(str): SNMP get result
"""
errorIndication, errorStatus, _, varBinds = \
cmdgen.CommandGenerator().getCmd(
cmdgen.CommunityData('my-agent', self.pwboard_snmp_rw_community_string, 0),
cmdgen.UdpTransportTarget((self.pw_board, self.pw_snmp_service_port)),
snmp_get_oid,
)
if errorIndication or errorStatus != 0 or not varBinds:
raise CustomException("Error on SNMP get: OID: '{}'"
"errorIndication: '{}', "
"errorStatus: '{}', "
"returned data: '{}'".format(snmp_get_oid, errorIndication, errorStatus, varBinds))
data = varBinds[0][-1].prettyPrint()
return data
[docs] def snmpset(self, snmp_set_oid, snmp_set_value, snmp_set_type='INTEGER'):
"""Perform snmpset for specified OID to specified value
Args:
snmp_set_oid(tuple): SNMP OID
snmp_set_value(str): SNMP OID
snmp_set_type(str): SNMP SET Data Type
"""
if snmp_set_type.upper() == "INTEGER":
def set_type(x):
return rfc1902.Integer(int(x))
else:
set_type = rfc1902.OctetString
errorIndication, errorStatus, _, _ = \
cmdgen.CommandGenerator().setCmd(
cmdgen.CommunityData('my-agent', self.pwboard_snmp_rw_community_string, 0),
cmdgen.UdpTransportTarget((self.pw_board, self.pw_snmp_service_port)),
(snmp_set_oid, set_type(snmp_set_value)),
)
if errorIndication or errorStatus != 0:
raise CustomException("Error on SNMP set to value '{}': OID: '{}'"
"errorIndication: '{}', "
"errorStatus: '{}'".format(snmp_set_value, snmp_set_oid, errorIndication, errorStatus))