# coding: utf-8 # Copyright (C) 1994-2018 Altair Engineering, Inc. # For more information, contact Altair at www.altair.com. # # This file is part of the PBS Professional ("PBS Pro") software. # # Open Source License Information: # # PBS Pro is free software. You can redistribute it and/or modify it under the # terms of the GNU Affero General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) any # later version. # # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. # See the GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # # Commercial License Information: # # For a copy of the commercial license terms and conditions, # go to: (http://www.pbspro.com/UserArea/agreement.html) # or contact the Altair Legal Department. # # Altair’s dual-license business model allows companies, individuals, and # organizations to create proprietary derivative works of PBS Pro and # distribute them - whether embedded or bundled with other software - # under a commercial license agreement. # # Use of Altair’s trademarks, including but not limited to "PBS™", # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's # trademark licensing policies. from tests.functional import * class TestPbsAccumulateRescUsed(TestFunctional): """ This tests the feature in PBS that enables mom hooks to accumulate resources_used values for resources beside cput, cpupercent, and mem. This includes accumulation of custom resources. The mom hooks supported this feature are: exechost_periodic, execjob_prologue, and execjob_epilogue. PRE: Have a cluster of PBS with 3 mom hosts, with an exechost_startup that adds custom resources. POST: When a job ends, accounting_logs reflect the aggregated resources_used values. And with job_history_enable=true, one can do a 'qstat -x -f ' to obtain information of a previous job. """ # Class variables def setUp(self): TestFunctional.setUp(self) self.logger.info("len moms = %d" % (len(self.moms))) if len(self.moms) != 3: usage_string = 'test requires 3 MoMs as input, ' + \ 'use -p moms=::' self.skip_test(usage_string) # PBSTestSuite returns the moms passed in as parameters as dictionary # of hostname and MoM object self.momA = self.moms.values()[0] self.momB = self.moms.values()[1] self.momC = self.moms.values()[2] self.momA.delete_vnode_defs() self.momB.delete_vnode_defs() self.momC.delete_vnode_defs() self.hostA = self.momA.shortname self.hostB = self.momB.shortname self.hostC = self.momC.shortname rc = self.server.manager(MGR_CMD_DELETE, NODE, None, "") self.assertEqual(rc, 0) rc = self.server.manager(MGR_CMD_CREATE, NODE, id=self.hostA) self.assertEqual(rc, 0) rc = self.server.manager(MGR_CMD_CREATE, NODE, id=self.hostB) self.assertEqual(rc, 0) rc = self.server.manager(MGR_CMD_CREATE, NODE, id=self.hostC) self.assertEqual(rc, 0) # Give the moms a chance to contact the server. self.server.expect(NODE, {'state': 'free'}, id=self.hostA) self.server.expect(NODE, {'state': 'free'}, id=self.hostB) self.server.expect(NODE, {'state': 'free'}, id=self.hostC) # First set some custom resources via exechost_startup hook. startup_hook_body = """ import pbs e=pbs.event() localnode=pbs.get_local_nodename() e.vnode_list[localnode].resources_available['foo_i'] = 7 e.vnode_list[localnode].resources_available['foo_f'] = 5.0 e.vnode_list[localnode].resources_available['foo_str'] = "seventyseven" """ hook_name = "start" a = {'event': "exechost_startup", 'enabled': 'True'} rv = self.server.create_import_hook( hook_name, a, startup_hook_body, overwrite=True) self.assertTrue(rv) self.momA.signal("-HUP") self.momB.signal("-HUP") self.momC.signal("-HUP") a = {'job_history_enable': 'True'} self.server.manager(MGR_CMD_SET, SERVER, a) # Next set some custom resources via qmgr -c 'create resource' attr = {} attr['type'] = 'string' attr['flag'] = 'h' r = 'foo_str2' rc = self.server.manager( MGR_CMD_CREATE, RSC, attr, id=r, runas=ROOT_USER, logerr=False) self.assertEqual(rc, 0) # Ensure the new resource is seen by all moms. self.momA.log_match( "resourcedef;copy hook-related file", max_attempts=3) self.momB.log_match( "resourcedef;copy hook-related file", max_attempts=3) self.momC.log_match( "resourcedef;copy hook-related file", max_attempts=3) attr['type'] = 'string' attr['flag'] = 'h' r = 'foo_str3' rc = self.server.manager( MGR_CMD_CREATE, RSC, attr, id=r, runas=ROOT_USER, logerr=False) self.assertEqual(rc, 0) # Ensure the new resource is seen by all moms. self.momA.log_match("resourcedef;copy hook-related file") self.momB.log_match("resourcedef;copy hook-related file") self.momC.log_match("resourcedef;copy hook-related file") attr['type'] = 'string' attr['flag'] = 'h' r = 'foo_str4' rc = self.server.manager( MGR_CMD_CREATE, RSC, attr, id=r, runas=ROOT_USER, logerr=False) self.assertEqual(rc, 0) # Ensure the new resource is seen by all moms. self.momA.log_match("resourcedef;copy hook-related file") self.momB.log_match("resourcedef;copy hook-related file") self.momC.log_match("resourcedef;copy hook-related file") attr['type'] = 'string_array' attr['flag'] = 'h' r = 'stra' rc = self.server.manager( MGR_CMD_CREATE, RSC, attr, id=r, runas=ROOT_USER, logerr=False) self.assertEqual(rc, 0) # Give the moms a chance to receive the updated resource. # Ensure the new resource is seen by all moms. self.momA.log_match("resourcedef;copy hook-related file") self.momB.log_match("resourcedef;copy hook-related file") self.momC.log_match("resourcedef;copy hook-related file") def test_epilogue(self): """ Test accumulatinon of resources of a multinode job from an exechost_epilogue hook. """ self.logger.info("test_epilogue") hook_body = """ import pbs e=pbs.event() pbs.logmsg(pbs.LOG_DEBUG, "executed epilogue hook") if e.job.in_ms_mom(): e.job.resources_used["vmem"] = pbs.size("9gb") e.job.resources_used["foo_i"] = 9 e.job.resources_used["foo_f"] = 0.09 e.job.resources_used["foo_str"] = '{"seven":7}' e.job.resources_used["cput"] = 10 e.job.resources_used["stra"] = '"glad,elated","happy"' e.job.resources_used["foo_str3"] = \ \"\"\"{"a":6,"b":"some value #$%^&*@","c":54.4,"d":"32.5gb"}\"\"\" e.job.resources_used["foo_str2"] = "seven" e.job.resources_used["foo_str4"] = "eight" else: e.job.resources_used["vmem"] = pbs.size("10gb") e.job.resources_used["foo_i"] = 10 e.job.resources_used["foo_f"] = 0.10 e.job.resources_used["foo_str"] = '{"eight":8,"nine":9}' e.job.resources_used["foo_str2"] = '{"seven":7}' e.job.resources_used["cput"] = 20 e.job.resources_used["stra"] = '"cucumbers,bananas"' e.job.resources_used["foo_str3"] = \"\"\""vn1":4,"vn2":5,"vn3":6\"\"\" """ hook_name = "epi" a = {'event': "execjob_epilogue", 'enabled': 'True'} rv = self.server.create_import_hook( hook_name, a, hook_body, overwrite=True) self.assertTrue(rv) a = {'Resource_List.select': '3:ncpus=1', 'Resource_List.walltime': 10, 'Resource_List.place': "scatter"} j = Job(TEST_USER) j.set_attributes(a) j.set_sleep_time("10") jid = self.server.submit(j) # The results should show results for custom resources 'foo_i', # 'foo_f', 'foo_str', 'foo_str3', and bultin resources 'vmem', # 'cput', and should be accumulating based # on the hook script, where MS defines 1 value, while the 2 sister # Moms define the same value. For 'string' type, it will be a # union of all values obtained from sister moms and local mom, and # the result will be in JSON-format. # # foo_str is for testing normal values. # foo_str2 is for testing non-JSON format value received from MS. # foo_str3 is for testing non-JSON format value received from a sister # mom. # foo_str4 is for testing MS-only set values. # # For string_array type resource 'stra', it is not accumulated but # will be set to last seen value from a mom epilogue hook. self.server.expect(JOB, { 'job_state': 'F', 'resources_used.foo_f': '0.29', 'resources_used.foo_i': '29', 'resources_used.foo_str4': "eight", 'resources_used.stra': "\"glad,elated\",\"happy\"", 'resources_used.vmem': '29gb', 'resources_used.cput': '00:00:50', 'resources_used.ncpus': '3'}, extend='x', offset=10, attrop=PTL_AND, id=jid) foo_str_dict_in = {"eight": 8, "seven": 7, "nine": 9} qstat = self.server.status( JOB, 'resources_used.foo_str', id=jid, extend='x') foo_str_dict_out_str = eval(qstat[0]['resources_used.foo_str']) foo_str_dict_out = eval(foo_str_dict_out_str) self.assertTrue(foo_str_dict_in == foo_str_dict_out) # resources_used.foo_str3 must not be set since a sister value is not # of JSON-format. self.server.expect(JOB, 'resources_used.foo_str3', op=UNSET, extend='x', id=jid) self.momA.log_match( "Job %s resources_used.foo_str3 cannot be " % (jid,) + "accumulated: value '\"vn1\":4,\"vn2\":5,\"vn3\":6' " + "from mom %s not JSON-format" % (self.hostB,)) # resources_used.foo_str2 must not be set. self.server.expect(JOB, 'resources_used.foo_str2', op=UNSET, id=jid) self.momA.log_match( "Job %s resources_used.foo_str2 cannot be " % (jid,) + "accumulated: value 'seven' from mom %s " % (self.hostA,) + "not JSON-format") # Match accounting_logs entry acctlog_match = 'resources_used.foo_f=0.29' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) acctlog_match = 'resources_used.foo_i=29' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) acctlog_match = "resources_used.foo_str='%s'" % (foo_str_dict_out_str,) self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) acctlog_match = 'resources_used.vmem=29gb' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) acctlog_match = 'resources_used.cput=00:00:50' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) # ensure resources_foo_str2 is not reported in accounting_logs since # it's unset due to non-JSON-format value. acctlog_match = 'resources_used.foo_str2=' self.server.accounting_match("E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100, existence=False) acctlog_match = 'resources_used.foo_str4=eight' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) acctlog_match = 'resources_used.ncpus=3' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) # resources_used.foo_str3 must not show up in accounting_logs acctlog_match = 'resources_used.foo_str3=', self.server.accounting_match("E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100, existence=False) acctlog_match = 'resources_used.stra=\"glad\,elated\"\,\"happy\"' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) def test_prologue(self): """ Test accumulatinon of resources of a multinode job from an exechost_prologue hook. """ self.logger.info("test_prologue") hook_body = """ import pbs e=pbs.event() pbs.logmsg(pbs.LOG_DEBUG, "executed prologue hook") if e.job.in_ms_mom(): e.job.resources_used["vmem"] = pbs.size("11gb") e.job.resources_used["foo_i"] = 11 e.job.resources_used["foo_f"] = 0.11 e.job.resources_used["foo_str"] = '{"seven":7}' e.job.resources_used["cput"] = 11 e.job.resources_used["stra"] = '"glad,elated","happy"' e.job.resources_used["foo_str3"] = \ \"\"\"{"a":6,"b":"some value #$%^&*@","c":54.4,"d":"32.5gb"}\"\"\" e.job.resources_used["foo_str2"] = "seven" e.job.resources_used["foo_str4"] = "eight" else: e.job.resources_used["vmem"] = pbs.size("12gb") e.job.resources_used["foo_i"] = 12 e.job.resources_used["foo_f"] = 0.12 e.job.resources_used["foo_str"] = '{"eight":8,"nine":9}' e.job.resources_used["foo_str2"] = '{"seven":7}' e.job.resources_used["cput"] = 12 e.job.resources_used["stra"] = '"cucumbers,bananas"' e.job.resources_used["foo_str3"] = \"\"\""vn1":4,"vn2":5,"vn3":6\"\"\" """ hook_name = "prolo" a = {'event': "execjob_prologue", 'enabled': 'True'} rv = self.server.create_import_hook( hook_name, a, hook_body, overwrite=True) self.assertTrue(rv) a = {'Resource_List.select': '3:ncpus=1', 'Resource_List.walltime': 10, 'Resource_List.place': 'scatter'} j = Job(TEST_USER) j.set_attributes(a) # The pbsdsh call is what allows a first task to get spawned on # on a sister mom, causing the execjob_prologue hook to execute. j.create_script( "pbsdsh -n 1 hostname\n" + "pbsdsh -n 2 hostname\n" + "sleep 10\n") jid = self.server.submit(j) # The results should show results for custom resources 'foo_i', # 'foo_f', 'foo_str', 'foo_str3', and bultin resources 'vmem', # 'cput', and should be accumulating based # on the hook script, where MS defines 1 value, while the 2 sister # Moms define the same value. For 'string' type, it will be a # union of all values obtained from sister moms and local mom, and # the result will be in JSON-format. # # foo_str is for testing normal values. # foo_str2 is for testing non-JSON format value received from MS. # foo_str3 is for testing non-JSON format value received from a sister # mom. # foo_str4 is for testing MS-only set values. # # For string_array type resource 'stra', it is not accumulated but # will be set to last seen value from a mom prologue hook. self.server.expect(JOB, { 'job_state': 'F', 'resources_used.foo_f': '0.35', 'resources_used.foo_i': '35', 'resources_used.foo_str4': "eight", 'resources_used.stra': "\"glad,elated\",\"happy\"", 'resources_used.vmem': '35gb', 'resources_used.cput': '00:00:35', 'resources_used.ncpus': '3'}, extend='x', offset=10, attrop=PTL_AND, id=jid) foo_str_dict_in = {"eight": 8, "seven": 7, "nine": 9} qstat = self.server.status( JOB, 'resources_used.foo_str', id=jid, extend='x') foo_str_dict_out_str = eval(qstat[0]['resources_used.foo_str']) foo_str_dict_out = eval(foo_str_dict_out_str) self.assertTrue(foo_str_dict_in == foo_str_dict_out) # resources_used.foo_str3 must not be set since a sister value is # not of JSON-format. self.server.expect(JOB, 'resources_used.foo_str3', op=UNSET, extend='x', id=jid) self.momA.log_match( "Job %s resources_used.foo_str3 cannot be " % (jid,) + "accumulated: value '\"vn1\":4,\"vn2\":5,\"vn3\":6' " + "from mom %s not JSON-format" % (self.hostB,)) self.momA.log_match( "Job %s resources_used.foo_str3 cannot be " % (jid,) + "accumulated: value '\"vn1\":4,\"vn2\":5,\"vn3\":6' " + "from mom %s not JSON-format" % (self.hostC,)) # Ensure resources_used.foo_str3 is not set since it has a # non-JSON format value. self.server.expect(JOB, 'resources_used.foo_str3', op=UNSET, extend='x', id=jid) # resources_used.foo_str2 must not be set. self.server.expect(JOB, 'resources_used.foo_str2', op=UNSET, id=jid) self.momA.log_match( "Job %s resources_used.foo_str2 cannot be " % (jid,) + "accumulated: value 'seven' from " + "mom %s not JSON-format" % (self.hostA,)) # Match accounting_logs entry acctlog_match = 'resources_used.foo_f=0.35' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) acctlog_match = 'resources_used.foo_i=35' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) acctlog_match = "resources_used.foo_str='%s'" % (foo_str_dict_out_str,) self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) acctlog_match = 'resources_used.vmem=35gb' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) acctlog_match = 'resources_used.cput=00:00:35' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) # resources_used.foo_str2 should not be reported in accounting_logs. acctlog_match = 'resources_used.foo_str2=' self.server.accounting_match("E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100, existence=False) acctlog_match = 'resources_used.ncpus=3' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) # resources_used.foo_str3 must not show up in accounting_logs acctlog_match = 'resources_used.foo_str3=' self.server.accounting_match("E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100, existence=False) acctlog_match = 'resources_used.foo_str4=eight' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) acctlog_match = 'resources_used.stra=\"glad\,elated\"\,\"happy\"' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) def test_periodic(self): """ Test accumulatinon of resources from an exechost_periodic hook. """ self.logger.info("test_periodic") hook_body = """ import pbs e=pbs.event() pbs.logmsg(pbs.LOG_DEBUG, "executed periodic hook") i = 0 l = [] for v in pbs.server().vnodes(): pbs.logmsg(pbs.LOG_DEBUG, "node %s" % (v.name,)) l.append(v.name) local_node=pbs.get_local_nodename() for jk in e.job_list.keys(): if local_node == l[0]: e.job_list[jk].resources_used["vmem"] = pbs.size("11gb") e.job_list[jk].resources_used["foo_i"] = 11 e.job_list[jk].resources_used["foo_f"] = 0.11 e.job_list[jk].resources_used["foo_str"] = '{"seven":7}' e.job_list[jk].resources_used["cput"] = 11 e.job_list[jk].resources_used["stra"] = '"glad,elated","happy"' e.job_list[jk].resources_used["foo_str3"] = \ \"\"\"{"a":6,"b":"some value #$%^&*@","c":54.4,"d":"32.5gb"}\"\"\" e.job_list[jk].resources_used["foo_str2"] = "seven" elif local_node == l[1]: e.job_list[jk].resources_used["vmem"] = pbs.size("12gb") e.job_list[jk].resources_used["foo_i"] = 12 e.job_list[jk].resources_used["foo_f"] = 0.12 e.job_list[jk].resources_used["foo_str"] = '{"eight":8}' e.job_list[jk].resources_used["cput"] = 12 e.job_list[jk].resources_used["stra"] = '"cucumbers,bananas"' e.job_list[jk].resources_used["foo_str2"] = '{"seven":7}' e.job_list[jk].resources_used["foo_str3"] = \ \"\"\"{"vn1":4,"vn2":5,"vn3":6}\"\"\" else: e.job_list[jk].resources_used["vmem"] = pbs.size("13gb") e.job_list[jk].resources_used["foo_i"] = 13 e.job_list[jk].resources_used["foo_f"] = 0.13 e.job_list[jk].resources_used["foo_str"] = '{"nine":9}' e.job_list[jk].resources_used["foo_str2"] = '{"seven":7}' e.job_list[jk].resources_used["cput"] = 13 e.job_list[jk].resources_used["stra"] = '"cucumbers,bananas"' e.job_list[jk].resources_used["foo_str3"] = \ \"\"\"{"vn1":4,"vn2":5,"vn3":6}\"\"\" """ hook_name = "period" a = {'event': "exechost_periodic", 'enabled': 'True', 'freq': 15} rv = self.server.create_import_hook( hook_name, a, hook_body, overwrite=True) self.assertTrue(rv) a = {'resources_available.ncpus': '2'} self.server.manager(MGR_CMD_SET, NODE, a, self.hostA, expect=True) self.server.manager(MGR_CMD_SET, NODE, a, self.hostB, expect=True) self.server.manager(MGR_CMD_SET, NODE, a, self.hostC, expect=True) a = {'Resource_List.select': '3:ncpus=1', 'Resource_List.place': 'scatter'} j = Job(TEST_USER) j.set_attributes(a) j.set_sleep_time("35") jid1 = self.server.submit(j) jid2 = self.server.submit(j) for jid in [jid1, jid2]: # The results should show results for custom resources 'foo_i', # 'foo_f', 'foo_str', 'foo_str3', and bultin resources 'vmem', # 'cput', and should be accumulating based # on the hook script, where MS defines 1 value, while the 2 sister # Moms define the same value. For 'string' type, it will be a # union of all values obtained from sister moms and local mom, and # the result will be in JSON-format. # foo_str is for testing normal values. # foo_str2 is for testing non-JSON format value received from MS. # foo_str3 is for testing non-JSON format value received from a # sister mom. # self.server.expect(JOB, { 'job_state': 'F', 'resources_used.foo_f': '0.36', 'resources_used.foo_i': '36', 'resources_used.stra': "\"glad,elated\",\"happy\"", 'resources_used.vmem': '36gb', 'resources_used.cput': '00:00:36', 'resources_used.ncpus': '3'}, extend='x', offset=35, attrop=PTL_AND, id=jid) foo_str_dict_in = {"eight": 8, "seven": 7, "nine": 9} qstat = self.server.status( JOB, 'resources_used.foo_str', id=jid, extend='x') foo_str_dict_out_str = eval(qstat[0]['resources_used.foo_str']) foo_str_dict_out = eval(foo_str_dict_out_str) self.assertTrue(foo_str_dict_in == foo_str_dict_out) foo_str3_dict_in = {"a": 6, "b": "some value #$%^&*@", "c": 54.4, "d": "32.5gb", "vn1": 4, "vn2": 5, "vn3": 6} qstat = self.server.status( JOB, 'resources_used.foo_str3', id=jid, extend='x') foo_str3_dict_out_str = eval(qstat[0]['resources_used.foo_str3']) foo_str3_dict_out = eval(foo_str3_dict_out_str) self.assertTrue(foo_str3_dict_in == foo_str3_dict_out) # resources_used.foo_str2 must be unset since its value is not of # JSON-format. self.server.expect(JOB, 'resources_used.foo_str2', op=UNSET, extend='x', id=jid) # Match accounting_logs entry acctlog_match = 'resources_used.foo_f=0.36' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) acctlog_match = 'resources_used.foo_i=36' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) acctlog_match = "resources_used.foo_str='%s'" % ( foo_str_dict_out_str,) self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) acctlog_match = 'resources_used.vmem=36gb' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) acctlog_match = 'resources_used.cput=00:00:36' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) # resources_used.foo_str2 must not show in accounting_logs acctlog_match = 'resources_used.foo_str2=', self.server.accounting_match("E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100, existence=False) acctlog_match = 'resources_used.ncpus=3' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) acctlog_match = "resources_used.foo_str3='%s'" % ( foo_str3_dict_out_str.replace('.', '\.'). replace("#$%^&*@", "\#\$\%\^\&\*\@")) self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) acctlog_match = 'resources_used.stra=\"glad\,elated\"\,\"happy\"' self.server.accounting_match( "E;%s;.*%s.*" % (jid, acctlog_match), regexp=True, n=100) def test_resource_bool(self): """ To test that boolean value are not getting aggregated """ # Create a boolean type resource attr = {} attr['type'] = 'boolean' self.server.manager( MGR_CMD_CREATE, RSC, attr, id='foo_bool', runas=ROOT_USER, logerr=False) hook_body = """ import pbs e=pbs.event() j=e.job if j.in_ms_mom(): j.resources_used["foo_bool"] = True else: j.resources_used["foo_bool"] = False """ hook_name = "epi_bool" a = {'event': "execjob_epilogue", 'enabled': "True"} self.server.create_import_hook( hook_name, a, hook_body, overwrite=True) a = {'Resource_List.select': '3:ncpus=1', 'Resource_List.walltime': 10, 'Resource_List.place': 'scatter'} j = Job(TEST_USER) j.set_attributes(a) j.set_sleep_time("5") jid = self.server.submit(j) # foo_bool is True a = {'resources_used.foo_bool': "True", 'job_state': 'F'} self.server.expect(JOB, a, extend='x', offset=5, attrop=PTL_AND, id=jid) def test_resource_invisible(self): """ Test that value aggregation is same for invisible resources """ # Set float and string_array to be invisible resource attr = {} attr['flag'] = 'ih' self.server.manager( MGR_CMD_SET, RSC, attr, id='foo_f', runas=ROOT_USER) self.server.manager( MGR_CMD_SET, RSC, attr, id='foo_str', runas=ROOT_USER) hook_body = """ import pbs e=pbs.event() j = e.job if j.in_ms_mom(): j.resources_used["foo_f"] = 2.114 j.resources_used["foo_str"] = '{"one":1,"two":2}' else: j.resources_used["foo_f"] = 3.246 j.resources_used["foo_str"] = '{"two":2, "three":3}' """ hook_name = "epi_invis" a = {'event': "execjob_epilogue", 'enabled': 'True'} self.server.create_import_hook( hook_name, a, hook_body, overwrite=True) a = {'Resource_List.select': '3:ncpus=1', 'Resource_List.walltime': 10, 'Resource_List.place': 'scatter'} j = Job(TEST_USER) j.set_attributes(a) j.set_sleep_time("5") jid = self.server.submit(j) # Verify that values are accumulated for float and string array a = {'resources_used.foo_f': '8.606'} self.server.expect(JOB, a, extend='x', offset=5, id=jid) foo_str_dict_in = {"one": 1, "two": 2, "three": 3} qstat = self.server.status( JOB, 'resources_used.foo_str', id=jid, extend='x') foo_str_dict_out_str = eval(qstat[0]['resources_used.foo_str']) foo_str_dict_out = eval(foo_str_dict_out_str) self.assertEquals(foo_str_dict_in, foo_str_dict_out) def test_reservation(self): """ Test that job inside reservations works same """ # Create non-host level resources from qmgr attr = {} attr['type'] = 'size' self.server.manager( MGR_CMD_CREATE, RSC, attr, id='foo_i2', runas=ROOT_USER) # Ensure the new resource is seen by all moms. self.momA.log_match( "resourcedef;copy hook-related file", max_attempts=3) self.momB.log_match( "resourcedef;copy hook-related file", max_attempts=3) self.momC.log_match( "resourcedef;copy hook-related file", max_attempts=3) attr['type'] = 'float' self.server.manager( MGR_CMD_CREATE, RSC, attr, id='foo_f2', runas=ROOT_USER) # Ensure the new resource is seen by all moms. self.momA.log_match( "resourcedef;copy hook-related file", max_attempts=3) self.momB.log_match( "resourcedef;copy hook-related file", max_attempts=3) self.momC.log_match( "resourcedef;copy hook-related file", max_attempts=3) attr['type'] = 'string_array' self.server.manager( MGR_CMD_CREATE, RSC, attr, id='stra2', runas=ROOT_USER) # Ensure the new resource is seen by all moms. self.momA.log_match( "resourcedef;copy hook-related file", max_attempts=3) self.momB.log_match( "resourcedef;copy hook-related file", max_attempts=3) self.momC.log_match( "resourcedef;copy hook-related file", max_attempts=3) # Create an epilogue hook hook_body = """ import pbs e = pbs.event() j = e.job pbs.logmsg(pbs.LOG_DEBUG, "executed epilogue hook") j.resources_used["foo_i"] = 2 j.resources_used["foo_i2"] = pbs.size(1000) j.resources_used["foo_f"] = 1.02 j.resources_used["foo_f2"] = 2.01 j.resources_used["stra"] = '"happy"' j.resources_used["stra2"] = '"glad"' """ # Create and import hook a = {'event': "execjob_epilogue", 'enabled': 'True'} self.server.create_import_hook( "epi", a, hook_body, overwrite=True) # Submit a reservation a = {'Resource_List.select': '3:ncpus=1', 'Resource_List.place': 'scatter', 'reserve_start': time.time() + 10, 'reserve_end': time.time() + 30, } r = Reservation(TEST_USER, a) rid = self.server.submit(r) a = {'reserve_state': (MATCH_RE, "RESV_CONFIRMED|2")} self.server.expect(RESV, a, id=rid) rname = rid.split('.') # Submit a job inside reservation a = {'Resource_List.select': '3:ncpus=1', ATTR_queue: rname[0]} j = Job(TEST_USER) j.set_attributes(a) j.set_sleep_time(20) jid = self.server.submit(j) # Verify the resource values a = {'resources_used.foo_i': '6', 'resources_used.foo_i2': '3kb', 'resources_used.foo_f': '3.06', 'resources_used.foo_f2': '6.03', 'resources_used.stra': "\"happy\"", 'resources_used.stra2': "\"glad\"", 'job_state': 'F'} self.server.expect(JOB, a, extend='x', attrop=PTL_AND, offset=30, interval=1, max_attempts=20, id=jid) # Restart server and verifies that the values are still the same self.server.restart() # Below is commented due to a known PBS issue # self.server.expect(JOB, a, extend='x', id=jid) def test_server_restart(self): """ Test that resource accumulation will not get impacted if server is restarted during job execution """ # Create a prologue hook hook_body = """ import pbs e=pbs.event() pbs.logmsg(pbs.LOG_DEBUG, "executed prologue hook") if e.job.in_ms_mom(): e.job.resources_used["vmem"] = pbs.size("11gb") e.job.resources_used["foo_i"] = 11 e.job.resources_used["foo_f"] = 0.11 e.job.resources_used["foo_str"] = '{"seven":7}' e.job.resources_used["cput"] = 11 e.job.resources_used["stra"] = '"glad,elated","happy"' e.job.resources_used["foo_str4"] = "eight" else: e.job.resources_used["vmem"] = pbs.size("12gb") e.job.resources_used["foo_i"] = 12 e.job.resources_used["foo_f"] = 0.12 e.job.resources_used["foo_str"] = '{"eight":8,"nine":9}' e.job.resources_used["cput"] = 12 e.job.resources_used["stra"] = '"cucumbers,bananas"' """ hook_name = "prolo" a = {'event': "execjob_prologue", 'enabled': 'True'} self.server.create_import_hook( hook_name, a, hook_body, overwrite=True) a = {'Resource_List.select': '3:ncpus=1', 'Resource_List.walltime': 20, 'Resource_List.place': 'scatter'} j = Job(TEST_USER) j.set_attributes(a) # The pbsdsh call is what allows a first task to get spawned on # on a sister mom, causing the execjob_prologue hook to execute. j.create_script( "pbsdsh -n 1 hostname\n" + "pbsdsh -n 2 hostname\n" + "sleep 10\n") jid = self.server.submit(j) # Once the job is started running restart server self.server.expect(JOB, {'job_state': "R", "substate": 42}, id=jid) self.server.restart() # Job will be requeued and rerun. Verify that the # resource accumulation is similar as if server is # not started a = {'resources_used.foo_i': '35', 'resources_used.foo_f': '0.35', 'resources_used.vmem': '35gb', 'resources_used.cput': '00:00:35', 'resources_used.stra': "\"glad,elated\",\"happy\"", 'resources_used.foo_str4': "eight", 'job_state': 'F'} self.server.expect(JOB, a, extend='x', offset=5, id=jid, interval=1, attrop=PTL_AND) foo_str_dict_in = {"eight": 8, "seven": 7, "nine": 9} qstat = self.server.status( JOB, 'resources_used.foo_str', id=jid, extend='x') foo_str_dict_out_str = eval(qstat[0]['resources_used.foo_str']) foo_str_dict_out = eval(foo_str_dict_out_str) self.assertEquals(foo_str_dict_in, foo_str_dict_out) def test_mom_down(self): """ Test that resource_accumulation is not impacted due to mom restart """ # Set node_fail_requeue to requeue job self.server.manager(MGR_CMD_SET, SERVER, {'node_fail_requeue': 10}) hook_body = """ import pbs e = pbs.event() pbs.logmsg(pbs.LOG_DEBUG, "executed periodic hook") for jj in e.job_list.keys(): e.job_list[jj].resources_used["foo_i"] = 1 e.job_list[jj].resources_used["foo_str"] = '{"happy":"true"}' e.job_list[jj].resources_used["stra"] = '"one","two"' """ a = {'event': "exechost_periodic", 'enabled': 'True', 'freq': 10} self.server.create_import_hook( "period", a, hook_body, overwrite=True) a = {'Resource_List.select': '3:ncpus=1', 'Resource_List.place': 'scatter'} j = Job(TEST_USER) j.set_attributes(a) jid1 = self.server.submit(j) # Submit a job that can never run a = {'Resource_List.select': '5:ncpus=1', 'Resource_List.place': 'scatter'} j.set_attributes(a) jid2 = self.server.submit(j) # Wait for 10s approx for hook to get executed # verify the resources_used.foo_i self.server.expect(JOB, {'resources_used.foo_i': '3'}, offset=10, id=jid1, interval=1) self.server.expect(JOB, "resources_used.foo_i", op=UNSET, id=jid2) # Bring sister mom down self.momB.stop() # Wait for 20 more seconds for preiodic hook to run # more than once and verify that value is still 3 self.server.expect(JOB, {'resources_used.foo_i': '3'}, offset=20, id=jid1, interval=1) # Wait for job to be requeued by node_fail_requeue self.server.rerunjob(jid1, runas=ROOT_USER) self.server.expect(JOB, {'job_state': 'Q'}, id=jid1) # Verify that resources_used.foo_i is unset self.server.expect(JOB, "resources_used.foo_i", op=UNSET, id=jid1) # Bring sister mom up self.momB.start() self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'}) self.server.expect(JOB, {'job_state': 'R'}, id=jid1, interval=1) # Verify that value of foo_i for job1 is set back self.server.expect(JOB, {'resources_used.foo_i': '3'}, offset=10, id=jid1, interval=1) def test_job_rerun(self): """ Test that resource accumulates once when job is rerun """ hook_body = """ import pbs e = pbs.event() pbs.logmsg(pbs.LOG_DEBUG, "executed periodic hook") for jj in e.job_list.keys(): e.job_list[jj].resources_used["foo_f"] = 1.01 e.job_list[jj].resources_used["cput"] = 10 """ a = {'event': "exechost_periodic", 'enabled': 'True', 'freq': 10} self.server.create_import_hook( "period", a, hook_body, overwrite=True) a = {'Resource_List.select': '3:ncpus=1', 'Resource_List.place': 'scatter'} j = Job(TEST_USER) j.set_attributes(a) jid1 = self.server.submit(j) self.server.expect(JOB, {'job_state': "R", "substate": 42}, id=jid1) # Wait for 10s approx for hook to get executed # Verify the resources_used.foo_f a = {'resources_used.foo_f': '3.03', 'resources_used.cput': 30} self.server.expect(JOB, a, offset=10, id=jid1, attrop=PTL_AND, interval=1) # Rerun the job self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'}) self.server.rerunjob(jobid=jid1, runas=ROOT_USER) self.server.expect(JOB, {'job_state': 'Q'}, id=jid1) # Verify that foo_f is unset self.server.expect(JOB, 'Resource_List.foo_f', op=UNSET, id=jid1) # turn the scheduling on self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'}) self.server.expect(JOB, {'job_state': "R", "substate": 42}, attrop=PTL_AND, id=jid1) # Validate that resources_used.foo_f is reset self.server.expect(JOB, a, offset=10, id=jid1, attrop=PTL_AND, interval=1) def test_job_array(self): """ Test that resource accumulation for subjobs also work """ hook_body = """ import pbs e=pbs.event() pbs.logmsg(pbs.LOG_DEBUG, "executed epilogue hook") if e.job.in_ms_mom(): e.job.resources_used["vmem"] = pbs.size("9gb") e.job.resources_used["foo_i"] = 9 e.job.resources_used["foo_f"] = 0.09 e.job.resources_used["foo_str"] = '{"seven":7}' e.job.resources_used["cput"] = 10 e.job.resources_used["stra"] = '"glad,elated","happy"' else: e.job.resources_used["vmem"] = pbs.size("10gb") e.job.resources_used["foo_i"] = 10 e.job.resources_used["foo_f"] = 0.10 e.job.resources_used["foo_str"] = '{"eight":8,"nine":9}' e.job.resources_used["cput"] = 20 e.job.resources_used["stra"] = '"cucumbers,bananas"' """ a = {'event': "execjob_epilogue", 'enabled': 'True'} self.server.create_import_hook( "test", a, hook_body, overwrite=True) a = {'Resource_List.select': '3:ncpus=1', 'Resource_List.walltime': 10, 'Resource_List.place': 'scatter'} j = Job(TEST_USER, attrs={ATTR_J: '1-2'}) j.set_attributes(a) j.set_sleep_time("5") jid = self.server.submit(j) # Verify that once subjobs are over values are # set for each subjob in the accounting logs subjob1 = string.replace(jid, '[]', '[1]') acctlog_match = 'resources_used.foo_f=0.29' # Below code is commented due to a PTL issue # s = self.server.accounting_match( # "E;%s;.*%s.*" % (subjob1, acctlog_match), regexp=True, n=100) # self.assertTrue(s) acctlog_match = 'resources_used.foo_i=29' # s = self.server.accounting_match( # "E;%s;.*%s.*" % (subjob1, acctlog_match), regexp=True, n=100) # self.assertTrue(s) foo_str_dict_in = {"eight": 8, "seven": 7, "nine": 9} acctlog_match = "resources_used.foo_str='%s'" % (foo_str_dict_in,) # s = self.server.accounting_match( # "E;%s;.*%s.*" % (subjob1, acctlog_match), regexp=True, n=100) # self.assertTrue(s) acctlog_match = 'resources_used.vmem=29gb' # s = self.server.accounting_match( # "E;%s;.*%s.*" % (subjob1, acctlog_match), regexp=True, n=100) # self.assertTrue(s) acctlog_match = 'resources_used.cput=00:00:50' # s = self.server.accounting_match( # "E;%s;.*%s.*" % (subjob1, acctlog_match), regexp=True, n=100) # self.assertTrue(s) acctlog_match = 'resources_used.stra=\"glad\,elated\"\,\"happy\"' # s = self.server.accounting_match( # "E;%s;.*%s.*" % (subjob1, acctlog_match), regexp=True, n=100) # self.assertTrue(s) def test_epi_pro(self): """ Test that epilogue and prologue changing same and different resources. Values of same resource would get overwriteen by the last hook. """ hook_body = """ import pbs e=pbs.event() pbs.logmsg(pbs.LOG_DEBUG, "In prologue hook") e.job.resources_used["foo_i"] = 10 e.job.resources_used["foo_f"] = 0.10 """ a = {'event': "execjob_prologue", 'enabled': 'True'} self.server.create_import_hook( "pro", a, hook_body, overwrite=True) # Verify the copy message in the logs to avoid # race conditions self.momA.log_match( "pro.PY;copy hook-related file", max_attempts=10) self.momB.log_match( "pro.PY;copy hook-related file", max_attempts=10) self.momC.log_match( "pro.PY;copy hook-related file", max_attempts=10) hook_body = """ import pbs e=pbs.event() pbs.logmsg(pbs.LOG_DEBUG, "In epilogue hook") e.job.resources_used["foo_f"] = 0.20 e.job.resources_used["cput"] = 10 """ a = {'event': "execjob_epilogue", 'enabled': 'True'} self.server.create_import_hook( "epi", a, hook_body, overwrite=True) # Verify the copy message in the logs to avoid # race conditions self.momA.log_match( "epi.PY;copy hook-related file", max_attempts=10) self.momB.log_match( "epi.PY;copy hook-related file", max_attempts=10) self.momC.log_match( "epi.PY;copy hook-related file", max_attempts=10) a = {'Resource_List.select': '3:ncpus=1', 'Resource_List.place': 'scatter'} j = Job(TEST_USER) j.set_attributes(a) j.create_script( "pbsdsh -n 1 hostname\n" + "pbsdsh -n 2 hostname\n" + "sleep 5\n") jid = self.server.submit(j) # Verify the resources_used once the job is over self.server.expect(JOB, { 'resources_used.foo_i': '30', 'resources_used.foo_f': '0.6', 'resources_used.cput': '30', 'job_state': 'F'}, attrop=PTL_AND, extend='x', id=jid, offset=5, max_attempts=60, interval=1) # Submit another job j1 = Job(TEST_USER) j1.set_attributes(a) j1.create_script( "pbsdsh -n 1 hostname\n" + "pbsdsh -n 2 hostname\n" + "sleep 300\n") jid1 = self.server.submit(j1) # Verify that prologue hook has set the values self.server.expect(JOB, { 'job_state': 'R', 'resources_used.foo_i': '30', 'resources_used.foo_f': '0.3'}, attrop=PTL_AND, id=jid1, max_attempts=30, interval=2) # Force delete the job self.server.deljob(id=jid1, wait=True, attr_W="force") # Verify values are accumulated by prologue hook only self.server.expect(JOB, { 'resources_used.foo_i': '30', 'resources_used.foo_f': '0.3'}, attrop=PTL_AND, extend='x', id=jid1) def test_server_restart2(self): """ Test that server restart during hook execution has no impact """ hook_body = """ import pbs import time e = pbs.event() pbs.logmsg(pbs.LOG_DEBUG, "executed epilogue hook") if e.job.in_ms_mom(): e.job.resources_used["vmem"] = pbs.size("9gb") e.job.resources_used["foo_i"] = 9 e.job.resources_used["foo_f"] = 0.09 e.job.resources_used["foo_str"] = '{"seven":7}' e.job.resources_used["cput"] = 10 else: e.job.resources_used["vmem"] = pbs.size("10gb") e.job.resources_used["foo_i"] = 10 e.job.resources_used["foo_f"] = 0.10 e.job.resources_used["foo_str"] = '{"eight":8,"nine":9}' e.job.resources_used["cput"] = 20 time.sleep(15) """ a = {'event': "execjob_epilogue", 'enabled': 'True'} self.server.create_import_hook( "epi", a, hook_body, overwrite=True) # Submit a job a = {'Resource_List.select': '3:ncpus=1', 'Resource_List.walltime': 10, 'Resource_List.place': "scatter"} j = Job(TEST_USER) j.set_attributes(a) j.set_sleep_time("5") jid = self.server.submit(j) # Verify the resource values a = {'resources_used.foo_i': 29, 'resources_used.foo_f': 0.29, 'resources_used.foo_str': "\'{\"eight\": 8, \"seven\": 7, \"nine\": 9}\'"} self.server.expect(JOB, a, extend='x', attrop=PTL_AND, offset=5, id=jid, interval=1) # Restart server while hook is still executing self.server.restart() # Verify that values again self.server.expect(JOB, a, extend='x', attrop=PTL_AND, id=jid) def test_mom_down2(self): """ Test that when mom is down values are still accumulated for resources """ hook_body = """ import pbs e=pbs.event() pbs.logmsg(pbs.LOG_DEBUG, "executed epilogue hook") if e.job.in_ms_mom(): e.job.resources_used["vmem"] = pbs.size("9gb") e.job.resources_used["foo_i"] = 9 e.job.resources_used["foo_f"] = 0.09 e.job.resources_used["foo_str"] = '{"seven":7}' e.job.resources_used["cput"] = 10 e.job.resources_used["stra"] = '"glad,elated","happy"' else: e.job.resources_used["vmem"] = pbs.size("10gb") e.job.resources_used["foo_i"] = 10 e.job.resources_used["foo_f"] = 0.10 e.job.resources_used["foo_str"] = '{"eight":8,"nine":9}' e.job.resources_used["cput"] = 20 e.job.resources_used["stra"] = '"cucumbers,bananas"' """ a = {'event': "execjob_epilogue", 'enabled': 'True'} self.server.create_import_hook( "epi", a, hook_body, overwrite=True) # Submit a job a = {'Resource_List.select': '3:ncpus=1', 'Resource_List.walltime': 10, 'Resource_List.place': "scatter"} j = Job(TEST_USER) j.set_attributes(a) j.set_sleep_time("10") jid = self.server.submit(j) # Verify job is running self.server.expect(JOB, {'job_state': "R"}, id=jid) # Bring sister mom down self.momB.stop() # Wait for job to end # Validate that the values are being set # with 2 moms only self.server.expect(JOB, {'job_state': 'F', 'resources_used.foo_i': '19', 'resources_used.foo_f': '0.19', 'resources_used.foo_str': '\'{\"eight\": 8, \"seven\": 7, \"nine\": 9}\''}, offset=10, id=jid, interval=1, extend='x', attrop=PTL_AND) # Bring the mom back up self.momB.start()