# coding: utf-8
# Copyright (C) 1994-2018 Altair Engineering, Inc.
# For more information, contact Altair at www.altair.com.
#
# This file is part of the PBS Professional ("PBS Pro") software.
#
# Open Source License Information:
#
# PBS Pro is free software. You can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.
# See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Commercial License Information:
#
# For a copy of the commercial license terms and conditions,
# go to: (http://www.pbspro.com/UserArea/agreement.html)
# or contact the Altair Legal Department.
#
# Altair’s dual-license business model allows companies, individuals, and
# organizations to create proprietary derivative works of PBS Pro and
# distribute them - whether embedded or bundled with other software -
# under a commercial license agreement.
#
# Use of Altair’s trademarks, including but not limited to "PBS™",
# "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
# trademark licensing policies.

from tests.functional import *


class TestNodeBuckets(TestFunctional):
    """
    Test basic functionality of node buckets.
    """

    def setUp(self):
        TestFunctional.setUp(self)
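        # Remove today's scheduler log so log_match() only sees messages
        # produced by this test run.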
        day = time.strftime("%Y%m%d", time.localtime(time.time()))
        filename = os.path.join(self.server.pbs_conf['PBS_HOME'],
                                'sched_logs', day)
        self.du.rm(path=filename, force=True, sudo=True, level=logging.DEBUG2)
        self.colors = \
            ['red', 'orange', 'yellow', 'green', 'blue', 'indigo', 'violet']
        self.shapes = ['circle', 'square', 'triangle',
                       'diamond', 'pyramid', 'sphere', 'cube']
        self.letters = ['A', 'B', 'C', 'D', 'E', 'F', 'G']
        self.server.manager(MGR_CMD_CREATE, RSC,
                            {'type': 'string', 'flag': 'h'}, id='color')
        self.server.manager(MGR_CMD_CREATE, RSC,
                            {'type': 'string_array', 'flag': 'h'}, id='shape')
        self.server.manager(MGR_CMD_CREATE, RSC,
                            {'type': 'string_array', 'flag': 'h'}, id='letter')
        self.server.manager(MGR_CMD_CREATE, RSC,
                            {'type': 'boolean', 'flag': 'h'}, id='bool')
        a = {'resources_available.ncpus': 2, 'resources_available.mem': '8gb'}
        # 10010 nodes since it divides into 7 evenly.
        # Each node bucket will have 1430 nodes in it.
        self.server.create_vnodes(name='vnode', attrib=a, num=10010,
                                  mom=self.mom, sharednode=False,
                                  expect=False, attrfunc=self.cust_attr_func)
        # Make sure all the nodes are in state free.  We can't let
        # create_vnodes() do this because it does a pbsnodes -v on each vnode.
        # This takes a long time.
        self.server.expect(NODE, {'state=free': (GE, 10010)})
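        # Make 'color' schedulable.  The other custom resources stay off
        # the scheduler's resources line on purpose (see test_booleans
        # for how 'bool' is handled when it is not on the line).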
        self.scheduler.add_resource('color')
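        # log_filter is a bitmask of log classes the scheduler suppresses.
        # 2048 masks less than the stock sched_config value of 3328 (see
        # test_not_buckets below), so the DEBUG-level 'Chunk:' and
        # 'Evaluating subchunk' messages these tests match on get logged.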
        self.scheduler.set_sched_config({'log_filter': '2048'})

    def cust_attr_func(self, name, totalnodes, numnode, attribs):
        """
        Add resources to vnodes.  There are 10010 nodes, which means 1430
        nodes of each color, letter, and shape.  The value of bool is True
        for the first 5005 nodes and unset for the last 5005 nodes.
        """
        a = {'resources_available.color': self.colors[numnode // 1430],
             'resources_available.shape': self.shapes[numnode % 7],
             'resources_available.letter': self.letters[numnode % 7]}
        if numnode // 5005 == 0:
            a['resources_available.bool'] = 'True'
        # Yellow buckets get a higher priority
        if numnode // 1430 == 2:
            a['Priority'] = 100
        return dict(list(attribs.items()) + list(a.items()))

    def check_normal_path(self, sel='2:ncpus=2:mem=1gb', pl='scatter:excl',
                          queue='workq'):
        """
        Check if a job runs in the normal code path
        """
        a = {'Resource_List.select': sel, 'Resource_List.place': pl,
             'queue': queue}
        j = Job(TEST_USER, attrs=a)
        jid = self.server.submit(j)
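        # The bucket code path logs 'Chunk:' lines; the standard code path
        # logs 'Evaluating subchunk' instead, so matching on it proves the
        # job took the normal path.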
        self.scheduler.log_match(jid + ';Evaluating subchunk', n=10000)
        self.server.delete(jid, wait=True)

    @timeout(450)
    def test_basic(self):
        """
        Request nodes of a specific color and make sure they are correctly
        allocated to the job
        """
        chunk = '4:ncpus=1:color=yellow'
        a = {'Resource_List.select': chunk,
             'Resource_List.place': 'scatter:excl'}
        J = Job(TEST_USER, a)
        jid = self.server.submit(J)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.scheduler.log_match(jid + ';Chunk: ' + chunk, n=10000)
        js = self.server.status(JOB, id=jid)
        nodes = J.get_vnodes(js[0]['exec_vnode'])
        for node in nodes:
            n = self.server.status(NODE, 'resources_available.color', id=node)
            self.assertTrue('yellow' in
                            n[0]['resources_available.color'])

    @timeout(450)
    def test_multi_bucket(self):
        """
        Request two different chunk types which need to be allocated from
        different buckets and make sure they are allocated correctly.
        """
        a = {'Resource_List.select':
             '4:ncpus=1:color=yellow+4:ncpus=1:color=blue',
             'Resource_List.place': 'scatter:excl'}
        J = Job(TEST_USER, a)
        jid = self.server.submit(J)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.scheduler.log_match(jid + ';Chunk: ', n=10000)
        js = self.server.status(JOB, id=jid)
        nodes = J.get_vnodes(js[0]['exec_vnode'])
        # Yellow nodes were requested first.
        # Make sure they come before the blue nodes.
        for i in range(4):
            n = self.server.status(NODE, id=nodes[i])
            self.assertTrue('yellow' in n[0]['resources_available.color'])
        for i in range(4, 8):
            n = self.server.status(NODE, id=nodes[i])
            self.assertTrue('blue' in n[0]['resources_available.color'])

    @timeout(450)
    def test_multi_bucket2(self):
        """
        Request nodes from all 7 different buckets and see them allocated
        correctly
        """
        select = ""
        for c in self.colors:
            select += "1:ncpus=1:color=%s+" % (c)
        # remove the trailing '+'
        select = select[:-1]
        a = {'Resource_List.select': select,
             'Resource_List.place': 'scatter:excl'}
        J = Job(TEST_USER, a)
        jid = self.server.submit(J)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.scheduler.log_match(jid + ';Chunk:', n=10000)
        js = self.server.status(JOB, id=jid)
        nodes = J.get_vnodes(js[0]['exec_vnode'])
        for i, node in enumerate(nodes):
            n = self.server.status(NODE, id=node)
            self.assertTrue(self.colors[i] in
                            n[0]['resources_available.color'])

    @timeout(450)
    def test_not_run(self):
        """
        Request more nodes of one color than are available to make sure
        the job is not run on incorrect nodes.
        """
        chunk = '1431:ncpus=1:color=yellow'
        a = {'Resource_List.select': chunk,
             'Resource_List.place': 'scatter:excl'}
        J = Job(TEST_USER, a)
        jid = self.server.submit(J)
        a = {'comment': (MATCH_RE, '^Can Never Run'),
             'job_state': 'Q'}
        self.server.expect(JOB, a, attrop=PTL_AND, id=jid)
        self.scheduler.log_match(jid + ';Chunk: ' + chunk, n=10000)

    @timeout(450)
    def test_calendaring1(self):
        """
        Test that nodes which are used in the future for calendared jobs
        are not used for filler jobs that would disrupt the scheduled
        start time.
        """
        self.scheduler.set_sched_config({'strict_ordering': 'True'})
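        # jid1 holds one node for 1 hour.  jid2 needs every node, so with
        # strict_ordering it is calendared to start when jid1 ends.  jid3
        # (30 min) fits before that start time and may run now; jid4
        # (2.5 hr) would overlap the calendared start and must stay queued.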
        chunk1 = '1:ncpus=1'
        a = {'Resource_List.select': chunk1,
             'Resource_List.place': 'scatter:excl',
             'Resource_List.walltime': '1:00:00'}
        j = Job(TEST_USER, attrs=a)
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        self.scheduler.log_match(jid1 + ';Chunk: ' + chunk1, n=10000)
        chunk2 = '10010:ncpus=1'
        a = {'Resource_List.select': chunk2,
             'Resource_List.place': 'scatter:excl',
             'Resource_List.walltime': '2:00:00'}
        j = Job(TEST_USER, attrs=a)
        jid2 = self.server.submit(j)
        self.server.expect(JOB, 'comment', op=SET, id=jid2)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
        self.scheduler.log_match(jid2 + ';Chunk: ' + chunk2, n=10000)
        chunk3 = '2:ncpus=1'
        a = {'Resource_List.select': chunk3,
             'Resource_List.place': 'scatter:excl',
             'Resource_List.walltime': '30:00'}
        j = Job(TEST_USER, attrs=a)
        jid3 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
        self.scheduler.log_match(jid3 + ';Chunk: ' + chunk3, n=10000)
        a = {'Resource_List.select': chunk3,
             'Resource_List.place': 'scatter:excl',
             'Resource_List.walltime': '2:30:00'}
        j = Job(TEST_USER, attrs=a)
        jid4 = self.server.submit(j)
        self.server.expect(JOB, 'comment', op=SET, id=jid4)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid4)
        self.scheduler.log_match(jid4 + ';Chunk: ' + chunk3, n=10000)

    @timeout(450)
    def test_calendaring2(self):
        """
        Test that nodes which have a reservation calendared on them later
        are used before totally free nodes
        """
        self.scheduler.set_sched_config({'strict_ordering': 'True'})
        now = int(time.time())
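        # vnode[2865] and vnode[2870] are yellow nodes.  Reserving them an
        # hour from now marks them busy-later; a 30-minute yellow job that
        # ends before the reservation starts should be placed on them first.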
        a = {'Resource_List.select': '1:vnode=vnode[2865]+1:vnode=vnode[2870]',
             'Resource_List.place': 'scatter:excl',
             'Resource_List.walltime': '1:00:00',
             'reserve_start': now + 3600, 'reserve_end': now + 7200}
        r = Reservation(TEST_USER, attrs=a)
        rid = self.server.submit(r)
        self.server.expect(RESV, {'reserve_state':
                                  (MATCH_RE, 'RESV_CONFIRMED|2')}, id=rid)
        chunk = '2:ncpus=1:color=yellow'
        a = {'Resource_List.select': chunk,
             'Resource_List.place': 'scatter:excl',
             'Resource_List.walltime': '30:00'}
        j = Job(TEST_USER, attrs=a)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.scheduler.log_match(jid + ';Chunk: ' + chunk, n=10000)
        s = self.server.status(JOB, 'exec_vnode', id=jid)
        n = j.get_vnodes(s[0]['exec_vnode'])
        msg = 'busy_later nodes not chosen first'
        self.assertTrue('vnode[2865]' in n, msg)
        self.assertTrue('vnode[2870]' in n, msg)

    @timeout(450)
    def test_calendaring3(self):
        """
        Test that a future reservation's nodes are used first for a job
        that is put into the calendar.
        """
        self.scheduler.set_sched_config({'strict_ordering': 'True'})
        now = int(time.time())
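        # Reserve two yellow nodes an hour out, then fill all 1430 yellow
        # nodes with a running job.  The next yellow job gets calendared,
        # and its estimated nodes should favor the reservation's
        # busy-later vnodes.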
        a = {'Resource_List.select': '1:vnode=vnode[2865]+1:vnode=vnode[2870]',
             'Resource_List.place': 'scatter:excl',
             'Resource_List.walltime': '1:00:00',
             'reserve_start': now + 3600, 'reserve_end': now + 7200}
        r = Reservation(TEST_USER, attrs=a)
        rid = self.server.submit(r)
        self.server.expect(RESV, {'reserve_state':
                                  (MATCH_RE, 'RESV_CONFIRMED|2')}, id=rid)
        chunk1 = '1430:ncpus=1:color=yellow'
        a = {'Resource_List.select': chunk1,
             'Resource_List.place': 'scatter:excl',
             'Resource_List.walltime': '30:00'}
        j = Job(TEST_USER, attrs=a)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.scheduler.log_match(jid + ';Chunk: ' + chunk1, n=10000)
        chunk2 = '2:ncpus=1:color=yellow'
        a = {'Resource_List.select': chunk2,
             'Resource_List.place': 'scatter:excl',
             'Resource_List.walltime': '15:00'}
        j2 = Job(TEST_USER, attrs=a)
        jid2 = self.server.submit(j2)
        self.scheduler.log_match(jid2 + ';Chunk: ' + chunk2, n=10000)
        self.server.expect(JOB, 'estimated.exec_vnode', op=SET, id=jid2)
        s = self.server.status(JOB, 'estimated.exec_vnode', id=jid2)
        n = j2.get_vnodes(s[0]['estimated.exec_vnode'])
        msg = 'busy_later nodes not chosen first'
        self.assertTrue('vnode[2865]' in n, msg)
        self.assertTrue('vnode[2870]' in n, msg)

    @timeout(450)
    def test_buckets_and_non(self):
        """
        Test that jobs requesting buckets and not requesting buckets
        play nice together
        """
        # vnode[1435] is orange
        a = {'Resource_List.ncpus': 1,
             'Resource_List.vnode': 'vnode[1435]'}
        j1 = Job(TEST_USER, attrs=a)
        jid1 = self.server.submit(j1)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        self.scheduler.log_match(jid1 + ';Evaluating subchunk', n=10000)
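        # jid1 holds one orange node, leaving 1429 fully free; jid2 asks
        # for all 1429 exclusively, so it runs only if the bucket code
        # path avoids jid1's node.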
        chunk = '1429:ncpus=1:color=orange'
        a = {'Resource_List.select': chunk,
             'Resource_List.place': 'scatter:excl'}
        j2 = Job(TEST_USER, attrs=a)
        jid2 = self.server.submit(j2)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
        self.scheduler.log_match(jid2 + ';Chunk: ' + chunk, n=10000)
        s1 = self.server.status(JOB, 'exec_vnode', id=jid1)
        s2 = self.server.status(JOB, 'exec_vnode', id=jid2)
        nodes1 = j1.get_vnodes(s1[0]['exec_vnode'])
        nodes2 = j2.get_vnodes(s2[0]['exec_vnode'])
        msg = 'Job 1 and Job 2 are sharing nodes'
        for n in nodes2:
            self.assertNotEqual(n, nodes1[0], msg)

    @timeout(600)
    def test_not_buckets(self):
        """
        Test to make sure that jobs which should use the standard node
        searching code path do not use the bucket code path
        """
        # Running a 10010 cpu job through the normal code path spams the log.
        # We don't care about it, so there is no reason to increase
        # the log size by so much.
        self.scheduler.set_sched_config({'log_filter': '3328'})
        # Run a job on all nodes, leaving 1 cpu available on each node
        j = Job(TEST_USER, {'Resource_List.select': '10010:ncpus=1',
                            'Resource_List.place': 'scatter'})
        j.set_sleep_time(600)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.scheduler.set_sched_config({'log_filter': '2048'})
        # Node sorting via unused resources uses the standard code path
        self.logger.info('Test node_sort_key with unused resources')
        a = {'node_sort_key': '\"ncpus HIGH unused\"'}
        self.scheduler.set_sched_config(a)
        self.check_normal_path()
        self.scheduler.revert_to_defaults()
        schd_attr = {'log_filter': '2048'}
        self.scheduler.set_sched_config(schd_attr)
        # provision_policy: avoid_provision uses the standard code path
        self.logger.info('Test avoid_provision')
        a = {'provision_policy': 'avoid_provision'}
        self.scheduler.set_sched_config(a)
        self.check_normal_path()
        self.scheduler.revert_to_defaults()
        self.scheduler.add_resource('color')
        self.scheduler.set_sched_config(schd_attr)
        # the bucket codepath requires excl
        self.logger.info('Test different place specs')
        self.check_normal_path(pl='scatter:shared')
        self.check_normal_path(pl='free')
        # can't request host or vnode resources on the bucket codepath
        self.logger.info('Test jobs requesting host and vnode')
        self.check_normal_path(sel='1:ncpus=2:host=vnode[0]')
        self.check_normal_path(sel='1:ncpus=2:vnode=vnode[0]')
        # suspended jobs use the normal codepath
        self.logger.info('Test suspended job')
        a = {'queue_type': 'execution', 'started': 'True', 'enabled': 'True',
             'priority': 200}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='expressq')
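        # A queue priority of 200 is above the default preempt_queue_prio
        # threshold (150), so jobs in expressq preempt normal jobs.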
        self.server.delete(jid, wait=True)
        a = {'Resource_List.select': '1430:ncpus=1:color=orange',
             'Resource_List.place': 'scatter:excl'}
        j2 = Job(TEST_USER, a)
        jid2 = self.server.submit(j2)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
        a = {'Resource_List.select': '1:ncpus=1:color=orange',
             'queue': 'expressq'}
        j3 = Job(TEST_USER, a)
        jid3 = self.server.submit(j3)
        self.server.expect(JOB, {'job_state': 'S'}, id=jid2)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
        self.scheduler.log_match(jid3 + ';Evaluating subchunk', n=10000)
        self.server.delete([jid2, jid3], wait=True)
        # Checkpointed jobs use the normal code path
        self.logger.info('Test checkpointed job')
        chk_script = """#!/bin/bash
kill $1
exit 0
"""
        self.chk_file = self.du.create_temp_file(body=chk_script)
        self.du.chmod(path=self.chk_file, mode=0o755)
        self.du.chown(path=self.chk_file, uid=0, gid=0, sudo=True)
        c = {'$action': 'checkpoint_abort 30 !' + self.chk_file + ' %sid'}
        self.mom.add_config(c)
        self.scheduler.set_sched_config({'preempt_order': 'C'})
        attrs = {'Resource_List.select': '1430:ncpus=1:color=orange',
                 'Resource_List.place': 'scatter:excl'}
        j_c1 = Job(TEST_USER, attrs)
        jid_c1 = self.server.submit(j_c1)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid_c1)
        self.scheduler.log_match(
            jid_c1 + ';Chunk: 1430:ncpus=1:color=orange', n=10000)
        a = {'Resource_List.select': '1:ncpus=1:color=orange',
             'queue': 'expressq'}
        j_c2 = Job(TEST_USER, a)
        jid_c2 = self.server.submit(j_c2)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid_c1)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid_c2)
        self.scheduler.log_match(
            jid_c1 + ";Job preempted by checkpointing", n=10000)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
        self.scheduler.log_match(jid_c2 + ';Evaluating subchunk', n=10000)
        self.server.delete([jid_c1, jid_c2], wait=True)
        # Jobs in reservations use the standard codepath
        self.logger.info('Test job in reservation')
        now = int(time.time())
        a = {'Resource_List.select': '4:ncpus=2:mem=4gb',
             'Resource_List.place': 'scatter:excl',
             'reserve_start': now + 30, 'reserve_end': now + 120}
        r = Reservation(TEST_USER, a)
        rid = self.server.submit(r)
        self.server.expect(RESV,
                           {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')})
        self.logger.info('Waiting 30s for reservation to start')
        self.server.expect(RESV,
                           {'reserve_state': (MATCH_RE, 'RESV_RUNNING|5')},
                           offset=30)
        r_queue = rid.split('.')[0]
        self.check_normal_path(sel='1:ncpus=3', queue=r_queue)
        self.server.delete(rid)
        # Jobs on multi-vnoded systems use the standard codepath
        self.logger.info('Test job on multi-vnoded system')
        a = {'resources_available.ncpus': 2, 'resources_available.mem': '8gb'}
        self.server.create_vnodes('vnode', a, 8, self.mom,
                                  sharednode=False, vnodes_per_host=4)
        self.check_normal_path(sel='2:ncpus=8')

    @timeout(450)
    def test_multi_vnode_resv(self):
        """
        Test that node buckets do not get in the way of running jobs on
        multi-vnoded systems in reservations
        """
        a = {'resources_available.ncpus': 2, 'resources_available.mem': '8gb'}
        self.server.create_vnodes('vnode', a, 12, self.mom,
                                  sharednode=False, vnodes_per_host=4,
                                  attrfunc=self.cust_attr_func)
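        # 12 vnodes with vnodes_per_host=4 gives 3 multi-vnoded hosts,
        # which must use the standard code path even inside a reservation.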
        now = int(time.time())
        a = {'Resource_List.select': '8:ncpus=1',
             'Resource_List.place': 'vscatter',
             'reserve_start': now + 30,
             'reserve_end': now + 3600}
        r = Reservation(TEST_USER, attrs=a)
        rid = self.server.submit(r)
        a = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
        self.server.expect(RESV, a, id=rid)
        self.logger.info('Waiting 30s for reservation to start')
        a['reserve_state'] = (MATCH_RE, 'RESV_RUNNING|5')
        self.server.expect(RESV, a, id=rid, offset=30)
        a = {'Resource_List.select': '2:ncpus=1',
             'Resource_List.place': 'group=shape',
             'queue': rid.split('.')[0]}
        j = Job(TEST_USER, attrs=a)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.scheduler.log_match(jid + ';Evaluating subchunk', n=10000)
        ev = self.server.status(JOB, 'exec_vnode', id=jid)
        used_nodes = j.get_vnodes(ev[0]['exec_vnode'])
        n = self.server.status(NODE, 'resources_available.shape')
        s = [x['resources_available.shape']
             for x in n if x['id'] in used_nodes]
        self.assertEqual(len(set(s)), 1,
                         "Job ran in more than one placement set")

    @timeout(450)
    def test_bucket_sort(self):
        """
        Test that buckets are sorted properly: the yellow nodes all have
        priority 100, so the yellow bucket should be the first bucket.
        """
        a = {'node_sort_key': '\"sort_priority HIGH\"'}
        self.scheduler.set_sched_config(a)
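        # node_sort_key orders buckets as well as nodes; setUp gave every
        # yellow node Priority = 100, so 'sort_priority HIGH' must place
        # the yellow bucket first.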
        chunk = '2:ncpus=1'
        j = Job(TEST_USER, {'Resource_List.select': chunk,
                            'Resource_List.place': 'scatter:excl'})
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.scheduler.log_match(jid + ';Chunk: ' + chunk, n=10000)
        jobs = self.server.status(JOB, {'exec_vnode'})
        jn = j.get_vnodes(jobs[0]['exec_vnode'])
        n1 = self.server.status(NODE, 'resources_available.color',
                                id=jn[0])
        n2 = self.server.status(NODE, 'resources_available.color',
                                id=jn[1])
        c1 = n1[0]['resources_available.color']
        c2 = n2[0]['resources_available.color']
        self.assertEqual(c1, 'yellow', "Job didn't run on yellow nodes")
        self.assertEqual(c2, 'yellow', "Job didn't run on yellow nodes")

    @timeout(450)
    def test_psets(self):
        """
        Test placement sets with node buckets
        """
        a = {'node_group_key': 'shape', 'node_group_enable': 'True',
             'scheduling': 'False'}
        self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)
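        # Scheduling is off, so both jobs are queued before a single
        # scheduling cycle places them together.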
        chunk = '1430:ncpus=1'
        a = {'Resource_List.select': chunk,
             'Resource_List.place': 'scatter:excl'}
        j1 = Job(TEST_USER, attrs=a)
        jid1 = self.server.submit(j1)
        j2 = Job(TEST_USER, attrs=a)
        jid2 = self.server.submit(j2)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
        self.scheduler.log_match(jid1 + ';Chunk: ' + chunk, n=10000)
        self.scheduler.log_match(jid2 + ';Chunk: ' + chunk, n=10000)
        ev = self.server.status(JOB, 'exec_vnode', id=jid1)
        used_nodes1 = j1.get_vnodes(ev[0]['exec_vnode'])
        n = self.server.status(NODE, 'resources_available.shape')
        s = [x['resources_available.shape']
             for x in n if x['id'] in used_nodes1]
        self.assertEqual(len(set(s)), 1,
                         "Job1 ran in more than one placement set")
        ev = self.server.status(JOB, 'exec_vnode', id=jid2)
        used_nodes2 = j2.get_vnodes(ev[0]['exec_vnode'])
        s = [x['resources_available.shape']
             for x in n if x['id'] in used_nodes2]
        self.assertEqual(len(set(s)), 1,
                         "Job2 ran in more than one placement set")
        for node in used_nodes1:
            self.assertNotIn(node, used_nodes2, 'Jobs share nodes: ' + node)

    @timeout(450)
    def test_psets_calendaring(self):
        """
        Test that jobs in the calendar fit within a placement set
        """
        self.scheduler.set_sched_config({'strict_ordering': 'True'})
        self.server.manager(MGR_CMD_SET, SERVER, {'backfill_depth': 5})
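        # backfill_depth > 1 lets the scheduler add more than one top job
        # to the calendar, so both jid2 and jid3 below get estimated
        # start times and vnodes.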
        chunk1 = '10010:ncpus=1'
        a = {'Resource_List.select': chunk1,
             'Resource_List.place': 'scatter:excl',
             'Resource_List.walltime': '1:00:00'}
        j1 = Job(TEST_USER, attrs=a)
        jid1 = self.server.submit(j1)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        self.scheduler.log_match(jid1 + ';Chunk: ' + chunk1, n=10000)
        svr_attr = {'node_group_key': 'shape', 'node_group_enable': 'True'}
        self.server.manager(MGR_CMD_SET, SERVER, svr_attr)
        chunk2 = '1430:ncpus=1'
        a['Resource_List.select'] = chunk2
        j2 = Job(TEST_USER, a)
        jid2 = self.server.submit(j2)
        self.scheduler.log_match(jid2 + ';Chunk: ' + chunk2, n=10000)
        self.scheduler.log_match(jid2 + ';Job is a top job', n=10000)
        n = self.server.status(NODE, 'resources_available.shape')
        ev = self.server.status(JOB, 'estimated.exec_vnode', id=jid2)
        used_nodes2 = j2.get_vnodes(ev[0]['estimated.exec_vnode'])
        s = [x['resources_available.shape']
             for x in n if x['id'] in used_nodes2]
        self.assertEqual(len(set(s)), 1,
                         "Job2 will run in more than one placement set")
        j3 = Job(TEST_USER, a)
        jid3 = self.server.submit(j3)
        self.scheduler.log_match(jid3 + ';Chunk: ' + chunk2, n=10000)
        self.scheduler.log_match(jid3 + ';Job is a top job', n=10000)
        ev = self.server.status(JOB, 'estimated.exec_vnode', id=jid3)
        used_nodes3 = j3.get_vnodes(ev[0]['estimated.exec_vnode'])
        s = [x['resources_available.shape']
             for x in n if x['id'] in used_nodes3]
        self.assertEqual(len(set(s)), 1,
                         "Job3 will run in more than one placement set")
        for node in used_nodes2:
            self.assertNotIn(node, used_nodes3,
                             'Jobs will share nodes: ' + node)

    @timeout(450)
    def test_place_group(self):
        """
        Test node buckets with place=group
        """
        chunk = '1430:ncpus=1'
        a = {'Resource_List.select': chunk,
             'Resource_List.place': 'scatter:excl:group=letter'}
        j = Job(TEST_USER, attrs=a)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.scheduler.log_match(jid + ';Chunk: ' + chunk, n=10000)
        ev = self.server.status(JOB, 'exec_vnode', id=jid)
        used_nodes = j.get_vnodes(ev[0]['exec_vnode'])
        n = self.server.status(NODE, 'resources_available.letter')
        s = [x['resources_available.letter']
             for x in n if x['id'] in used_nodes]
        self.assertEqual(len(set(s)), 1,
                         "Job ran in more than one placement set")

    @timeout(450)
    def test_psets_spanning(self):
        """
        Request more nodes than are available in one placement set and see
        whether the job spans placement sets depending on the value of
        do_not_span_psets
        """
        a = {'node_group_key': 'shape', 'node_group_enable': 'True'}
        self.server.manager(MGR_CMD_SET, SERVER, a)
        a = {'do_not_span_psets': 'True'}
        self.server.manager(MGR_CMD_SET, SCHED, a, id='default')
        # request one more node than the largest placement set
        chunk = '1431:ncpus=1'
        a = {'Resource_List.select': chunk,
             'Resource_List.place': 'scatter:excl'}
        j = Job(TEST_USER, attrs=a)
        jid = self.server.submit(j)
        a = {'job_state': 'Q', 'comment':
             (MATCH_RE, 'can\'t fit in the largest placement set, '
              'and can\'t span psets')}
        self.server.expect(JOB, a, attrop=PTL_AND, id=jid)
        self.scheduler.log_match(jid + ';Chunk: ' + chunk, n=10000)
        a = {'do_not_span_psets': 'False'}
        self.server.manager(MGR_CMD_SET, SCHED, a, id='default')
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        ev = self.server.status(JOB, 'exec_vnode', id=jid)
        used_nodes = j.get_vnodes(ev[0]['exec_vnode'])
        n = self.server.status(NODE, 'resources_available.shape')
        s = [x['resources_available.shape']
             for x in n if x['id'] in used_nodes]
        self.assertGreater(len(set(s)), 1,
                           "Job did not span properly")

    @timeout(450)
    def test_psets_queue(self):
        """
        Test that placement sets work for nodes associated with queues
        """
        a = {'node_group_key': 'shape', 'node_group_enable': 'True'}
        self.server.manager(MGR_CMD_SET, SERVER, a)
        a = {'queue_type': 'Execution', 'started': 'True', 'enabled': 'True'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='workq2')
        # Take the first 14 vnodes.  This means there are two nodes per shape
        nodes = ['vnode[0]', 'vnode[1]', 'vnode[2]', 'vnode[3]', 'vnode[4]',
                 'vnode[5]', 'vnode[6]', 'vnode[7]', 'vnode[8]', 'vnode[9]',
                 'vnode[10]', 'vnode[11]', 'vnode[12]', 'vnode[13]']
        self.server.manager(MGR_CMD_SET, NODE, {'queue': 'workq2'}, id=nodes)
        chunk = '2:ncpus=1'
        a = {'Resource_List.select': chunk, 'queue': 'workq2',
             'Resource_List.place': 'scatter:excl'}
        for _ in range(7):
            j = Job(TEST_USER, a)
            jid = self.server.submit(j)
            self.server.expect(JOB, {'job_state': 'R'}, id=jid)
            self.scheduler.log_match(jid + ';Chunk: ' + chunk, n=10000)
        # Check to see if jobs ran in one placement set
        jobs = self.server.status(JOB)
        for job in jobs:
            ev = self.server.status(JOB, 'exec_vnode', id=job['id'])
            used_nodes = j.get_vnodes(ev[0]['exec_vnode'])
            n = self.server.status(NODE, 'resources_available.shape')
            s = [x['resources_available.shape']
                 for x in n if x['id'] in used_nodes]
            self.assertEqual(len(set(s)), 1,
                             "Job " + job['id'] +
                             " ran in more than one placement set")
        s = self.server.select()
        for jid in s:
            self.server.delete(jid, wait=True)
        # Check to see if jobs span correctly
        chunk = '7:ncpus=1'
        a = {'Resource_List.select': chunk, 'queue': 'workq2',
             'Resource_List.place': 'scatter:excl'}
        j = Job(TEST_USER, a)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.scheduler.log_match(jid + ';Chunk: ' + chunk, n=10000)
        ev = self.server.status(JOB, 'exec_vnode', id=jid)
        used_nodes = j.get_vnodes(ev[0]['exec_vnode'])
        n = self.server.status(NODE, 'resources_available.shape')
        s = [x['resources_available.shape']
             for x in n if x['id'] in used_nodes]
        self.assertGreater(len(set(s)), 1,
                           "Job did not span properly")

    @timeout(450)
    def test_free(self):
        """
        Test that free placement works with the bucket code path
        """
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})
        chunk = '1430:ncpus=1:color=yellow'
        a = {'Resource_List.select': chunk,
             'Resource_List.place': 'excl'}
        j1 = Job(TEST_USER, attrs=a)
        jid1 = self.server.submit(j1)
        j2 = Job(TEST_USER, attrs=a)
        jid2 = self.server.submit(j2)
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
        self.scheduler.log_match(jid1 + ';Chunk: ' + chunk, n=10000)
        self.scheduler.log_match(jid2 + ';Chunk: ' + chunk, n=10000)
        s1 = self.server.status(JOB, 'exec_vnode', id=jid1)
        s2 = self.server.status(JOB, 'exec_vnode', id=jid2)
        n1 = j1.get_vnodes(s1[0]['exec_vnode'])
        n2 = j2.get_vnodes(s2[0]['exec_vnode'])
        msg = 'job did not run on correct number of nodes'
        self.assertEqual(len(n1), 715, msg)
        self.assertEqual(len(n2), 715, msg)
        for node in n1:
            self.assertNotIn(node, n2, 'Jobs share nodes: ' + node)

    @timeout(450)
    def test_queue_nodes(self):
        """
        Test that buckets work with nodes associated to a queue
        """
        v1 = 'vnode[1431]'
        v2 = 'vnode[1435]'
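        # vnode[1431] and vnode[1435] are both orange.  Moving them into
        # q2 leaves exactly 1428 orange nodes for jobs in the default
        # queue.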
        a = {'queue_type': 'execution', 'started': 'True', 'enabled': 'True'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='q2')
        self.server.manager(MGR_CMD_SET, NODE, {'queue': 'q2'}, id=v1)
        self.server.manager(MGR_CMD_SET, NODE, {'queue': 'q2'}, id=v2)
        chunk1 = '1428:ncpus=1:color=orange'
        a = {'Resource_List.select': chunk1,
             'Resource_List.place': 'scatter:excl'}
        j1 = Job(TEST_USER, attrs=a)
        jid1 = self.server.submit(j1)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        self.scheduler.log_match(jid1 + ';Chunk: ' + chunk1, n=10000)
        job = self.server.status(JOB, 'exec_vnode', id=jid1)[0]
        ev = j1.get_vnodes(job['exec_vnode'])
        msg = 'Job is using queue\'s nodes'
        self.assertNotIn(v1, ev, msg)
        self.assertNotIn(v2, ev, msg)
        chunk2 = '2:ncpus=1'
        a = {'Resource_List.select': chunk2,
             'Resource_List.place': 'scatter:excl',
             'queue': 'q2'}
        j2 = Job(TEST_USER, attrs=a)
        jid2 = self.server.submit(j2)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
        self.scheduler.log_match(jid2 + ';Chunk: ' + chunk2, n=10000)
        job = self.server.status(JOB, 'exec_vnode', id=jid2)[0]
        ev = j2.get_vnodes(job['exec_vnode'])
        msg = 'Job running on nodes not associated with queue'
        self.assertIn(v1, ev, msg)
        self.assertIn(v2, ev, msg)

    @timeout(450)
    def test_booleans(self):
        """
        Test that booleans are correctly handled if not in the sched_config
        resources line.  This means that an unset boolean is considered
        false and that booleans that are True are considered even though
        they aren't on the resources line.
        """
        chunk1 = '2:ncpus=1'
        a = {'Resource_List.select': chunk1,
             'Resource_List.place': 'scatter:excl'}
        j1 = Job(TEST_USER, attrs=a)
        jid1 = self.server.submit(j1)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        self.scheduler.log_match(jid1 + ';Chunk: ' + chunk1, n=10000)
        jst = self.server.status(JOB, 'exec_vnode', id=jid1)[0]
        ev = j1.get_vnodes(jst['exec_vnode'])
        for n in ev:
            self.server.expect(
                NODE, {'resources_available.bool': 'True'}, id=n)
        chunk2 = '2:ncpus=1:bool=False'
        a['Resource_List.select'] = chunk2
        j2 = Job(TEST_USER, attrs=a)
        jid2 = self.server.submit(j2)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
        self.scheduler.log_match(jid2 + ';Chunk: ' + chunk2, n=10000)
        jst = self.server.status(JOB, 'exec_vnode', id=jid2)[0]
        ev = j2.get_vnodes(jst['exec_vnode'])
        for n in ev:
            self.server.expect(
                NODE, 'resources_available.bool', op=UNSET, id=n)

    @timeout(450)
    def test_last_pset_can_never_run(self):
        """
        Test that the job does not retain the error value of the last
        placement set seen by the node bucketing code.  To check this,
        make the last placement set evaluated yield a 'can never run'
        result because its resources do not match, then verify the job
        is not marked as never able to run.
        """
        self.server.manager(MGR_CMD_CREATE, RSC,
                            {'type': 'long', 'flag': 'nh'}, id='foo')
        self.server.manager(MGR_CMD_CREATE, RSC,
                            {'type': 'string', 'flag': 'h'}, id='bar')
        self.server.manager(MGR_CMD_SET, SERVER, {'node_group_key': 'bar'})
        self.server.manager(MGR_CMD_SET, SERVER, {'node_group_enable': 'true'})
        self.mom.delete_vnode_defs()
        a = {'resources_available.ncpus': 80,
             'resources_available.bar': 'large'}
        self.server.create_vnodes(name='vnode', attrib=a, num=8,
                                  mom=self.mom, sharednode=False)
        self.scheduler.add_resource('foo')
        a['resources_available.foo'] = 8
        a['resources_available.ncpus'] = 8
        a['resources_available.bar'] = 'small'
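        # vnodes 0-4 become the 'small' placement set with foo available;
        # vnodes 5-7 stay 'large' with no foo, so that pset can never
        # satisfy the job below and must not poison the job's status.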
        for val in range(0, 5):
            vname = "vnode[" + str(val) + "]"
            self.server.manager(MGR_CMD_SET, NODE, a, id=vname)
        chunk1 = '4:ncpus=5:foo=5'
        a = {'Resource_List.select': chunk1,
             'Resource_List.place': 'scatter:excl'}
        j1 = Job(TEST_USER, attrs=a)
        jid1 = self.server.submit(j1)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        j2 = Job(TEST_USER, attrs=a)
        jid2 = self.server.submit(j2)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
        self.scheduler.log_match(jid2 + ';Job will never run',
                                 existence=False, max_attempts=10)
|