123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442 |
- # coding: utf-8
- # Copyright (C) 1994-2018 Altair Engineering, Inc.
- # For more information, contact Altair at www.altair.com.
- #
- # This file is part of the PBS Professional ("PBS Pro") software.
- #
- # Open Source License Information:
- #
- # PBS Pro is free software. You can redistribute it and/or modify it under the
- # terms of the GNU Affero General Public License as published by the Free
- # Software Foundation, either version 3 of the License, or (at your option) any
- # later version.
- #
- # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
- # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- # FOR A PARTICULAR PURPOSE.
- # See the GNU Affero General Public License for more details.
- #
- # You should have received a copy of the GNU Affero General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- #
- # Commercial License Information:
- #
- # For a copy of the commercial license terms and conditions,
- # go to: (http://www.pbspro.com/UserArea/agreement.html)
- # or contact the Altair Legal Department.
- #
- # Altair’s dual-license business model allows companies, individuals, and
- # organizations to create proprietary derivative works of PBS Pro and
- # distribute them - whether embedded or bundled with other software -
- # under a commercial license agreement.
- #
- # Use of Altair’s trademarks, including but not limited to "PBS™",
- # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
- # trademark licensing policies.
- from tests.functional import *
class TestProvisioningJob_Enh(TestFunctional):
    """
    This testsuite tests the enhanced provisioning capabilities.

    With this enhancement, PBS is able to run jobs requesting aoe in
    subchunks, just like any other custom non-consumable resource.

    PRE: Have a PBS cluster with two MoMs installed, with one MoM on a
    node other than the PBS server host. Pass the provisionable MoM
    first to pbs_benchpress.
    Eg. pbs_benchpress -p moms=second_node:server_node ...
    """

    # Body of the provision hook installed by setUp(). A request for aoe
    # 'App1' is accepted with e.accept(1); any other aoe is accepted with
    # e.accept(0). Each path logs a distinct message the tests match on.
    fake_prov_hook = """
import pbs
import time
e = pbs.event()
vnode = e.vnode
aoe = e.aoe
if aoe == 'App1':
    pbs.logmsg(pbs.LOG_DEBUG, "fake application provisioning script")
    e.accept(1)
pbs.logmsg(pbs.LOG_DEBUG, "aoe=%s,vnode=%s" % (aoe,vnode))
pbs.logmsg(pbs.LOG_DEBUG, "fake os provisioning script")
e.accept(0)
"""

    # Body of a runjob hook that logs the job and its exec_vnode and then
    # rejects the event.
    reject_runjob_hook = """
import pbs
e = pbs.event()
j = e.job
pbs.logmsg(pbs.LOG_DEBUG, "job " + str(j) + " solution " + str(j.exec_vnode))
e.reject()
"""

    def setUp(self):
        """
        Create a two node setup where hostA is provisionable, and install
        the fake provisioning hook.

        The test is skipped when fewer than two moms are provided.
        """
        if len(self.moms) < 2:
            self.skipTest("Provide at least 2 moms while invoking test")
        TestFunctional.setUp(self)

        # This test suite expects the first mom given with the "-p moms"
        # benchpress option to be the remote mom. In case this assumption
        # is not true then the order is reversed here.
        # dict.values() cannot be indexed on Python 3, so take a list once.
        moms = list(self.moms.values())
        if moms[0].shortname == self.server.shortname:
            self.momA, self.momB = moms[1], moms[0]
        else:
            self.momA, self.momB = moms[0], moms[1]
        self.hostA = self.momA.shortname
        self.hostB = self.momB.shortname

        # Remove all nodes
        self.server.manager(MGR_CMD_DELETE, NODE, None, "")

        # Restart PBS
        self.server.restart()

        # Create nodes
        self.server.manager(MGR_CMD_CREATE, NODE, None, self.hostA)
        self.server.manager(MGR_CMD_CREATE, NODE, None, self.hostB)
        self.server.expect(NODE, {'state': 'free'}, id=self.hostA)
        self.server.expect(NODE, {'state': 'free'}, id=self.hostB)

        # Set hostA provisioning attributes.
        a = {'provision_enable': 'true',
             'resources_available.ncpus': '2',
             'resources_available.aoe': 'App1,osimage1'}
        self.server.manager(
            MGR_CMD_SET, NODE, a, id=self.hostA, expect=True)
        self.server.manager(MGR_CMD_UNSET, NODE, id=self.hostA,
                            attrib='current_aoe', expect=True)

        # Set hostB ncpus to 12
        a = {'resources_available.ncpus': '12'}
        self.server.manager(
            MGR_CMD_SET, NODE, a, id=self.hostB, expect=True)

        # Setup provisioning hook.
        a = {'event': 'provision', 'enabled': 'True', 'alarm': '300'}
        rv = self.server.create_import_hook(
            'fake_prov_hook', a, self.fake_prov_hook, overwrite=True)
        self.assertTrue(rv)

        # NOTE(review): 2047 presumably enables the debug level messages
        # the tests match in the server log - confirm.
        self.server.manager(MGR_CMD_SET, SERVER, {'log_events': 2047},
                            expect=True)

    def _assert_job_spans_both_moms(self, nodes):
        """
        Assert that the first two entries of ``nodes`` are hostA and
        hostB, in either order.

        :param nodes: vnode names parsed from a job's exec_vnode
        :type nodes: list
        """
        placement = (nodes[0], nodes[1])
        self.assertIn(placement, ((self.hostA, self.hostB),
                                  (self.hostB, self.hostA)))

    def test_app_provisioning(self):
        """
        Test application provisioning
        """
        j = Job(TEST_USER1)
        j.set_sleep_time(5)
        j.set_attributes({'Resource_List.select': '1:aoe=App1'})
        jid = self.server.submit(j)

        # Job should start running after the provisioning script finishes
        # executing.
        # Since this is application provisioning, mom restart is
        # not needed.
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)

        # Current aoe on momA should be set to the requested aoe in job.
        self.server.expect(NODE, {'current_aoe': 'App1'}, id=self.hostA)
        self.server.log_match(
            "fake application provisioning script",
            max_attempts=20,
            interval=1)

    def test_os_provisioning(self):
        """
        Test os provisioning
        """
        j = Job(TEST_USER1)
        j.set_sleep_time(10)
        j.set_attributes({'Resource_List.select': '1:aoe=osimage1'})
        jid = self.server.submit(j)

        # Job will start and wait for provisioning to complete.
        self.server.expect(JOB, {ATTR_substate: '71'}, id=jid)
        self.server.log_match("fake os provisioning script",
                              max_attempts=60,
                              interval=1)

        # Since this is OS provisioning, mom restart is
        # required to finish provisioning. Sending SIGHUP
        # to the provisioning mom will also work, but restart
        # is more apt to simulate a real world scenario.
        self.momA.restart()

        # Current aoe on momA should be set to the requested aoe in job.
        self.server.expect(NODE, {'current_aoe': 'osimage1'}, id=self.hostA)

        # After mom restart job execution should start, as
        # OS provisioning completes after mom restart.
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)

    def test_subchunk_application_provisioning(self):
        """
        Test application provisioning where the job request consists of
        subchunks with and without the aoe resource.
        """
        j = Job(TEST_USER1)
        j.set_attributes({'Resource_List.select':
                          '1:ncpus=1:aoe=App1+1:ncpus=12'})
        j.set_sleep_time(5)
        jid = self.server.submit(j)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)
        self.server.expect(JOB, ATTR_execvnode, id=jid, op=SET)
        nodes = j.get_vnodes(j.exec_vnode)
        self.server.log_match("fake application provisioning script",
                              max_attempts=20,
                              interval=1)
        self._assert_job_spans_both_moms(nodes)

        # Current aoe on momA should be set to the requested aoe in job.
        self.server.expect(NODE, {'current_aoe': 'App1'}, id=self.hostA)

    def test_subchunk_os_provisioning(self):
        """
        Test os provisioning where the job request consists of subchunks
        with and without the aoe resource.
        """
        a = {'Resource_List.select': '1:aoe=osimage1+1:ncpus=12'}
        j = Job(TEST_USER1, a)
        j.set_sleep_time(10)
        jid = self.server.submit(j)
        self.server.expect(JOB, ATTR_execvnode, id=jid, op=SET)
        nodes = j.get_vnodes(j.exec_vnode)
        self._assert_job_spans_both_moms(nodes)
        self.momA.restart()

        # After mom restart job execution should start, as
        # OS provisioning completes after mom restart.
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)

        # Current aoe on momA should be set to the requested aoe in job.
        self.server.expect(NODE, {'current_aoe': 'osimage1'}, id=self.hostA)

    def test_job_wide_provisioining_request(self):
        """
        Test jobs with jobwide aoe resource request.
        """
        # Below job will not run, since the resources requested are
        # job-wide, and no single node has all the requested resources.
        j = Job(TEST_USER1)
        j.set_sleep_time(5)
        j.set_attributes({"Resource_List.aoe": "App1",
                          "Resource_List.ncpus": 12})
        jid = self.server.submit(j)
        self.server.expect(
            JOB,
            {ATTR_state: 'Q',
             ATTR_comment:
             (MATCH_RE,
              'Not Running: Insufficient amount of resource: .*')},
            id=jid)

        # Below job fits on hostA alone, so it is expected to run.
        j = Job(TEST_USER1)
        j.set_attributes({"Resource_List.aoe": "App1",
                          "Resource_List.ncpus": 1})
        jid = self.server.submit(j)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)

        # Current aoe on momA should be set to the requested aoe in job.
        self.server.expect(NODE, {'current_aoe': 'App1'}, id=self.hostA)

    def test_multiple_aoe_request(self):
        """
        Test jobs with multiple similar/various aoe requests in subchunks.
        A job request consisting of multiple subchunks with different aoe
        will fail submission, whereas a job request with the same aoe
        across multiple subchunks should be successful.
        """
        a1 = {'Resource_List.select':
              '1:ncpus=1:aoe=App1+1:ncpus=12:aoe=osimage1'}
        a2 = {'Resource_List.select':
              '1:ncpus=1:aoe=App1+1:ncpus=12:aoe=App1'}

        # Below job will fail submission, since different aoe's are
        # requested across multiple subchunks.
        j = Job(TEST_USER1)
        j.set_attributes(a1)
        j.set_sleep_time(5)
        jid = None
        try:
            jid = self.server.submit(j)
        except PbsSubmitError as e:
            self.assertIn('Invalid provisioning request in chunk(s)',
                          e.msg[0],
                          'Job submission failed, but due to ' +
                          'unexpected reason.\n%s' % e.msg[0])
            self.logger.info("Job submission failed, as expected")
        else:
            # submit() did not raise: only a missing job id is acceptable
            self.assertIsNone(jid, 'Job successfully submitted ' +
                              'when it should have failed')

        # Below job will get submitted, since the same aoe is requested
        # across multiple subchunks.
        j = Job(TEST_USER1)
        j.set_attributes(a2)
        jid = self.server.submit(j)
        self.assertIsNotNone(jid, 'Job submission failed ' +
                             'when it should have succeeded')
        self.logger.info("Job submission succeeded, as expected")

    def test_provisioning_with_placement(self):
        """
        Test provisioning job with various placement options.
        """
        # Below job will not run, since placement is set to pack
        # and no single node has all the requested resources.
        j = Job(TEST_USER1)
        j.set_attributes({'Resource_List.select':
                          '1:ncpus=1:aoe=App1+1:ncpus=12',
                          'Resource_List.place': 'pack'})
        j.set_sleep_time(5)
        jid = self.server.submit(j)
        self.server.expect(
            JOB,
            {ATTR_state: 'Q',
             ATTR_comment:
             (MATCH_RE,
              'Not Running: Insufficient amount of resource: .*')},
            id=jid)

        # Below job will run with placement set to pack,
        # since there is only one node with the requested resource.
        j = Job(TEST_USER1)
        j.set_attributes({'Resource_List.select':
                          '1:ncpus=1:aoe=App1+1:ncpus=1',
                          'Resource_List.place': 'pack'})
        j.set_sleep_time(5)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.server.expect(JOB, ATTR_execvnode, id=jid, op=SET)
        nodes = j.get_vnodes(j.exec_vnode)
        self.assertEqual(nodes[0], self.hostA)

        # Current aoe on momA should be set to the requested aoe in job.
        self.server.expect(NODE, {'current_aoe': 'App1'}, id=self.hostA)

        # This is needed since sometimes the above job takes longer
        # to finish and release the resources. This delays the start of
        # the next job and can probably fail the test.
        self.server.cleanup_jobs(extend='force')

        # Below job will run on two nodes with placement set to scatter,
        # even though a single node can satisfy both requested chunks.
        j = Job(TEST_USER1)
        j.set_attributes({'Resource_List.select':
                          '1:ncpus=1:aoe=App1+1:ncpus=1',
                          'Resource_List.place': 'scatter'})
        j.set_sleep_time(5)
        jid = self.server.submit(j)
        self.server.expect(JOB, ATTR_execvnode, id=jid, op=SET)
        nodes = j.get_vnodes(j.exec_vnode)
        self._assert_job_spans_both_moms(nodes)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)

        # Current aoe on momA should be set to the requested aoe in job.
        self.server.expect(NODE, {'current_aoe': 'App1'}, id=self.hostA)

    def test_sched_provisioning_response_with_runjob(self):
        """
        Test that if one provisioning job fails to run then scheduler
        correctly provides the node solution for the second job with aoe in
        it.
        """
        # Setup runjob hook.
        a = {'event': 'runjob', 'enabled': 'True'}
        rv = self.server.create_import_hook(
            'reject_runjob_hook', a, self.reject_runjob_hook, overwrite=True)
        self.assertTrue(rv)

        # Set current aoe to App1
        self.server.manager(MGR_CMD_SET, NODE, id=self.hostA,
                            attrib={'current_aoe': 'App1'}, expect=True)

        # Turn off scheduling
        self.server.manager(MGR_CMD_SET,
                            SERVER, {'scheduling': 'False'})

        # submit two provisioning jobs
        a = {'Resource_List.select': '1:aoe=osimage1:ncpus=1+1:ncpus=4',
             'Resource_List.place': 'vscatter'}
        j = Job(TEST_USER1, attrs=a)
        jid1 = self.server.submit(j)
        jid2 = self.server.submit(j)

        # Turn on scheduling
        self.server.manager(MGR_CMD_SET,
                            SERVER, {'scheduling': 'True'})

        # Job will be rejected by runjob hook and it should log
        # correct exec_vnode for each job.
        msg = "job %s solution (%s:aoe=osimage1:ncpus=1)+(%s:ncpus=4)"
        job1_msg = msg % (jid1, self.hostA, self.hostB)
        job2_msg = msg % (jid2, self.hostA, self.hostB)
        self.server.log_match(job1_msg)
        self.server.log_match(job2_msg)

    def test_sched_provisioning_response(self):
        """
        Test that if scheduler could not find node solution for one
        provisioning job then it will find the correct solution for the
        second one.
        """
        # Set current aoe to osimage1
        self.server.manager(MGR_CMD_SET, NODE, id=self.hostA,
                            attrib={'current_aoe': 'osimage1'}, expect=True)

        # submit one job that will run on local node
        a = {'Resource_List.select': '1:ncpus=10'}
        j1 = Job(TEST_USER1, attrs=a)
        j1.set_sleep_time(200)
        jid1 = self.server.submit(j1)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)

        # Turn off scheduling
        self.server.manager(MGR_CMD_SET,
                            SERVER, {'scheduling': 'False'})

        # submit two provisioning jobs where first job will not be able
        # to run and second one can
        a = {'Resource_List.select': '1:aoe=App1:ncpus=1+1:ncpus=3',
             'Resource_List.place': 'vscatter'}
        j2 = Job(TEST_USER1, attrs=a)
        jid2 = self.server.submit(j2)
        a = {'Resource_List.select': '1:aoe=App1:ncpus=1+1:ncpus=2',
             'Resource_List.place': 'vscatter'}
        j3 = Job(TEST_USER1, attrs=a)
        jid3 = self.server.submit(j3)

        # Turn on scheduling
        self.server.manager(MGR_CMD_SET,
                            SERVER, {'scheduling': 'True'})

        ev_format = "(%s:aoe=App1:ncpus=1)+(%s:ncpus=2)"
        solution = ev_format % (self.hostA, self.hostB)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
        job_state = self.server.status(JOB, id=jid3)
        self.assertEqual(job_state[0]['exec_vnode'], solution)
|