# coding: utf-8

# Copyright (C) 1994-2018 Altair Engineering, Inc.
# For more information, contact Altair at www.altair.com.
#
# This file is part of the PBS Professional ("PBS Pro") software.
#
# Open Source License Information:
#
# PBS Pro is free software. You can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.
# See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Commercial License Information:
#
# For a copy of the commercial license terms and conditions,
# go to: (http://www.pbspro.com/UserArea/agreement.html)
# or contact the Altair Legal Department.
#
# Altair’s dual-license business model allows companies, individuals, and
# organizations to create proprietary derivative works of PBS Pro and
# distribute them - whether embedded or bundled with other software -
# under a commercial license agreement.
#
# Use of Altair’s trademarks, including but not limited to "PBS™",
# "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
# trademark licensing policies.

import unittest
import logging
import subprocess
import pwd
import grp
import os
import sys
import platform
import socket
import time
import calendar

import ptl
from ptl.utils.pbs_logutils import PBSLogAnalyzer
from ptl.utils.pbs_dshutils import DshUtils
from ptl.utils.pbs_cliutils import CliUtils
from ptl.utils.pbs_procutils import ProcMonitor
from ptl.lib.pbs_testlib import *

try:
    from ptl.utils.plugins.ptl_test_tags import tags
except ImportError:
    def tags(*args, **kwargs):
        pass

try:
    from nose.plugins.skip import SkipTest
except ImportError:
    class SkipTest(Exception):
        pass

# Test users/groups are expected to exist on the test systems
# User running the tests and the test users should have passwordless sudo
# access configured to avoid interrupted (queries for password) test runs

# Groups
TSTGRP0 = PbsGroup('tstgrp00', gid=1900)
TSTGRP1 = PbsGroup('tstgrp01', gid=1901)
TSTGRP2 = PbsGroup('tstgrp02', gid=1902)
TSTGRP3 = PbsGroup('tstgrp03', gid=1903)
TSTGRP4 = PbsGroup('tstgrp04', gid=1904)
TSTGRP5 = PbsGroup('tstgrp05', gid=1905)
TSTGRP6 = PbsGroup('tstgrp06', gid=1906)
TSTGRP7 = PbsGroup('tstgrp07', gid=1907)
GRP_PBS = PbsGroup('pbs', gid=901)
GRP_AGT = PbsGroup('agt', gid=1146)
ROOT_GRP = PbsGroup(grp.getgrgid(0).gr_name, gid=0)

# Users
# first group from group list is primary group of user
TEST_USER = PbsUser('pbsuser', uid=4359, groups=[TSTGRP0])
TEST_USER1 = PbsUser('pbsuser1', uid=4361, groups=[TSTGRP0, TSTGRP1, TSTGRP2])
TEST_USER2 = PbsUser('pbsuser2', uid=4362, groups=[TSTGRP0, TSTGRP1, TSTGRP3])
TEST_USER3 = PbsUser('pbsuser3', uid=4363, groups=[TSTGRP0, TSTGRP1, TSTGRP4])
TEST_USER4 = PbsUser('pbsuser4', uid=4364, groups=[TSTGRP1, TSTGRP4, TSTGRP5])
TEST_USER5 = PbsUser('pbsuser5', uid=4365, groups=[TSTGRP2, TSTGRP4, TSTGRP6])
TEST_USER6 = PbsUser('pbsuser6', uid=4366, groups=[TSTGRP3, TSTGRP4, TSTGRP7])
TEST_USER7 = PbsUser('pbsuser7', uid=4368, groups=[TSTGRP1])
OTHER_USER = PbsUser('pbsother', uid=4358, groups=[TSTGRP0, TSTGRP2, GRP_PBS,
                                                   GRP_AGT])
PBSTEST_USER = PbsUser('pbstest', uid=4355, groups=[TSTGRP0, TSTGRP2,
                                                    GRP_PBS, GRP_AGT])
TST_USR = PbsUser('tstusr00', uid=11000, groups=[TSTGRP0])
TST_USR1 = PbsUser('tstusr01', uid=11001, groups=[TSTGRP0])
BUILD_USER = PbsUser('pbsbuild', uid=9000, groups=[TSTGRP0])
DATA_USER = PbsUser('pbsdata', uid=4372, groups=[TSTGRP0])
MGR_USER = PbsUser('pbsmgr', uid=4367, groups=[TSTGRP0])
OPER_USER = PbsUser('pbsoper', uid=4356, groups=[TSTGRP0, TSTGRP2, GRP_PBS,
                                                 GRP_AGT])
ADMIN_USER = PbsUser('pbsadmin', uid=4357, groups=[TSTGRP0, TSTGRP2, GRP_PBS,
                                                   GRP_AGT])
PBSROOT_USER = PbsUser('pbsroot', uid=4371, groups=[TSTGRP0, TSTGRP2])
ROOT_USER = PbsUser('root', uid=0, groups=[ROOT_GRP])

PBS_USERS = (TEST_USER, TEST_USER1, TEST_USER2, TEST_USER3, TEST_USER4,
             TEST_USER5, TEST_USER6, TEST_USER7, OTHER_USER, PBSTEST_USER,
             TST_USR, TST_USR1)

PBS_GROUPS = (TSTGRP0, TSTGRP1, TSTGRP2, TSTGRP3, TSTGRP4, TSTGRP5, TSTGRP6,
              TSTGRP7, GRP_PBS, GRP_AGT)

PBS_OPER_USERS = (OPER_USER,)

PBS_MGR_USERS = (MGR_USER, ADMIN_USER)

PBS_DATA_USERS = (DATA_USER,)

PBS_ROOT_USERS = (PBSROOT_USER, ROOT_USER)

PBS_BUILD_USERS = (BUILD_USER,)

SETUPLOG = 'setuplog'
TEARDOWNLOG = 'teardownlog'

SMOKE = 'smoke'
REGRESSION = 'regression'
NUMNODES = 'numnodes'
TIMEOUT_KEY = '__testcase_timeout__'
MINIMUM_TESTCASE_TIMEOUT = 600


def skip(reason="Skipped test execution"):
    """
    Unconditionally skip a test.

    :param reason: Reason for the skip
    :type reason: str or None
    """
    skip_flag = True

    def wrapper(test_item):
        test_item.__unittest_skip__ = skip_flag
        test_item.__unittest_skip_why__ = reason
        return test_item
    return wrapper


def timeout(val):
    """
    Decorator to set timeout value of test case
    """
    logger = logging.getLogger(__name__)
    old_val = None
    if val < MINIMUM_TESTCASE_TIMEOUT:
        old_val = val
        val = MINIMUM_TESTCASE_TIMEOUT

    def wrapper(obj):
        msg = 'for test ' + obj.func_name
        msg += ' minimum-testcase-timeout updated to '
        msg += str(val) + ' from ' + str(old_val)
        if old_val:
            logger.info(msg)
        setattr(obj, TIMEOUT_KEY, int(val))
        return obj
    return wrapper


def checkModule(modname):
    """
    Decorator to check if named module is available on the system
    and if not skip the test
    """

    def decorated(function):
        def wrapper(self, *args, **kwargs):
            import imp
            try:
                imp.find_module(modname)
            except ImportError:
                self.skipTest(reason='Module unavailable ' + modname)
            else:
                function(self, *args, **kwargs)
        wrapper.__doc__ = function.__doc__
        wrapper.__name__ = function.__name__
        return wrapper
    return decorated


def skipOnCray(function):
    """
    Decorator to skip a test on a ``Cray`` system
    """

    def wrapper(self, *args, **kwargs):
        if self.mom.is_cray():
            self.skipTest(reason='capability not supported on Cray')
        else:
            function(self, *args, **kwargs)
    wrapper.__doc__ = function.__doc__
    wrapper.__name__ = function.__name__
    return wrapper


def skipOnCpuSet(function):
    """
    Decorator to skip a test on a CpuSet system
    """

    def wrapper(self, *args, **kwargs):
        if self.mom.is_cpuset_mom():
            self.skipTest(reason='capability not supported on Cpuset')
        else:
            function(self, *args, **kwargs)
    wrapper.__doc__ = function.__doc__
    wrapper.__name__ = function.__name__
    return wrapper
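
# Illustrative usage of the decorators above (not part of the original
# source; the test class and method names below are hypothetical):
#
#     class TestSmoke(PBSTestSuite):
#
#         @timeout(1200)
#         @skipOnCray
#         def test_large_job_array(self):
#             ...
#
#         @checkModule("paramiko")
#         def test_remote_copy(self):
#             ...
#
# timeout() stores the (minimum-enforced) value under TIMEOUT_KEY on the
# decorated test, while skipOnCray/skipOnCpuSet/checkModule call
# self.skipTest() at run time when the platform or module check fails.
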

class PBSServiceInstanceWrapper(dict):

    """
    A wrapper class to handle multiple service ``(i.e., mom, server,
    scheduler)`` instances as passed along through the test harness
    ``(pbs_benchpress)``. Returns an ordered dictionary of PBS service
    instances ``(i.e., mom/server/scheduler)``.

    Users may invoke PTL using pointers to multiple services per host,
    for example:

    ``pbs_benchpress -p moms=hostA@/etc/pbs.conf,hostB,hostA@/etc/pbs.conf3``

    In such cases, the moms instance variable must be able to distinguish
    the ``self.moms['hostA']`` instances; each instance will be mapped to a
    unique configuration file.
    """

    def __init__(self, *args, **kwargs):
        super(self.__class__, self).__init__(self, *args, **kwargs)
        self.orderedlist = super(self.__class__, self).keys()

    def __setitem__(self, key, value):
        super(self.__class__, self).__setitem__(key, value)
        if key not in self.orderedlist:
            self.orderedlist.append(key)

    def __getitem__(self, key):
        for k, v in self.items():
            if k == key:
                return v
            if '@' in k:
                name, _ = k.split('@')
                if key in name:
                    return v
            else:
                name = k
            # Users may have specified shortnames instead of FQDN, in order
            # to not enforce that PBS_SERVER match the hostname passed in as
            # parameter, we check if a shortname matches a FQDN entry
            if '.' in key and key.split('.')[0] in name:
                return v
            if '.' in name and name.split('.')[0] in key:
                return v
        return None

    def __contains__(self, key):
        if key in self.keys():
            return True
        for k in self.keys():
            if '@' in k:
                name, _ = k.split('@')
                if key in name:
                    return True
            else:
                name = k
            # Users may have specified shortnames instead of FQDN, in order
            # to not enforce that PBS_SERVER match the hostname passed in as
            # parameter, we check if a shortname matches a FQDN entry
            if '.' in key and key.split('.')[0] in name:
                return True
            if '.' in name and name.split('.')[0] in key:
                return True
        return False

    def __iter__(self):
        return iter(self.orderedlist)

    def host_keys(self):
        return map(lambda h: h.split('@')[0], self.keys())

    def keys(self):
        return self.orderedlist

    def itervalues(self):
        return (self[key] for key in self.orderedlist)

    def values(self):
        return [self[key] for key in self.orderedlist]
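
# Illustrative lookup behavior of PBSServiceInstanceWrapper (hypothetical
# hostnames, not part of the original source):
#
#     moms = PBSServiceInstanceWrapper()
#     moms['hostA@/etc/pbs.conf'] = mom_a    # instance tied to a conf file
#     moms['hostB.example.com'] = mom_b
#
#     moms['hostA']                  # matches the 'hostA@/etc/pbs.conf' key
#     moms['hostB']                  # shortname matches the FQDN entry
#     'hostB.example.com' in moms    # True
#
# Iteration follows insertion order through the internal orderedlist.
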

class setUpClassError(Exception):
    pass


class tearDownClassError(Exception):
    pass


class PBSTestSuite(unittest.TestCase):

    """
    Generic ``setup``, ``teardown``, and ``logging`` functions to be used as
    parent class for most tests.

    Class instantiates:

    ``server object connected to localhost``

    ``scheduler object connected to localhost``

    ``mom object connected to localhost``

    Custom parameters:

    :param server: The hostname on which the PBS ``server/scheduler`` are
                   running
    :param mom: The hostname on which the PBS MoM is running
    :param servers: Colon-separated list of hostnames hosting a PBS server.
                    Servers are then accessible as a dictionary in the
                    instance variable servers.
    :param client: For CLI mode only, name of the host on which the PBS
                   client commands are to be run from. Format is
                   ``<host>@<path-to-config>``
    :param moms: Colon-separated list of hostnames hosting a PBS MoM.
                 MoMs are made accessible as a dictionary in the instance
                 variable moms.
    :param comms: Colon-separated list of hostnames hosting a PBS Comm.
                  Comms are made accessible as a dictionary in the instance
                  variable comms.
    :param nomom=<host1>\:<host2>...: expect no MoM on the given set of hosts
    :param mode: Sets mode of operation to PBS server. Can be either
                 ``'cli'`` or ``'api'``. Defaults to API behavior.
    :param conn_timeout: set a timeout in seconds after which a pbs_connect
                         IFL call is refreshed (i.e., disconnected)
    :param skip-setup: Bypasses setUp of PBSTestSuite (not custom ones)
    :param skip-teardown: Bypasses tearDown of PBSTestSuite (not custom ones)
    :param procinfo: Enables process monitoring thread, logged into
                     ptl_proc_info test metrics. The value can be set to
                     _all_ to monitor all PBS processes, including
                     ``pbs_server``, ``pbs_sched``, ``pbs_mom``, or to a
                     process defined by name.
    :param revert-to-defaults=<True|False>: if False, will not revert to
                                            defaults. True by default.
    :param revert-hooks=<True|False>: if False, do not revert hooks to
                                      defaults. Defaults to True.
                                      ``revert-to-defaults`` set to False
                                      overrides this setting.
    :param del-hooks=<True|False>: If False, do not delete hooks. Defaults
                                   to False. ``revert-to-defaults`` set to
                                   False overrides this setting.
    :param revert-queues=<True|False>: If False, do not revert queues to
                                       defaults. Defaults to True.
                                       ``revert-to-defaults`` set to False
                                       overrides this setting.
    :param revert-resources=<True|False>: If False, do not revert resources
                                          to defaults. Defaults to True.
                                          ``revert-to-defaults`` set to False
                                          overrides this setting.
    :param del-queues=<True|False>: If False, do not delete queues. Defaults
                                    to False. ``revert-to-defaults`` set to
                                    False overrides this setting.
    :param del-vnodes=<True|False>: If False, do not delete vnodes on MoM
                                    instances. Defaults to True.
    :param server-revert-to-defaults=<True|False>: if False, don't revert
                                                   Server to defaults
    :param comm-revert-to-defaults=<True|False>: if False, don't revert Comm
                                                 to defaults
    :param mom-revert-to-defaults=<True|False>: if False, don't revert MoM
                                                to defaults
    :param sched-revert-to-defaults=<True|False>: if False, don't revert
                                                  Scheduler to defaults
    :param procmon: Enables process monitoring. Multiple values must be
                    colon separated. For example to monitor ``server``,
                    ``sched``, and ``mom`` use
                    ``procmon=pbs_server:pbs_sched:pbs_mom``
    :param procmon-freq: Sets a polling frequency for the process monitoring
                         tool. Defaults to 10 seconds.
    :param test-users: colon-separated list of users to use as test users.
                       The users specified override the default users in the
                       order in which they appear in the ``PBS_USERS`` list.
    :param default-testcase-timeout: Default test case timeout value.
    :param data-users: colon-separated list of data users.
    :param oper-users: colon-separated list of operator users.
    :param mgr-users: colon-separated list of manager users.
    :param root-users: colon-separated list of root users.
    :param build-users: colon-separated list of build users.
    :param clienthost: the hostnames to set in the MoM config file
    """

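    # Illustrative invocation (hypothetical hostnames, not part of the
    # original source). Custom parameters reach this suite through
    # pbs_benchpress's -p option as a comma-separated list, for example:
    #
    #     pbs_benchpress -p "servers=svr1,moms=svr1:nodeA,mode=cli"
    #
    # Single-value forms (server=, mom=, comm=) and multi-value forms
    # (servers=, moms=, comms=) may be combined; parse_param and
    # init_from_conf below show how they are interpreted.
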
    logger = logging.getLogger(__name__)
    metrics_data = {}
    measurements = []
    additional_data = {}
    conf = {}
    param = None
    du = DshUtils()
    _procmon = None
    _process_monitoring = False
    revert_to_defaults = True
    server_revert_to_defaults = True
    mom_revert_to_defaults = True
    sched_revert_to_defaults = True
    revert_queues = True
    revert_resources = True
    revert_hooks = True
    del_hooks = True
    del_queues = True
    del_scheds = True
    del_vnodes = True
    server = None
    scheduler = None
    mom = None
    comm = None
    servers = None
    schedulers = {}
    scheds = None
    moms = None
    comms = None

    @classmethod
    def setUpClass(cls):
        cls.log_enter_setup(True)
        cls._testMethodName = 'setUpClass'
        cls.parse_param()
        cls.init_param()
        cls.check_users_exist()
        cls.init_servers()
        cls.init_comms()
        cls.init_schedulers()
        cls.init_moms()
        cls.log_end_setup(True)

    def setUp(self):
        if 'skip-setup' in self.conf:
            return
        self.log_enter_setup()
        self.init_proc_mon()
        self.revert_pbsconf()
        self.revert_servers()
        self.revert_comms()
        self.revert_schedulers()
        self.revert_moms()
        self.log_end_setup()
        self.measurements = []

    @classmethod
    def log_enter_setup(cls, iscls=False):
        _m = ' Entered ' + cls.__name__ + ' setUp'
        if iscls:
            _m += 'Class'
        _m_len = len(_m)
        cls.logger.info('=' * _m_len)
        cls.logger.info(_m)
        cls.logger.info('=' * _m_len)

    @classmethod
    def log_end_setup(cls, iscls=False):
        _m = 'Completed ' + cls.__name__ + ' setUp'
        if iscls:
            _m += 'Class'
        _m_len = len(_m)
        cls.logger.info('=' * _m_len)
        cls.logger.info(_m)
        cls.logger.info('=' * _m_len)

    @classmethod
    def _validate_param(cls, pname):
        """
        Check if parameter was enabled at the ``command-line``

        :param pname: parameter name
        :type pname: str
        :param pvar: class variable to set according to command-line setting
        """
        if pname not in cls.conf:
            return
        if cls.conf[pname] in PTL_TRUE:
            setattr(cls, pname.replace('-', '_'), True)
        else:
            setattr(cls, pname.replace('-', '_'), False)

    @classmethod
    def _set_user(cls, name, user_list):
        if name in cls.conf:
            for idx, u in enumerate(cls.conf[name].split(':')):
                user_list[idx].__init__(u)

    @classmethod
    def check_users_exist(cls):
        """
        Check whether the required test users exist on the test systems
        """
        testusersexist = True
        for u in [TEST_USER, TEST_USER1, TEST_USER2, TEST_USER3]:
            rv = cls.du.check_user_exists(str(u))
            if not rv:
                _msg = 'User ' + str(u) + ' does not exist!'
                raise setUpClassError(_msg)
        return testusersexist

    @classmethod
    def kicksched_action(cls, server, obj_type, *args, **kwargs):
        """
        Custom scheduler action to kick a scheduling cycle when expecting
        a job state change
        """
        if server is None:
            cls.logger.error('no server defined for custom action')
            return
        if obj_type == JOB:
            if (('scheduling' in server.attributes) and
                    (server.attributes['scheduling'] != 'False')):
                server.manager(MGR_CMD_SET, MGR_OBJ_SERVER,
                               {'scheduling': 'True'},
                               level=logging.DEBUG)

    @classmethod
    def parse_param(cls):
        """
        Get test configuration parameters as a ``comma-separated`` list of
        attributes.

        Attributes may be ``'='`` separated key value pairs or standalone
        entries.

        ``Multi-property`` attributes are colon-delimited.
        """
""" if cls.param is None: return for h in cls.param.split(','): if '=' in h: k, v = h.split('=') cls.conf[k.strip()] = v.strip() else: cls.conf[h.strip()] = '' if (('clienthost' in cls.conf) and not isinstance(cls.conf['clienthost'], list)): cls.conf['clienthost'] = cls.conf['clienthost'].split(':') users_map = [('test-users', PBS_USERS), ('oper-users', PBS_OPER_USERS), ('mgr-users', PBS_MGR_USERS), ('data-users', PBS_DATA_USERS), ('root-users', PBS_ROOT_USERS), ('build-users', PBS_BUILD_USERS)] for k, v in users_map: cls._set_user(k, v) @classmethod def init_param(cls): cls._validate_param('revert-to-defaults') cls._validate_param('server-revert-to-defaults') cls._validate_param('comm-revert-to-defaults') cls._validate_param('mom-revert-to-defaults') cls._validate_param('sched-revert-to-defaults') cls._validate_param('del-hooks') cls._validate_param('revert-hooks') cls._validate_param('del-queues') cls._validate_param('del-vnodes') cls._validate_param('revert-queues') cls._validate_param('revert-resources') if 'default-testcase-timeout' not in cls.conf.keys(): cls.conf['default_testcase_timeout'] = MINIMUM_TESTCASE_TIMEOUT else: cls.conf['default_testcase_timeout'] = int( cls.conf['default-testcase-timeout']) @classmethod def is_server_licensed(cls, server): """ Check if server is licensed or not """ for i in range(0, 10, 1): lic = server.status(SERVER, 'license_count', level=logging.INFOCLI) if lic and 'license_count' in lic[0]: lic = PbsTypeLicenseCount(lic[0]['license_count']) if ('Avail_Nodes' in lic) and (int(lic['Avail_Nodes']) > 0): return True elif (('Avail_Sockets' in lic) and (int(lic['Avail_Sockets']) > 0)): return True elif (('Avail_Global' in lic) and (int(lic['Avail_Global']) > 0)): return True elif ('Avail_Local' in lic) and (int(lic['Avail_Local']) > 0): return True time.sleep(i) return False @classmethod def init_from_conf(cls, conf, single=None, multiple=None, skip=None, func=None): """ Helper method to parse test parameters for`` mom/server/scheduler`` instances. 

    @classmethod
    def init_param(cls):
        cls._validate_param('revert-to-defaults')
        cls._validate_param('server-revert-to-defaults')
        cls._validate_param('comm-revert-to-defaults')
        cls._validate_param('mom-revert-to-defaults')
        cls._validate_param('sched-revert-to-defaults')
        cls._validate_param('del-hooks')
        cls._validate_param('revert-hooks')
        cls._validate_param('del-queues')
        cls._validate_param('del-vnodes')
        cls._validate_param('revert-queues')
        cls._validate_param('revert-resources')
        if 'default-testcase-timeout' not in cls.conf.keys():
            cls.conf['default_testcase_timeout'] = MINIMUM_TESTCASE_TIMEOUT
        else:
            cls.conf['default_testcase_timeout'] = int(
                cls.conf['default-testcase-timeout'])

    @classmethod
    def is_server_licensed(cls, server):
        """
        Check if server is licensed or not
        """
        for i in range(0, 10, 1):
            lic = server.status(SERVER, 'license_count',
                                level=logging.INFOCLI)
            if lic and 'license_count' in lic[0]:
                lic = PbsTypeLicenseCount(lic[0]['license_count'])
                if ('Avail_Nodes' in lic) and (int(lic['Avail_Nodes']) > 0):
                    return True
                elif (('Avail_Sockets' in lic) and
                        (int(lic['Avail_Sockets']) > 0)):
                    return True
                elif (('Avail_Global' in lic) and
                        (int(lic['Avail_Global']) > 0)):
                    return True
                elif ('Avail_Local' in lic) and (int(lic['Avail_Local']) > 0):
                    return True
            time.sleep(i)
        return False

    @classmethod
    def init_from_conf(cls, conf, single=None, multiple=None, skip=None,
                       func=None):
        """
        Helper method to parse test parameters for ``mom/server/scheduler``
        instances.

        The supported format of each service request is:

        ``hostname@configuration/path``

        For example:

        ``pbs_benchpress -p server=remote@/etc/pbs.conf.12.0``

        initializes a remote server instance that is configured according to
        the remote file ``/etc/pbs.conf.12.0``
        """
        endpoints = []
        if ((multiple in conf) and (conf[multiple] is not None)):
            __objs = conf[multiple].split(':')
            for _m in __objs:
                tmp = _m.split('@')
                if len(tmp) == 2:
                    endpoints.append(tuple(tmp))
                elif len(tmp) == 1:
                    endpoints.append((tmp[0], None))
        elif ((single in conf) and (conf[single] is not None)):
            tmp = conf[single].split('@')
            if len(tmp) == 2:
                endpoints.append(tuple(tmp))
            elif len(tmp) == 1:
                endpoints.append((tmp[0], None))
        else:
            endpoints = [(socket.gethostname(), None)]
        objs = PBSServiceInstanceWrapper()
        for name, objconf in endpoints:
            if ((skip is not None) and (skip in conf) and
                    ((name in conf[skip]) or (conf[skip] in name))):
                continue
            if objconf is not None:
                n = name + '@' + objconf
            else:
                n = name
            if getattr(cls, "server", None) is not None:
                objs[n] = func(name, pbsconf_file=objconf,
                               server=cls.server.hostname)
            else:
                objs[n] = func(name, pbsconf_file=objconf)
            if objs[n] is None:
                _msg = 'Failed %s(%s, %s)' % (func.__name__, name, objconf)
                raise setUpClassError(_msg)
            objs[n].initialise_service()
        return objs

    @classmethod
    def init_servers(cls, init_server_func=None, skip=None):
        """
        Initialize servers
        """
        if init_server_func is None:
            init_server_func = cls.init_server
        if 'servers' in cls.conf:
            if 'comms' not in cls.conf:
                cls.conf['comms'] = cls.conf['servers']
            if 'schedulers' not in cls.conf:
                cls.conf['schedulers'] = cls.conf['servers']
            if 'moms' not in cls.conf:
                cls.conf['moms'] = cls.conf['servers']
        if 'server' in cls.conf:
            if 'comm' not in cls.conf:
                cls.conf['comm'] = cls.conf['server']
            if 'scheduler' not in cls.conf:
                cls.conf['scheduler'] = cls.conf['server']
            if 'mom' not in cls.conf:
                cls.conf['mom'] = cls.conf['server']
        cls.servers = cls.init_from_conf(conf=cls.conf, single='server',
                                         multiple='servers', skip=skip,
                                         func=init_server_func)
        if cls.servers:
            cls.server = cls.servers.values()[0]

    @classmethod
    def init_comms(cls, init_comm_func=None, skip=None):
        """
        Initialize comms
        """
        if init_comm_func is None:
            init_comm_func = cls.init_comm
        cls.comms = cls.init_from_conf(conf=cls.conf,
                                       single='comm',
                                       multiple='comms',
                                       skip=skip,
                                       func=init_comm_func)
        if cls.comms:
            cls.comm = cls.comms.values()[0]

    @classmethod
    def init_schedulers(cls, init_sched_func=None, skip=None):
        """
        Initialize schedulers
        """
        if init_sched_func is None:
            init_sched_func = cls.init_scheduler
        cls.scheds = cls.init_from_conf(conf=cls.conf,
                                        single='scheduler',
                                        multiple='schedulers',
                                        skip=skip,
                                        func=init_sched_func)
        for sched in cls.scheds.values():
            if sched.server.name in cls.schedulers:
                continue
            else:
                cls.schedulers[sched.server.name] = sched.server.schedulers
        # creating a short hand for current host server.schedulers
        cls.scheds = cls.server.schedulers
        cls.scheduler = cls.scheds['default']

    @classmethod
    def init_moms(cls, init_mom_func=None, skip='nomom'):
        """
        Initialize moms
        """
        if init_mom_func is None:
            init_mom_func = cls.init_mom
        cls.moms = cls.init_from_conf(conf=cls.conf, single='mom',
                                      multiple='moms', skip=skip,
                                      func=init_mom_func)
        if cls.moms:
            cls.mom = cls.moms.values()[0]
            cls.server.moms = cls.moms
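
    # Once setUpClass has run the init_* methods above, tests typically
    # reach the services through the class attributes (illustrative sketch,
    # hypothetical hostname key):
    #
    #     self.server            # first (primary) server instance
    #     self.mom               # first MoM instance
    #     self.moms['nodeA']     # a specific MoM, by shortname or FQDN
    #     self.scheduler         # default scheduler of the primary server
    #
    # The dictionaries are PBSServiceInstanceWrapper objects, so lookups
    # tolerate the hostname@conf-file key format shown earlier.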

    @classmethod
    def init_server(cls, hostname, pbsconf_file=None):
        """
        Initialize a server instance

        Define custom expect action to trigger a scheduling cycle when job
        is not in running state

        :returns: The server instance on success and None on failure
        """
        client = hostname
        client_conf = None
        if 'client' in cls.conf:
            _cl = cls.conf['client'].split('@')
            client = _cl[0]
            if len(_cl) > 1:
                client_conf = _cl[1]
        server = Server(hostname, pbsconf_file=pbsconf_file,
                        client=client, client_pbsconf_file=client_conf)
        server._conn_timeout = 0
        if cls.conf is not None:
            if 'mode' in cls.conf:
                if cls.conf['mode'] == 'cli':
                    server.set_op_mode(PTL_CLI)
            if 'conn_timeout' in cls.conf:
                conn_timeout = int(cls.conf['conn_timeout'])
                server.set_connect_timeout(conn_timeout)
        sched_action = ExpectAction('kicksched', True, JOB,
                                    cls.kicksched_action)
        server.add_expect_action(action=sched_action)
        return server
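
    # Note on the expect action registered above (descriptive sketch based
    # on the docstring, not text from the original source): once
    # 'kicksched' is attached, a server.expect() call on a JOB that has not
    # yet reached the expected state re-sets 'scheduling' to True through
    # kicksched_action, nudging the scheduler into another cycle rather
    # than waiting passively for the state change.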
""" try: server = cls.servers[server] except: server = None return MoM(hostname, pbsconf_file=pbsconf_file, server=server) def init_proc_mon(self): """ Initialize process monitoring when requested """ if 'procmon' in self.conf: _proc_mon = [] for p in self.conf['procmon'].split(':'): _proc_mon += ['.*' + p + '.*'] if _proc_mon: if 'procmon-freq' in self.conf: freq = int(self.conf['procmon-freq']) else: freq = 10 self.start_proc_monitor(name='|'.join(_proc_mon), regexp=True, frequency=freq) self._process_monitoring = True def _get_dflt_pbsconfval(self, conf, svr_hostname, hosttype, hostobj): """ Helper function to revert_pbsconf, tries to determine and return default value for the pbs.conf variable given :param conf: the pbs.conf variable :type conf: str :param svr_hostname: hostname of the server host :type svr_hostname: str :param hosttype: type of host being reverted :type hosttype: str :param hostobj: PTL object associated with the host :type hostobj: PBSService :return default value of the pbs.conf variable if it can be determined as a string, otherwise None """ if conf == "PBS_SERVER": return svr_hostname elif conf == "PBS_START_SCHED": if hosttype == "server": return "1" else: return "0" elif conf == "PBS_START_COMM": if hosttype == "comm": return "1" else: return "0" elif conf == "PBS_START_SERVER": if hosttype == "server": return "1" else: return "0" elif conf == "PBS_START_MOM": if hosttype == "mom": return "1" else: return "0" elif conf == "PBS_CORE_LIMIT": return "unlimited" elif conf == "PBS_SCP": scppath = self.du.which(hostobj.hostname, "scp") if scppath != "scp": return scppath return None def _revert_pbsconf_comm(self, primary_server, vals_to_set): """ Helper function to revert_pbsconf to revert all comm daemons' pbs.conf :param primary_server: object of the primary PBS server :type primary_server: PBSService :param vals_to_set: dict of pbs.conf values to set :type vals_to_set: dict """ svr_hostnames = [svr.hostname for svr in self.servers.values()] for comm in self.comms.values(): if comm.hostname in svr_hostnames: continue new_pbsconf = dict(vals_to_set) restart_comm = False pbs_conf_val = self.du.parse_pbs_config(comm.hostname) if not pbs_conf_val: raise ValueError("Could not parse pbs.conf on host %s" % (comm.hostname)) # to start with, set all keys in new_pbsconf with values from the # existing pbs.conf keys_to_delete = [] for conf in new_pbsconf: if conf in pbs_conf_val: new_pbsconf[conf] = pbs_conf_val[conf] else: # existing pbs.conf doesn't have a default variable set # Try to determine the default val = self._get_dflt_pbsconfval(conf, primary_server.hostname, "comm", comm) if val is None: self.logger.error("Couldn't revert %s in pbs.conf" " to its default value" % (conf)) keys_to_delete.append(conf) else: new_pbsconf[conf] = val for key in keys_to_delete: del(new_pbsconf[key]) # Set the comm start bit to 1 if new_pbsconf["PBS_START_COMM"] != "1": new_pbsconf["PBS_START_COMM"] = "1" restart_comm = True # Set PBS_CORE_LIMIT, PBS_SCP and PBS_SERVER if new_pbsconf["PBS_CORE_LIMIT"] != "unlimited": new_pbsconf["PBS_CORE_LIMIT"] = "unlimited" restart_comm = True if new_pbsconf["PBS_SERVER"] != primary_server.hostname: new_pbsconf["PBS_SERVER"] = primary_server.hostname restart_comm = True if "PBS_SCP" not in new_pbsconf: scppath = self.du.which(comm.hostname, "scp") if scppath != "scp": new_pbsconf["PBS_SCP"] = scppath restart_comm = True # Check if existing pbs.conf has more/less entries than the # default list if len(pbs_conf_val) != len(new_pbsconf): restart_comm = 

    def _revert_pbsconf_comm(self, primary_server, vals_to_set):
        """
        Helper function to revert_pbsconf to revert all comm daemons'
        pbs.conf

        :param primary_server: object of the primary PBS server
        :type primary_server: PBSService
        :param vals_to_set: dict of pbs.conf values to set
        :type vals_to_set: dict
        """
        svr_hostnames = [svr.hostname for svr in self.servers.values()]
        for comm in self.comms.values():
            if comm.hostname in svr_hostnames:
                continue

            new_pbsconf = dict(vals_to_set)
            restart_comm = False
            pbs_conf_val = self.du.parse_pbs_config(comm.hostname)
            if not pbs_conf_val:
                raise ValueError("Could not parse pbs.conf on host %s" %
                                 (comm.hostname))

            # to start with, set all keys in new_pbsconf with values from the
            # existing pbs.conf
            keys_to_delete = []
            for conf in new_pbsconf:
                if conf in pbs_conf_val:
                    new_pbsconf[conf] = pbs_conf_val[conf]
                else:
                    # existing pbs.conf doesn't have a default variable set
                    # Try to determine the default
                    val = self._get_dflt_pbsconfval(conf,
                                                    primary_server.hostname,
                                                    "comm", comm)
                    if val is None:
                        self.logger.error("Couldn't revert %s in pbs.conf"
                                          " to its default value" % (conf))
                        keys_to_delete.append(conf)
                    else:
                        new_pbsconf[conf] = val

            for key in keys_to_delete:
                del(new_pbsconf[key])

            # Set the comm start bit to 1
            if new_pbsconf["PBS_START_COMM"] != "1":
                new_pbsconf["PBS_START_COMM"] = "1"
                restart_comm = True

            # Set PBS_CORE_LIMIT, PBS_SCP and PBS_SERVER
            if new_pbsconf["PBS_CORE_LIMIT"] != "unlimited":
                new_pbsconf["PBS_CORE_LIMIT"] = "unlimited"
                restart_comm = True
            if new_pbsconf["PBS_SERVER"] != primary_server.hostname:
                new_pbsconf["PBS_SERVER"] = primary_server.hostname
                restart_comm = True
            if "PBS_SCP" not in new_pbsconf:
                scppath = self.du.which(comm.hostname, "scp")
                if scppath != "scp":
                    new_pbsconf["PBS_SCP"] = scppath
                    restart_comm = True

            # Check if existing pbs.conf has more/less entries than the
            # default list
            if len(pbs_conf_val) != len(new_pbsconf):
                restart_comm = True

            if restart_comm:
                self.du.set_pbs_config(comm.hostname, confs=new_pbsconf)
                comm.pbs_conf = new_pbsconf
                comm.pi.initd(comm.hostname, "restart", daemon="comm")

    def _revert_pbsconf_mom(self, primary_server, vals_to_set):
        """
        Helper function to revert_pbsconf to revert all mom daemons'
        pbs.conf

        :param primary_server: object of the primary PBS server
        :type primary_server: PBSService
        :param vals_to_set: dict of pbs.conf values to set
        :type vals_to_set: dict
        """
        svr_hostnames = [svr.hostname for svr in self.servers.values()]
        for mom in self.moms.values():
            if mom.hostname in svr_hostnames:
                continue

            new_pbsconf = dict(vals_to_set)
            restart_mom = False
            pbs_conf_val = self.du.parse_pbs_config(mom.hostname)
            if not pbs_conf_val:
                raise ValueError("Could not parse pbs.conf on host %s" %
                                 (mom.hostname))

            # to start with, set all keys in new_pbsconf with values from the
            # existing pbs.conf
            keys_to_delete = []
            for conf in new_pbsconf:
                if conf in pbs_conf_val:
                    new_pbsconf[conf] = pbs_conf_val[conf]
                else:
                    # existing pbs.conf doesn't have a default variable set
                    # Try to determine the default
                    val = self._get_dflt_pbsconfval(conf,
                                                    primary_server.hostname,
                                                    "mom", mom)
                    if val is None:
                        self.logger.error("Couldn't revert %s in pbs.conf"
                                          " to its default value" % (conf))
                        keys_to_delete.append(conf)
                    else:
                        new_pbsconf[conf] = val

            for key in keys_to_delete:
                del(new_pbsconf[key])

            # Set the mom start bit to 1
            if (new_pbsconf["PBS_START_MOM"] != "1"):
                new_pbsconf["PBS_START_MOM"] = "1"
                restart_mom = True

            # Set PBS_CORE_LIMIT, PBS_SCP and PBS_SERVER
            if new_pbsconf["PBS_CORE_LIMIT"] != "unlimited":
                new_pbsconf["PBS_CORE_LIMIT"] = "unlimited"
                restart_mom = True
            if new_pbsconf["PBS_SERVER"] != primary_server.hostname:
                new_pbsconf["PBS_SERVER"] = primary_server.hostname
                restart_mom = True
            if "PBS_SCP" not in new_pbsconf:
                scppath = self.du.which(mom.hostname, "scp")
                if scppath != "scp":
                    new_pbsconf["PBS_SCP"] = scppath
                    restart_mom = True

            # Check if existing pbs.conf has more/less entries than the
            # default list
            if len(pbs_conf_val) != len(new_pbsconf):
                restart_mom = True

            if restart_mom:
                self.du.set_pbs_config(mom.hostname, confs=new_pbsconf,
                                       append=False)
                mom.pbs_conf = new_pbsconf
                mom.pi.initd(mom.hostname, "restart", daemon="mom")

    def _revert_pbsconf_server(self, vals_to_set):
        """
        Helper function to revert_pbsconf to revert all servers' pbs.conf

        :param vals_to_set: dict of pbs.conf values to set
        :type vals_to_set: dict
        """
        for server in self.servers.values():
            new_pbsconf = dict(vals_to_set)
            cmds_to_exec = []
            dmns_to_restart = 0
            restart_pbs = False
            pbs_conf_val = self.du.parse_pbs_config(server.hostname)
            if not pbs_conf_val:
                raise ValueError("Could not parse pbs.conf on host %s" %
                                 (server.hostname))

            # to start with, set all keys in new_pbsconf with values from the
            # existing pbs.conf
            keys_to_delete = []
            for conf in new_pbsconf:
                if conf in pbs_conf_val:
                    new_pbsconf[conf] = pbs_conf_val[conf]
                else:
                    # existing pbs.conf doesn't have a default variable set
                    # Try to determine the default
                    val = self._get_dflt_pbsconfval(conf,
                                                    server.hostname,
                                                    "server", server)
                    if val is None:
                        self.logger.error("Couldn't revert %s in pbs.conf"
                                          " to its default value" % (conf))
                        keys_to_delete.append(conf)
                    else:
                        new_pbsconf[conf] = val

            for key in keys_to_delete:
                del(new_pbsconf[key])

            # Set all start bits
            if (new_pbsconf["PBS_START_SERVER"] != "1"):
                new_pbsconf["PBS_START_SERVER"] = "1"
                dmns_to_restart += 1
                cmds_to_exec.append(["server", "start"])
            if (new_pbsconf["PBS_START_SCHED"] != "1"):
                new_pbsconf["PBS_START_SCHED"] = "1"
cmds_to_exec.append(["sched", "start"]) dmns_to_restart += 1 if self.moms and server.hostname not in self.moms: if new_pbsconf["PBS_START_MOM"] != "0": new_pbsconf["PBS_START_MOM"] = "0" cmds_to_exec.append(["mom", "stop"]) dmns_to_restart += 1 else: if (new_pbsconf["PBS_START_MOM"] != "1"): new_pbsconf["PBS_START_MOM"] = "1" cmds_to_exec.append(["mom", "start"]) dmns_to_restart += 1 if self.comms and server.hostname not in self.comms: if new_pbsconf["PBS_START_COMM"] != "0": new_pbsconf["PBS_START_COMM"] = "0" cmds_to_exec.append(["comm", "stop"]) else: if (new_pbsconf["PBS_START_COMM"] != "1"): new_pbsconf["PBS_START_COMM"] = "1" cmds_to_exec.append(["comm", "start"]) dmns_to_restart += 1 if dmns_to_restart == 4: # If all daemons need to be started again, just restart PBS # instead of making PTL start each of them one at a time restart_pbs = True # Set PBS_CORE_LIMIT, PBS_SCP and PBS_SERVER if new_pbsconf["PBS_CORE_LIMIT"] != "unlimited": new_pbsconf["PBS_CORE_LIMIT"] = "unlimited" restart_pbs = True if new_pbsconf["PBS_SERVER"] != server.shortname: new_pbsconf["PBS_SERVER"] = server.shortname restart_pbs = True if "PBS_SCP" not in new_pbsconf: scppath = self.du.which(server.hostname, "scp") if scppath != "scp": new_pbsconf["PBS_SCP"] = scppath restart_pbs = True # Check if existing pbs.conf has more/less entries than the # default list if len(pbs_conf_val) != len(new_pbsconf): restart_pbs = True if restart_pbs or dmns_to_restart > 0: # Write out the new pbs.conf file self.du.set_pbs_config(server.hostname, confs=new_pbsconf, append=False) server.pbs_conf = new_pbsconf if restart_pbs: # Restart all server.pi.restart(server.hostname) else: for initcmd in cmds_to_exec: # start/stop the particular daemon server.pi.initd(server.hostname, initcmd[1], daemon=initcmd[0]) def revert_pbsconf(self): """ Revert contents of the pbs.conf file Also start/stop the appropriate daemons """ primary_server = self.server vals_to_set = { "PBS_HOME": None, "PBS_EXEC": None, "PBS_SERVER": None, "PBS_START_SCHED": None, "PBS_START_COMM": None, "PBS_START_SERVER": None, "PBS_START_MOM": None, "PBS_CORE_LIMIT": None, "PBS_SCP": None } self._revert_pbsconf_server(vals_to_set) self._revert_pbsconf_mom(primary_server, vals_to_set) self._revert_pbsconf_comm(primary_server, vals_to_set) def revert_servers(self, force=False): """ Revert the values set for servers """ for server in self.servers.values(): self.revert_server(server, force) def revert_comms(self, force=False): """ Revert the values set for comms """ for comm in self.comms.values(): self.revert_comm(comm, force) def revert_schedulers(self, force=False): """ Revert the values set for schedulers """ for scheds in self.schedulers.values(): if 'default' in scheds: self.revert_scheduler(scheds['default'], force) def revert_moms(self, force=False): """ Revert the values set for moms """ self.del_all_nodes = True for mom in self.moms.values(): self.revert_mom(mom, force) def revert_server(self, server, force=False): """ Revert the values set for server """ rv = server.isUp() if not rv: self.logger.error('server ' + server.hostname + ' is down') server.start() msg = 'Failed to restart server ' + server.hostname self.assertTrue(server.isUp(), msg) server_stat = server.status(SERVER)[0] current_user = pwd.getpwuid(os.getuid())[0] try: # Unset managers list server.manager(MGR_CMD_UNSET, SERVER, 'managers', sudo=True, expect=True) except PbsManagerError as e: self.logger.error(e.msg) a = {ATTR_managers: (INCR, current_user + '@*')} server.manager(MGR_CMD_SET, SERVER, 
        if ((self.revert_to_defaults and self.server_revert_to_defaults) or
                force):
            server.revert_to_defaults(reverthooks=self.revert_hooks,
                                      delhooks=self.del_hooks,
                                      revertqueues=self.revert_queues,
                                      delqueues=self.del_queues,
                                      delscheds=self.del_scheds,
                                      revertresources=self.revert_resources,
                                      server_stat=server_stat)
        rv = self.is_server_licensed(server)
        _msg = 'No license found on server %s' % (server.shortname)
        self.assertTrue(rv, _msg)
        self.logger.info('server: %s licensed', server.hostname)

    def revert_comm(self, comm, force=False):
        """
        Revert the values set for comm
        """
        rv = comm.isUp()
        if not rv:
            self.logger.error('comm ' + comm.hostname + ' is down')
            comm.start()
            msg = 'Failed to restart comm ' + comm.hostname
            self.assertTrue(comm.isUp(), msg)

    def revert_scheduler(self, scheduler, force=False):
        """
        Revert the values set for scheduler
        """
        rv = scheduler.isUp()
        if not rv:
            self.logger.error('scheduler ' + scheduler.hostname + ' is down')
            scheduler.start()
            msg = 'Failed to restart scheduler ' + scheduler.hostname
            self.assertTrue(scheduler.isUp(), msg)
        if ((self.revert_to_defaults and self.sched_revert_to_defaults) or
                force):
            rv = scheduler.revert_to_defaults()
            _msg = 'Failed to revert sched %s' % (scheduler.hostname)
            self.assertTrue(rv, _msg)

    def revert_mom(self, mom, force=False):
        """
        Revert the values set for mom

        :param mom: the MoM object whose values are to be reverted
        :type mom: MoM object
        :param force: Option to revert forcibly
        :type force: bool
        """
        rv = mom.isUp()
        if not rv:
            self.logger.error('mom ' + mom.hostname + ' is down')
            mom.start()
            msg = 'Failed to restart mom ' + mom.hostname
            self.assertTrue(mom.isUp(), msg)
        mom.pbs_version()
        if ((self.revert_to_defaults and self.mom_revert_to_defaults) or
                force):
            rv = mom.revert_to_defaults(delvnodedefs=self.del_vnodes)
            _msg = 'Failed to revert mom %s' % (mom.hostname)
            self.assertTrue(rv, _msg)
        if 'clienthost' in self.conf:
            mom.add_config({'$clienthost': self.conf['clienthost']})
        a = {'state': 'free', 'resources_available.ncpus': (GE, 1)}
        nodes = self.server.counter(NODE, a, attrop=PTL_AND,
                                    level=logging.DEBUG)
        if not nodes:
            try:
                self.server.manager(MGR_CMD_DELETE, NODE, None, '')
            except:
                pass
            mom.delete_vnode_defs()
            mom.delete_vnodes()
            mom.restart()
            self.logger.info('server: no nodes defined, creating one')
            self.server.manager(MGR_CMD_CREATE, NODE, None, mom.shortname)
        name = mom.shortname
        if mom.platform == 'cray' or mom.platform == 'craysim':
            # delete all nodes (@default) on first call of revert_mom
            # and create all nodes specified by self.moms one by one
            try:
                if self.del_all_nodes:
                    self.server.manager(MGR_CMD_DELETE, NODE, None, '')
                    self.del_all_nodes = False
            except:
                pass
            self.server.manager(MGR_CMD_CREATE, NODE, None, name)
        else:
            try:
                self.server.status(NODE, id=name)
            except PbsStatusError:
                # server doesn't have node with shortname
                # check with hostname
                name = mom.hostname
                try:
                    self.server.status(NODE, id=name)
                except PbsStatusError:
                    # server doesn't have node for this mom yet
                    # so create with shortname
                    name = mom.shortname
                    self.server.manager(MGR_CMD_CREATE, NODE, None,
                                        mom.shortname)
        self.server.expect(NODE, {ATTR_NODE_state: 'free'}, id=name,
                           interval=1)
        return mom

    def analyze_logs(self):
        """
        Analyze accounting and scheduler logs from the time the test was
        started until it finished
        """
        pla = PBSLogAnalyzer()
        self.metrics_data = \
            pla.analyze_logs(serverlog=self.server.logfile,
                             schedlog=self.scheduler.logfile,
                             momlog=self.mom.logfile,
                             acctlog=self.server.acctlogfile,
                             start=self.server.ctime,
                             end=int(time.time()))

    def set_test_measurements(self, mdic=None):
        """
        Set the dictionary of analytical results of the test in order to
        include it in the test report

        :param mdic: dictionary with analytical data
        :type mdic: dict

        :returns: True on successful append or False on failure
        """
        if not (mdic and isinstance(mdic, dict)):
            return False
        self.measurements.append(mdic)
        return True

    def add_additional_data_to_report(self, datadic=None):
        """
        Set the dictionary that will be merged with the test report for the
        overall test run

        :param datadic: dictionary with analytical data
        :type datadic: dict

        :returns: True on successful update or False on failure
        """
        if not (datadic and isinstance(datadic, dict)):
            return False
        self.additional_data.update(datadic)
        return True

    def start_proc_monitor(self, name=None, regexp=False, frequency=60):
        """
        Start the process monitoring

        :param name: Process name
        :type name: str or None
        :param regexp: Regular expression to match
        :type regexp: bool
        :param frequency: Frequency of monitoring
        :type frequency: int
        """
        if self._procmon is not None:
            self.logger.info('A process monitor is already instantiated')
            return
        self.logger.info('starting process monitoring of ' + name +
                         ' every ' + str(frequency) + ' seconds')
        self._procmon = ProcMonitor(name=name, regexp=regexp,
                                    frequency=frequency)
        self._procmon.start()

    def stop_proc_monitor(self):
        """
        Stop the process monitoring
        """
        if not self._process_monitoring:
            return
        self.logger.info('stopping process monitoring')
        self._procmon.stop()
        self.metrics_data['procs'] = self._procmon.db_proc_info
        self._process_monitoring = False

    def skipTest(self, reason=None):
        """
        Skip Test

        :param reason: message to indicate why test is skipped
        :type reason: str or None
        """
        if reason:
            self.logger.warning('test skipped: ' + reason)
        else:
            reason = 'unknown'
        raise SkipTest(reason)

    skip_test = skipTest

    @classmethod
    def log_enter_teardown(cls, iscls=False):
        _m = ' Entered ' + cls.__name__ + ' tearDown'
        if iscls:
            _m += 'Class'
        _m_len = len(_m)
        cls.logger.info('=' * _m_len)
        cls.logger.info(_m)
        cls.logger.info('=' * _m_len)

    @classmethod
    def log_end_teardown(cls, iscls=False):
        _m = 'Completed ' + cls.__name__ + ' tearDown'
        if iscls:
            _m += 'Class'
        _m_len = len(_m)
        cls.logger.info('=' * _m_len)
        cls.logger.info(_m)
        cls.logger.info('=' * _m_len)

    def tearDown(self):
        """
        Verify that ``server`` and ``scheduler`` are up;
        clean up jobs and reservations
        """
        if 'skip-teardown' in self.conf:
            return
        self.log_enter_teardown()
        self.stop_proc_monitor()
        self.log_end_teardown()

    @classmethod
    def tearDownClass(cls):
        cls._testMethodName = 'tearDownClass'
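

# Minimal illustrative subclass (not part of the original source; the job
# submission shown is a hypothetical sketch of how PBSTestSuite is
# typically consumed, using the pbs_testlib Job/expect interfaces):
#
#     class MyTest(PBSTestSuite):
#
#         @timeout(900)
#         def test_submit_sleep_job(self):
#             j = Job(TEST_USER)
#             j.set_sleep_time(5)
#             jid = self.server.submit(j)
#             self.server.expect(JOB, {'job_state': 'R'}, id=jid)
#             self.set_test_measurements({'jobs_submitted': 1})
#
# Custom parameters (hosts, mode, revert flags) are passed through
# pbs_benchpress's -p option as documented in the PBSTestSuite docstring.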