# coding: utf-8 # Copyright (C) 1994-2018 Altair Engineering, Inc. # For more information, contact Altair at www.altair.com. # # This file is part of the PBS Professional ("PBS Pro") software. # # Open Source License Information: # # PBS Pro is free software. You can redistribute it and/or modify it under the # terms of the GNU Affero General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) any # later version. # # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. # See the GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # Commercial License Information: # # For a copy of the commercial license terms and conditions, # go to: (http://www.pbspro.com/UserArea/agreement.html) # or contact the Altair Legal Department. # # Altair’s dual-license business model allows companies, individuals, and # organizations to create proprietary derivative works of PBS Pro and # distribute them - whether embedded or bundled with other software - # under a commercial license agreement. # # Use of Altair’s trademarks, including but not limited to "PBS™", # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's # trademark licensing policies. from tests.functional import * @tags('commands') class TestPbsnodes(TestFunctional): """ This test suite contains regression tests for pbsnodes command. """ def setUp(self): TestFunctional.setUp(self) self.header = ['vnode', 'state', 'OS', 'hardware', 'host', 'queue', 'mem', 'ncpus', 'nmics', 'ngpus', 'comment'] self.pbs_exec = self.server.pbs_conf['PBS_EXEC'] self.pbsnodes = [os.path.join(self.pbs_exec, 'bin', 'pbsnodes')] self.svrname = self.server.pbs_server_name self.hostA = self.moms.values()[0].shortname def common_setUp(self): """ Common setUp for tests test_pbsnodes_as_user and test_pbsnodes_as_root """ TestFunctional.setUp(self) self.server.manager(MGR_CMD_DELETE, NODE, id="", sudo=True, expect=True) self.server.manager(MGR_CMD_CREATE, NODE, id=self.mom.shortname) self.server.expect(NODE, {'state': 'free'}) def get_newnode_attrs(self, user): """ return expected values of attributes on a newly created node """ expect_dict = {} expect_dict[ATTR_NODE_Mom] = self.server.hostname expect_dict[ATTR_NODE_ntype] = 'PBS' expect_dict[ATTR_NODE_state] = 'free' expect_dict[ATTR_rescavail + '.vnode'] = self.server.shortname expect_dict[ATTR_rescavail + '.host'] = self.server.shortname expect_dict[ATTR_NODE_resv_enable] = 'True' if user == 'root': expect_dict[ATTR_version] = self.server.pbs_version expect_dict[ATTR_NODE_Port] = '15002' return expect_dict def verify_node_dynamic_val(self, last_state_change_time, available_ncpus, pcpus, sharing, available_mem): """ verifies node dynamic attributes have expected value """ sharing_list = ['default_shared', 'default_excl', 'default_exclhost', 'ignore_excl', 'force_excl', 'force_exclhost'] # Verify that 'last_state_change_time' has value in datetime format last_state_change_time = str(last_state_change_time) try: time.strptime(last_state_change_time, "%a %b %d %H:%M:%S %Y") except ValueError: self.fail("'last_state_change_time' has value in incorrect format") else: mssg = "'last_state_change_time' has value in correct format" self.logger.info(mssg) # checking resources_avalable.ncpus and pcpus value are positive value ncpus_val = int(available_ncpus) if ncpus_val >= 0: mssg = "resources_available.ncpus have positive int value" self.logger.info(mssg) else: self.fail("resources_available.ncpus have negative value") pcpus_val = int(pcpus) if pcpus_val >= 0: self.logger.info("pcpus have positive int value") else: self.fail("pcpus have negative value") # verify pcpus and ncpus value are same if pcpus_val == ncpus_val: self.logger.info("pcpus and ncpus have same value") else: self.fail("pcpus and ncpus not having same value") # verify node sharing attribute have one of the value in # sharing_val list mssg = "Node sharing attribute not have expected value" self.assertIn(sharing, sharing_list, mssg) # checking resources_avalable.mem value is positive value index = available_mem.find('kb') if int(available_mem[:index]) >= 0: mssg = "resources_available.mem have positive int value" self.logger.info(mssg) else: self.fail("resources_available.mem not having positive int value") def test_pbsnodes_S(self): """ This verifies that 'pbsnodes -S' results in a usage message """ pbsnodes_S = self.pbsnodes + ['-S'] out = self.du.run_cmd(self.svrname, cmd=pbsnodes_S) self.logger.info(out['err'][0]) self.assertIn('usage:', out['err'][ 0], 'usage not found in error message') def test_pbsnodes_S_host(self): """ This verifies that 'pbsnodes -S ' results in an output with correct headers. """ pbsnodes_S_host = self.pbsnodes + ['-S', self.hostA] out1 = self.du.run_cmd(self.svrname, cmd=pbsnodes_S_host) self.logger.info(out1['out']) for hdr in self.header: self.assertIn( hdr, out1['out'][0], "header %s not found in output" % hdr) def test_pbsnodes_aS(self): """ This verifies that 'pbsnodes -aS' results in an output with correct headers. """ pbsnodes_aS = self.pbsnodes + ['-aS'] out2 = self.du.run_cmd(self.svrname, cmd=pbsnodes_aS) self.logger.info(out2['out']) for hdr in self.header: self.assertIn( hdr, out2['out'][0], "header %s not found in output" % hdr) def test_pbsnodes_av(self): """ This verifies the values of last_used_time in 'pbsnodes -av' result before and after server shutdown, once a job submitted. """ j = Job(TEST_USER) j.set_sleep_time(1) jid = self.server.submit(j) self.server.accounting_match("E;%s;" % jid) prev = self.server.status(NODE, 'last_used_time')[0]['last_used_time'] self.logger.info("Restarting server") self.server.restart() self.assertTrue(self.server.isUp(), 'Failed to restart Server Daemon') now = self.server.status(NODE, 'last_used_time')[0]['last_used_time'] self.logger.info("Before: " + prev + ". After: " + now + ".") self.assertEquals(prev.strip(), now.strip(), 'Last used time mismatch after server restart') @skipOnCpuSet @skipOnCray def test_pbsnodes_as_user(self): """ Validate default values of node attributes for non-root user """ self.common_setUp() attr_dict = {} expected_attrs = self.get_newnode_attrs(TEST_USER) command = os.path.join(self.server.pbs_conf['PBS_EXEC'], 'bin', 'pbsnodes -a') ret = self.du.run_cmd(self.server.hostname, command) self.assertEqual(ret['rc'], 0) attr_list = ret['out'] list_len = len(attr_list) - 1 for i in range(1, list_len): attr = attr_list[i].split('=')[0].strip() val = attr_list[i].split('=')[1].strip() attr_dict[attr] = val # comparing the pbsnodes -a output with expected result for attr in expected_attrs: self.assertEqual(expected_attrs[attr], attr_dict[attr]) self.verify_node_dynamic_val(attr_dict['last_state_change_time'], attr_dict['resources_available.ncpus'], attr_dict['pcpus'], attr_dict['sharing'], attr_dict['resources_available.mem']) @tags('smoke') @skipOnCpuSet @skipOnCray def test_pbsnodes_as_root(self): """ Validate default values of node attributes for root user """ self.common_setUp() attr_dict = {} expected_attrs = self.get_newnode_attrs('root') command = os.path.join(self.server.pbs_conf['PBS_EXEC'], 'bin', 'pbsnodes -a') ret = self.du.run_cmd(self.server.hostname, command, sudo=True) self.assertEqual(ret['rc'], 0) attr_list = ret['out'] list_len = len(attr_list) - 1 for i in range(1, list_len): attr = attr_list[i].split('=')[0].strip() val = attr_list[i].split('=')[1].strip() attr_dict[attr] = val # comparing the pbsnodes -a output with expected result for attr in expected_attrs: self.assertEqual(expected_attrs[attr], attr_dict[attr]) self.verify_node_dynamic_val(attr_dict['last_state_change_time'], attr_dict['resources_available.ncpus'], attr_dict['pcpus'], attr_dict['sharing'], attr_dict['resources_available.mem'])