# coding: utf-8

# Copyright (C) 1994-2018 Altair Engineering, Inc.
# For more information, contact Altair at www.altair.com.
#
# This file is part of the PBS Professional ("PBS Pro") software.
#
# Open Source License Information:
#
# PBS Pro is free software. You can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.
# See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Commercial License Information:
#
# For a copy of the commercial license terms and conditions,
# go to: (http://www.pbspro.com/UserArea/agreement.html)
# or contact the Altair Legal Department.
#
# Altair’s dual-license business model allows companies, individuals, and
# organizations to create proprietary derivative works of PBS Pro and
# distribute them - whether embedded or bundled with other software -
# under a commercial license agreement.
#
# Use of Altair’s trademarks, including but not limited to "PBS™",
# "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
# trademark licensing policies.

from tests.functional import *


class TestProvisioningJob_Enh(TestFunctional):
    """
    This testsuite tests newly introduced provisioning capabilities.
    With this enhancement, PBS will be able to run job requesting aoe
    in subchunks, just like any other custom non consumable resource.

    PRE: Have a cluster of PBS with two MOM's installed, with one MOM
    on a node other than PBS server host. Pass the provisionable mom
    first in pbs_benchpress.
    Eg. pbs_benchpress -p moms=second_node:server_node ...
    """

    # Provisioning hook body. For aoe 'App1' it logs the application
    # provisioning message and accepts with 1; for any other aoe it logs
    # the aoe/vnode pair plus the OS provisioning message and accepts
    # with 0. The tests below match on these log messages.
    fake_prov_hook = """
import pbs
import time
e = pbs.event()

vnode = e.vnode
aoe = e.aoe
if aoe == 'App1':
    pbs.logmsg(pbs.LOG_DEBUG, "fake application provisioning script")
    e.accept(1)
pbs.logmsg(pbs.LOG_DEBUG, "aoe=%s,vnode=%s" % (aoe,vnode))
pbs.logmsg(pbs.LOG_DEBUG, "fake os provisioning script")
e.accept(0)
"""

    # Runjob hook body. It logs the job together with the exec_vnode
    # chosen for it and then rejects the run request.
    reject_runjob_hook = """
import pbs
e = pbs.event()
j = e.job
pbs.logmsg(pbs.LOG_DEBUG, "job " + str(j) + " solution " + str(j.exec_vnode))
e.reject()
"""

    def setUp(self):
        """
        Skip unless two moms are given, then recreate both nodes with
        hostA as the provisionable node (2 ncpus, aoe App1/osimage1),
        hostB with 12 ncpus, and install the fake provisioning hook.
        """
        if len(self.moms) < 2:
            self.skipTest("Provide at least 2 moms while invoking test")
        TestFunctional.setUp(self)

        # Materialise the moms once as a list: indexing the result of
        # values() directly fails when it is a dict view.
        moms = list(self.moms.values())

        # This test suite expects the first mom given with "-p moms"
        # benchpress option to be remote mom. In case this assumption
        # is not true then it reverses the order in the setup.
        if moms[0].shortname == self.server.shortname:
            self.momA = moms[1]
            self.momB = moms[0]
        else:
            self.momA = moms[0]
            self.momB = moms[1]

        self.hostA = self.momA.shortname
        self.hostB = self.momB.shortname

        # Remove all nodes
        self.server.manager(MGR_CMD_DELETE, NODE, None, "")

        # Restart PBS
        self.server.restart()

        # Create node
        self.server.manager(MGR_CMD_CREATE, NODE, None, self.hostA)
        self.server.manager(MGR_CMD_CREATE, NODE, None, self.hostB)
        self.server.expect(NODE, {'state': 'free'}, id=self.hostA)
        self.server.expect(NODE, {'state': 'free'}, id=self.hostB)

        # Set hostA provisioning attributes.
        a = {'provision_enable': 'true',
             'resources_available.ncpus': '2',
             'resources_available.aoe': 'App1,osimage1'}
        self.server.manager(
            MGR_CMD_SET, NODE, a, id=self.hostA, expect=True)
        self.server.manager(MGR_CMD_UNSET, NODE, id=self.hostA,
                            attrib='current_aoe', expect=True)

        # Set hostB ncpus to 12
        a = {'resources_available.ncpus': '12'}
        self.server.manager(
            MGR_CMD_SET, NODE, a, id=self.hostB, expect=True)

        # Setup provisioning hook.
        a = {'event': 'provision', 'enabled': 'True', 'alarm': '300'}
        rv = self.server.create_import_hook(
            'fake_prov_hook', a, self.fake_prov_hook, overwrite=True)
        self.assertTrue(rv)

        self.server.manager(MGR_CMD_SET, SERVER,
                            {'log_events': 2047}, expect=True)

    def _assert_job_spans_both_moms(self, nodes):
        """
        Assert that the first two entries of ``nodes`` are momA and
        momB, in either order.

        :param nodes: vnode names the job was assigned, as returned by
                      ``Job.get_vnodes()``
        """
        first_a_then_b = (nodes[0] == self.momA.shortname and
                          nodes[1] == self.momB.shortname)
        first_b_then_a = (nodes[0] == self.momB.shortname and
                          nodes[1] == self.momA.shortname)
        self.assertTrue(first_a_then_b or first_b_then_a,
                        'Job expected on %s and %s, but ran on %s' %
                        (self.momA.shortname, self.momB.shortname,
                         str(nodes)))

    def test_app_provisioning(self):
        """
        Test application provisioning
        """
        j = Job(TEST_USER1)
        j.set_sleep_time(5)
        j.set_attributes({'Resource_List.select': '1:aoe=App1'})
        jid = self.server.submit(j)

        # Job should start running after provisioning script finish
        # executing.
        # Since this is application provisioning, mom restart is
        # not needed.
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)

        # Current aoe on momA, should be set to the requested aoe in job.
        self.server.expect(NODE, {'current_aoe': 'App1'}, id=self.hostA)

        self.server.log_match(
            "fake application provisioning script",
            max_attempts=20,
            interval=1)

    def test_os_provisioning(self):
        """
        Test os provisioning
        """
        j = Job(TEST_USER1)
        j.set_sleep_time(10)
        j.set_attributes({'Resource_List.select': '1:aoe=osimage1'})
        jid = self.server.submit(j)

        # Job will start and wait for provisioning to complete.
        self.server.expect(JOB, {ATTR_substate: '71'}, id=jid)
        self.server.log_match("fake os provisioning script",
                              max_attempts=60,
                              interval=1)

        # Since this is OS provisioning, mom restart is
        # required to finish provisioning. Sending SIGHUP
        # to the provisioning mom will also work, but restart
        # is more apt to simulate real world scenario.
        self.momA.restart()

        # Current aoe on momA should be set to the requested aoe in job.
        self.server.expect(NODE, {'current_aoe': 'osimage1'}, id=self.hostA)

        # After mom restart job execution should start, as
        # OS provisioning completes after mom restart.
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)

    def test_subchunk_application_provisioning(self):
        """
        Test application provisioning job request consist of subchunks
        with and without aoe resource.
        """
        j = Job(TEST_USER1)
        j.set_attributes({'Resource_List.select':
                          '1:ncpus=1:aoe=App1+1:ncpus=12'})
        j.set_sleep_time(5)
        jid = self.server.submit(j)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)
        self.server.expect(JOB, ATTR_execvnode, id=jid, op=SET)
        nodes = j.get_vnodes(j.exec_vnode)
        self.server.log_match("fake application provisioning script",
                              max_attempts=20,
                              interval=1)
        self._assert_job_spans_both_moms(nodes)

        # Current aoe on momA, should be set to the requested aoe in job.
        self.server.expect(NODE, {'current_aoe': 'App1'}, id=self.hostA)

    def test_subchunk_os_provisioning(self):
        """
        Test os provisioning job request consist of subchunks
        with and without aoe resource.
        """
        a = {'Resource_List.select': '1:aoe=osimage1+1:ncpus=12'}
        j = Job(TEST_USER1, a)
        j.set_sleep_time(10)
        jid = self.server.submit(j)

        self.server.expect(JOB, ATTR_execvnode, id=jid, op=SET)
        nodes = j.get_vnodes(j.exec_vnode)
        self._assert_job_spans_both_moms(nodes)

        self.momA.restart()

        # After mom restart job execution should start, as
        # OS provisioning completes after mom restart.
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)

        # Current aoe on momA, should be set to the requested aoe in job.
        self.server.expect(NODE, {'current_aoe': 'osimage1'}, id=self.hostA)

    def test_job_wide_provisioining_request(self):
        """
        Test jobs with jobwide aoe resource request.
        """
        # Below job will not run, since resource requested are job-wide,
        # and no single node have all the requested resource.
        j = Job(TEST_USER1)
        j.set_sleep_time(5)
        j.set_attributes({"Resource_List.aoe": "App1",
                          "Resource_List.ncpus": 12})
        jid = self.server.submit(j)
        self.server.expect(JOB, {
            ATTR_state: 'Q',
            ATTR_comment: (MATCH_RE, 'Not Running: Insufficient ' +
                           'amount of resource: .*')}, id=jid)

        # Below job fits on the provisionable node, so it should run.
        j = Job(TEST_USER1)
        j.set_attributes({"Resource_List.aoe": "App1",
                          "Resource_List.ncpus": 1})
        jid = self.server.submit(j)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)

        # Current aoe on momA, should be set to the requested aoe in job.
        self.server.expect(NODE, {'current_aoe': 'App1'}, id=self.hostA)

    def test_multiple_aoe_request(self):
        """
        Test jobs with multiple similar/various aoe request in subchunks.
        Job request consisting of multiple subchunks with different aoe
        will fail submission, whereas job request with same aoe across
        multiple subchunks should be successful.
        """
        a1 = {'Resource_List.select':
              '1:ncpus=1:aoe=App1+1:ncpus=12:aoe=osimage1'}
        a2 = {'Resource_List.select':
              '1:ncpus=1:aoe=App1+1:ncpus=12:aoe=App1'}

        # Below job will fail submission, since different aoe's requested,
        # across multiple subchunks.
        j = Job(TEST_USER1)
        j.set_attributes(a1)
        j.set_sleep_time(5)
        jid = None
        try:
            jid = self.server.submit(j)
            # Reaching this point means the submission was accepted.
            self.assertIsNone(jid, 'Job successfully submitted ' +
                              'when it should have failed')
        except PbsSubmitError as e:
            self.assertTrue('Invalid provisioning request in chunk(s)'
                            in e.msg[0],
                            'Job submission failed, but due to ' +
                            'unexpected reason.\n%s' % e.msg[0])
            self.logger.info("Job submission failed, as expected")

        # Below job will get submitted, since same aoe requested,
        # across multiple subchunks.
        j = Job(TEST_USER1)
        j.set_attributes(a2)
        jid = self.server.submit(j)
        self.assertIsNotNone(jid, 'Job submission failed ' +
                             'when it should have succeeded')
        self.logger.info("Job submission succeeded, as expected")

    def test_provisioning_with_placement(self):
        """
        Test provisioning job with various placement options.
        """
        # Below job will not run, since placement is set to pack.
        # and no single node have all the requested resource.
        j = Job(TEST_USER1)
        j.set_attributes({'Resource_List.select':
                          '1:ncpus=1:aoe=App1+1:ncpus=12',
                          'Resource_List.place': 'pack'})
        j.set_sleep_time(5)
        jid = self.server.submit(j)
        self.server.expect(JOB, {
            ATTR_state: 'Q',
            ATTR_comment: (MATCH_RE, 'Not Running: Insufficient ' +
                           'amount of resource: .*')}, id=jid)

        # Below job will run with placement set to pack.
        # since there is only one node with the requested resource.
        j = Job(TEST_USER1)
        j.set_attributes({'Resource_List.select':
                          '1:ncpus=1:aoe=App1+1:ncpus=1',
                          'Resource_List.place': 'pack'})
        j.set_sleep_time(5)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.server.expect(JOB, ATTR_execvnode, id=jid, op=SET)
        nodes = j.get_vnodes(j.exec_vnode)
        self.assertEqual(nodes[0], self.momA.shortname)

        # Current aoe on momA, should be set to the requested aoe in job.
        self.server.expect(NODE, {'current_aoe': 'App1'}, id=self.hostA)

        # This was needed since sometime the above job takes longer
        # to finish and release the resources. This causes delay for
        # the next job to start and can probably fail the test.
        self.server.cleanup_jobs(extend='force')

        # Below job will run on two node with placement set to scatter.
        # even though single node can satisfy both the requested chunks.
        j = Job(TEST_USER1)
        j.set_attributes({'Resource_List.select':
                          '1:ncpus=1:aoe=App1+1:ncpus=1',
                          'Resource_List.place': 'scatter'})
        j.set_sleep_time(5)
        jid = self.server.submit(j)
        self.server.expect(JOB, ATTR_execvnode, id=jid, op=SET)
        nodes = j.get_vnodes(j.exec_vnode)
        self._assert_job_spans_both_moms(nodes)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)

        # Current aoe on momA, should be set to the requested aoe in job.
        self.server.expect(NODE, {'current_aoe': 'App1'}, id=self.hostA)

    def test_sched_provisioning_response_with_runjob(self):
        """
        Test that if one provisioning job fails to run then scheduler
        correctly provides the node solution for the second job with aoe
        in it.
        """
        # Setup runjob hook.
        a = {'event': 'runjob', 'enabled': 'True'}
        rv = self.server.create_import_hook(
            'reject_runjob_hook', a, self.reject_runjob_hook, overwrite=True)
        self.assertTrue(rv)

        # Set current aoe to App1
        self.server.manager(MGR_CMD_SET, NODE, id=self.hostA,
                            attrib={'current_aoe': 'App1'}, expect=True)

        # Turn off scheduling so both jobs are queued before a cycle runs
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})

        # submit two provisioning jobs
        a = {'Resource_List.select': '1:aoe=osimage1:ncpus=1+1:ncpus=4',
             'Resource_List.place': 'vscatter'}
        j = Job(TEST_USER1, attrs=a)
        jid1 = self.server.submit(j)
        jid2 = self.server.submit(j)

        # Turn on scheduling
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})

        # Job will be rejected by runjob hook and it should log
        # correct exec_vnode for each job.
        msg = "job %s " + "solution (%s:aoe=osimage1:ncpus=1)+(%s:ncpus=4)"
        job1_msg = msg % (jid1, self.hostA, self.hostB)
        job2_msg = msg % (jid2, self.hostA, self.hostB)
        self.server.log_match(job1_msg)
        self.server.log_match(job2_msg)

    def test_sched_provisioning_response(self):
        """
        Test that if scheduler could not find node solution for one
        provisioning job then it will find the correct solution for the
        second one.
        """
        # Set current aoe to osimage1
        self.server.manager(MGR_CMD_SET, NODE, id=self.hostA,
                            attrib={'current_aoe': 'osimage1'}, expect=True)

        # submit one job that will run on local node
        a = {'Resource_List.select': '1:ncpus=10'}
        j1 = Job(TEST_USER1, attrs=a)
        j1.set_sleep_time(200)
        jid1 = self.server.submit(j1)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)

        # Turn off scheduling
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})

        # submit two provisioning jobs where first job will not be able
        # to run and second one can
        a = {'Resource_List.select': '1:aoe=App1:ncpus=1+1:ncpus=3',
             'Resource_List.place': 'vscatter'}
        j2 = Job(TEST_USER1, attrs=a)
        jid2 = self.server.submit(j2)

        a = {'Resource_List.select': '1:aoe=App1:ncpus=1+1:ncpus=2',
             'Resource_List.place': 'vscatter'}
        j3 = Job(TEST_USER1, attrs=a)
        jid3 = self.server.submit(j3)

        # Turn on scheduling
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})

        ev_format = "(%s:aoe=App1:ncpus=1)+(%s:ncpus=2)"
        solution = ev_format % (self.hostA, self.hostB)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid3)

        job_state = self.server.status(JOB, id=jid3)
        self.assertEqual(job_state[0]['exec_vnode'], solution)