12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034 |
- # coding: utf-8
- # Copyright (C) 1994-2018 Altair Engineering, Inc.
- # For more information, contact Altair at www.altair.com.
- #
- # This file is part of the PBS Professional ("PBS Pro") software.
- #
- # Open Source License Information:
- #
- # PBS Pro is free software. You can redistribute it and/or modify it under the
- # terms of the GNU Affero General Public License as published by the Free
- # Software Foundation, either version 3 of the License, or (at your option) any
- # later version.
- #
- # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
- # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- # FOR A PARTICULAR PURPOSE.
- # See the GNU Affero General Public License for more details.
- #
- # You should have received a copy of the GNU Affero General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- #
- # Commercial License Information:
- #
- # For a copy of the commercial license terms and conditions,
- # go to: (http://www.pbspro.com/UserArea/agreement.html)
- # or contact the Altair Legal Department.
- #
- # Altair’s dual-license business model allows companies, individuals, and
- # organizations to create proprietary derivative works of PBS Pro and
- # distribute them - whether embedded or bundled with other software -
- # under a commercial license agreement.
- #
- # Use of Altair’s trademarks, including but not limited to "PBS™",
- # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
- # trademark licensing policies.
- from tests.functional import *
- import resource
- class TestMultipleSchedulers(TestFunctional):
- """
- Test suite to test different scheduler interfaces
- """
- def setup_sc1(self):
- a = {'partition': 'P1,P4',
- 'sched_host': self.server.hostname,
- 'sched_port': '15050'}
- self.server.manager(MGR_CMD_CREATE, SCHED,
- a, id="sc1")
- self.scheds['sc1'].create_scheduler()
- self.scheds['sc1'].start()
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc1", expect=True)
- self.scheds['sc1'].set_sched_config({'log_filter': 2048})
- def setup_sc2(self):
- dir_path = os.path.join(os.sep, 'var', 'spool', 'pbs', 'sched_dir')
- if not os.path.exists(dir_path):
- self.du.mkdir(path=dir_path, sudo=True)
- a = {'partition': 'P2',
- 'sched_priv': os.path.join(dir_path, 'sched_priv_sc2'),
- 'sched_log': os.path.join(dir_path, 'sched_logs_sc2'),
- 'sched_host': self.server.hostname,
- 'sched_port': '15051'}
- self.server.manager(MGR_CMD_CREATE, SCHED,
- a, id="sc2")
- self.scheds['sc2'].create_scheduler(dir_path)
- self.scheds['sc2'].start(dir_path)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc2", expect=True)
- def setup_sc3(self):
- a = {'partition': 'P3',
- 'sched_host': self.server.hostname,
- 'sched_port': '15052'}
- self.server.manager(MGR_CMD_CREATE, SCHED,
- a, id="sc3")
- self.scheds['sc3'].create_scheduler()
- self.scheds['sc3'].start()
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc3", expect=True)
- def setup_queues_nodes(self):
- a = {'queue_type': 'execution',
- 'started': 'True',
- 'enabled': 'True'}
- self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq1')
- self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq2')
- self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq3')
- self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq4')
- p1 = {'partition': 'P1'}
- self.server.manager(MGR_CMD_SET, QUEUE, p1, id='wq1', expect=True)
- p2 = {'partition': 'P2'}
- self.server.manager(MGR_CMD_SET, QUEUE, p2, id='wq2', expect=True)
- p3 = {'partition': 'P3'}
- self.server.manager(MGR_CMD_SET, QUEUE, p3, id='wq3', expect=True)
- p4 = {'partition': 'P4'}
- self.server.manager(MGR_CMD_SET, QUEUE, p4, id='wq4', expect=True)
- a = {'resources_available.ncpus': 2}
- self.server.create_vnodes('vnode', a, 5, self.mom)
- self.server.manager(MGR_CMD_SET, NODE, p1, id='vnode[0]', expect=True)
- self.server.manager(MGR_CMD_SET, NODE, p2, id='vnode[1]', expect=True)
- self.server.manager(MGR_CMD_SET, NODE, p3, id='vnode[2]', expect=True)
- self.server.manager(MGR_CMD_SET, NODE, p4, id='vnode[3]', expect=True)
- def common_setup(self):
- self.setup_sc1()
- self.setup_sc2()
- self.setup_sc3()
- self.setup_queues_nodes()
- def check_vnodes(self, j, vnodes, jid):
- self.server.status(JOB, 'exec_vnode', id=jid)
- nodes = j.get_vnodes(j.exec_vnode)
- for vnode in vnodes:
- if vnode not in nodes:
- self.assertFalse(True, str(vnode) +
- " is not in exec_vnode list as expected")
- def test_set_sched_priv(self):
- """
- Test sched_priv can be only set to valid paths
- and check for appropriate comments
- """
- self.setup_sc1()
- if not os.path.exists('/var/sched_priv_do_not_exist'):
- self.server.manager(MGR_CMD_SET, SCHED,
- {'sched_priv': '/var/sched_priv_do_not_exist'},
- id="sc1")
- msg = 'PBS failed validation checks for sched_priv directory'
- a = {'sched_priv': '/var/sched_priv_do_not_exist',
- 'comment': msg,
- 'scheduling': 'False'}
- self.server.expect(SCHED, a, id='sc1', attrop=PTL_AND, max_attempts=10)
- pbs_home = self.server.pbs_conf['PBS_HOME']
- self.du.run_copy(self.server.hostname,
- os.path.join(pbs_home, 'sched_priv'),
- os.path.join(pbs_home, 'sc1_new_priv'),
- recursive=True)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'sched_priv': '/var/spool/pbs/sc1_new_priv'},
- id="sc1")
- a = {'sched_priv': '/var/spool/pbs/sc1_new_priv'}
- self.server.expect(SCHED, a, id='sc1', max_attempts=10)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc1")
- # Blocked by PP-1202 will revisit once its fixed
- # self.server.expect(SCHED, 'comment', id='sc1', op=UNSET)
- def test_set_sched_log(self):
- """
- Test sched_log can be only set to valid paths
- and check for appropriate comments
- """
- self.setup_sc1()
- if not os.path.exists('/var/sched_log_do_not_exist'):
- self.server.manager(MGR_CMD_SET, SCHED,
- {'sched_log': '/var/sched_log_do_not_exist'},
- id="sc1")
- a = {'sched_log': '/var/sched_log_do_not_exist',
- 'comment': 'Unable to change the sched_log directory',
- 'scheduling': 'False'}
- self.server.expect(SCHED, a, id='sc1', max_attempts=10)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc1")
- pbs_home = self.server.pbs_conf['PBS_HOME']
- self.du.mkdir(path=os.path.join(pbs_home, 'sc1_new_logs'),
- sudo=True)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'sched_log': '/var/spool/pbs/sc1_new_logs'},
- id="sc1")
- a = {'sched_log': '/var/spool/pbs/sc1_new_logs'}
- self.server.expect(SCHED, a, id='sc1', max_attempts=10)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc1")
- # Blocked by PP-1202 will revisit once its fixed
- # self.server.expect(SCHED, 'comment', id='sc1', op=UNSET)
- def test_start_scheduler(self):
- """
- Test that scheduler wont start without appropriate folders created.
- Scheduler will log a message if started without partition. Test
- scheduler states down, idle, scheduling.
- """
- self.setup_queues_nodes()
- pbs_home = self.server.pbs_conf['PBS_HOME']
- self.server.manager(MGR_CMD_CREATE, SCHED,
- id="sc5")
- a = {'sched_host': self.server.hostname,
- 'sched_port': '15055',
- 'scheduling': 'True'}
- self.server.manager(MGR_CMD_SET, SCHED, a, id="sc5")
- # Try starting without sched_priv and sched_logs
- ret = self.scheds['sc5'].start()
- self.server.expect(SCHED, {'state': 'down'}, id='sc5', max_attempts=10)
- msg = "sched_priv dir is not present for scheduler"
- self.assertTrue(ret['rc'], msg)
- self.du.run_copy(self.server.hostname,
- os.path.join(pbs_home, 'sched_priv'),
- os.path.join(pbs_home, 'sched_priv_sc5'),
- recursive=True, sudo=True)
- ret = self.scheds['sc5'].start()
- msg = "sched_logs dir is not present for scheduler"
- self.assertTrue(ret['rc'], msg)
- self.du.run_copy(self.server.hostname,
- os.path.join(pbs_home, 'sched_logs'),
- os.path.join(pbs_home, 'sched_logs_sc5'),
- recursive=True, sudo=True)
- ret = self.scheds['sc5'].start()
- self.scheds['sc5'].log_match(
- "Scheduler does not contain a partition",
- max_attempts=10, starttime=self.server.ctime)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'partition': 'P3'}, id="sc5")
- self.server.manager(MGR_CMD_SET, SCHED,
- {'Scheduling': 'True'}, id="sc5")
- self.server.expect(SCHED, {'state': 'idle'}, id='sc5', max_attempts=10)
- a = {'resources_available.ncpus': 100}
- self.server.manager(MGR_CMD_SET, NODE, a, id='vnode[2]', expect=True)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'False'}, id="sc5")
- for _ in xrange(500):
- j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3'})
- self.server.submit(j)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc5")
- self.server.expect(SCHED, {'state': 'scheduling'},
- id='sc5', max_attempts=10)
- def test_resource_sched_reconfigure(self):
- """
- Test all schedulers will reconfigure while creating,
- setting or deleting a resource
- """
- self.common_setup()
- t = int(time.time())
- self.server.manager(MGR_CMD_CREATE, RSC, id='foo')
- for name in self.scheds:
- self.scheds[name].log_match(
- "Scheduler is reconfiguring",
- max_attempts=10, starttime=t)
- # sleeping to make sure we are not checking for the
- # same scheduler reconfiguring message again
- time.sleep(1)
- t = int(time.time())
- attr = {ATTR_RESC_TYPE: 'long'}
- self.server.manager(MGR_CMD_SET, RSC, attr, id='foo')
- for name in self.scheds:
- self.scheds[name].log_match(
- "Scheduler is reconfiguring",
- max_attempts=10, starttime=t)
- # sleeping to make sure we are not checking for the
- # same scheduler reconfiguring message again
- time.sleep(1)
- t = int(time.time())
- self.server.manager(MGR_CMD_DELETE, RSC, id='foo')
- for name in self.scheds:
- self.scheds[name].log_match(
- "Scheduler is reconfiguring",
- max_attempts=10, starttime=t)
- def test_remove_partition_sched(self):
- """
- Test that removing all the partitions from a scheduler
- unsets partition attribute on scheduler and update scheduler logs.
- """
- self.setup_sc1()
- # self.setup_sc2()
- self.server.manager(MGR_CMD_SET, SCHED,
- {'partition': (DECR, 'P1')}, id="sc1")
- self.server.manager(MGR_CMD_SET, SCHED,
- {'partition': (DECR, 'P4')}, id="sc1")
- self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
- id="sc1", expect=True)
- log_msg = "Scheduler does not contain a partition"
- self.scheds['sc1'].log_match(log_msg, max_attempts=10,
- starttime=self.server.ctime)
- # Blocked by PP-1202 will revisit once its fixed
- # self.server.manager(MGR_CMD_UNSET, SCHED, 'partition',
- # id="sc2", expect=True)
- def test_job_queue_partition(self):
- """
- Test job submitted to a queue associated to a partition will land
- into a node associated with that partition.
- """
- self.common_setup()
- j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
- 'Resource_List.select': '1:ncpus=2'})
- jid = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=jid)
- self.check_vnodes(j, ['vnode[0]'], jid)
- self.scheds['sc1'].log_match(
- jid + ';Job run', max_attempts=10,
- starttime=self.server.ctime)
- j = Job(TEST_USER1, attrs={ATTR_queue: 'wq2',
- 'Resource_List.select': '1:ncpus=2'})
- jid = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=jid)
- self.check_vnodes(j, ['vnode[1]'], jid)
- self.scheds['sc2'].log_match(
- jid + ';Job run', max_attempts=10,
- starttime=self.server.ctime)
- j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
- 'Resource_List.select': '1:ncpus=2'})
- jid = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=jid)
- self.check_vnodes(j, ['vnode[2]'], jid)
- self.scheds['sc3'].log_match(
- jid + ';Job run', max_attempts=10,
- starttime=self.server.ctime)
- def test_multiple_partition_same_sched(self):
- """
- Test that scheduler will serve the jobs from different
- partition and run on nodes assigned to respective partitions.
- """
- self.setup_sc1()
- self.setup_queues_nodes()
- j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
- 'Resource_List.select': '1:ncpus=1'})
- jid1 = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
- self.check_vnodes(j, ['vnode[0]'], jid1)
- self.scheds['sc1'].log_match(
- jid1 + ';Job run', max_attempts=10,
- starttime=self.server.ctime)
- j = Job(TEST_USER1, attrs={ATTR_queue: 'wq4',
- 'Resource_List.select': '1:ncpus=1'})
- jid2 = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
- self.check_vnodes(j, ['vnode[3]'], jid2)
- self.scheds['sc1'].log_match(
- jid2 + ';Job run', max_attempts=10,
- starttime=self.server.ctime)
- j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
- 'Resource_List.select': '1:ncpus=1'})
- jid3 = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
- self.check_vnodes(j, ['vnode[0]'], jid3)
- self.scheds['sc1'].log_match(
- jid3 + ';Job run', max_attempts=10,
- starttime=self.server.ctime)
- def test_multiple_queue_same_partition(self):
- """
- Test multiple queue associated with same partition
- is serviced by same scheduler
- """
- self.setup_sc1()
- self.setup_queues_nodes()
- j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
- 'Resource_List.select': '1:ncpus=1'})
- jid = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=jid)
- self.check_vnodes(j, ['vnode[0]'], jid)
- self.scheds['sc1'].log_match(
- jid + ';Job run', max_attempts=10,
- starttime=self.server.ctime)
- p1 = {'partition': 'P1'}
- self.server.manager(MGR_CMD_SET, QUEUE, p1, id='wq4')
- j = Job(TEST_USER1, attrs={ATTR_queue: 'wq4',
- 'Resource_List.select': '1:ncpus=1'})
- jid = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=jid)
- self.check_vnodes(j, ['vnode[0]'], jid)
- self.scheds['sc1'].log_match(
- jid + ';Job run', max_attempts=10,
- starttime=self.server.ctime)
    def test_preemption_highp_queue(self):
        """
        Test preemption occurs only within queues which are assigned
        to same partition and check for equivalence classes
        """
        self.common_setup()
        # make wq4 a high-priority queue inside the same partition (P1)
        prio = {'Priority': 150, 'partition': 'P1'}
        self.server.manager(MGR_CMD_SET, QUEUE, prio, id='wq4')
        # fill P1's node (vnode[0], 2 ncpus) with a normal-priority job
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
                                   'Resource_List.select': '1:ncpus=2'})
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        t = int(time.time())
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq4',
                                   'Resource_List.select': '1:ncpus=2'})
        jid2 = self.server.submit(j)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        # identical resource requests -> a single equivalence class so far
        self.scheds['sc1'].log_match("Number of job equivalence classes: 1",
                                     max_attempts=10, starttime=t)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq4',
                                   'Resource_List.select': '1:ncpus=2'})
        jid3 = self.server.submit(j)
        self.server.expect(JOB, ATTR_comment, op=SET, id=jid3)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid3)
        # jid1 was suspended to make room for the high-priority jid2
        self.server.expect(JOB, {'job_state': 'S'}, id=jid1)
        self.scheds['sc1'].log_match(
            jid1 + ';Job preempted by suspension',
            max_attempts=10, starttime=t)
        # Two equivalence class one for suspended and one for remaining jobs
        self.scheds['sc1'].log_match("Number of job equivalence classes: 2",
                                     max_attempts=10, starttime=t)
- def test_backfill_per_scheduler(self):
- """
- Test backfilling is applicable only per scheduler
- """
- self.common_setup()
- t = int(time.time())
- self.scheds['sc2'].set_sched_config(
- {'strict_ordering': 'True ALL'})
- a = {ATTR_queue: 'wq2',
- 'Resource_List.select': '1:ncpus=2',
- 'Resource_List.walltime': 60}
- j = Job(TEST_USER1, attrs=a)
- jid1 = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
- j = Job(TEST_USER1, attrs=a)
- jid2 = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
- self.scheds['sc2'].log_match(
- jid2 + ';Job is a top job and will run at',
- starttime=t)
- a['queue'] = 'wq3'
- j = Job(TEST_USER1, attrs=a)
- jid3 = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
- j = Job(TEST_USER1, attrs=a)
- jid4 = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'Q'}, id=jid4)
- self.scheds['sc3'].log_match(
- jid4 + ';Job is a top job and will run at',
- max_attempts=5, starttime=t, existence=False)
    def test_resource_per_scheduler(self):
        """
        Test resources will be considered only by scheduler
        to which resource is added in sched_config
        """
        self.common_setup()
        a = {'type': 'float', 'flag': 'nh'}
        self.server.manager(MGR_CMD_CREATE, RSC, a, id='gpus')
        # only sc3 is taught about 'gpus'; the other schedulers must
        # keep ignoring it
        self.scheds['sc3'].add_resource("gpus")
        a = {'resources_available.gpus': 2}
        self.server.manager(MGR_CMD_SET, NODE, a, id='@default', expect=True)
        # sc3 honors gpus: the second 2-gpu job on wq3 must stay queued
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
                                   'Resource_List.select': '1:gpus=2',
                                   'Resource_List.walltime': 60})
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
                                   'Resource_List.select': '1:gpus=2',
                                   'Resource_List.walltime': 60})
        jid2 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
        job_comment = "Not Running: Insufficient amount of resource: "
        job_comment += "gpus (R: 2 A: 0 T: 2)"
        self.server.expect(JOB, {'comment': job_comment}, id=jid2)
        # sc2 does not schedule on gpus, so both wq2 jobs run regardless
        # of the gpus request
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq2',
                                   'Resource_List.select': '1:gpus=2'})
        jid3 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq2',
                                   'Resource_List.select': '1:gpus=2'})
        jid4 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid4)
- def test_restart_server(self):
- """
- Test after server restarts sched attributes are persistent
- """
- self.setup_sc1()
- sched_priv = os.path.join(
- self.server.pbs_conf['PBS_HOME'], 'sched_priv_sc1')
- sched_logs = os.path.join(
- self.server.pbs_conf['PBS_HOME'], 'sched_logs_sc1')
- a = {'sched_port': 15050,
- 'sched_host': self.server.hostname,
- 'sched_priv': sched_priv,
- 'sched_log': sched_logs,
- 'scheduling': 'True',
- 'scheduler_iteration': 600,
- 'state': 'idle',
- 'sched_cycle_length': '00:20:00'}
- self.server.expect(SCHED, a, id='sc1',
- attrop=PTL_AND, max_attempts=10)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduler_iteration': 300,
- 'sched_cycle_length': '00:10:00'},
- id='sc1')
- self.server.restart()
- a['scheduler_iteration'] = 300
- a['sched_cycle_length'] = '00:10:00'
- self.server.expect(SCHED, a, id='sc1',
- attrop=PTL_AND, max_attempts=10)
    def test_resv_default_sched(self):
        """
        Test reservations will only go to default scheduler
        """
        self.setup_queues_nodes()
        t = int(time.time())
        r = Reservation(TEST_USER)
        a = {'Resource_List.select': '2:ncpus=1'}
        r.set_attributes(a)
        rid = self.server.submit(r)
        a = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
        self.server.expect(RESV, a, rid)
        # the confirmation must come from the default scheduler's log,
        # not from any of the multischeds
        self.scheds['default'].log_match(
            rid + ';Reservation Confirmed',
            max_attempts=10, starttime=t)
- def test_job_sorted_per_scheduler(self):
- """
- Test jobs are sorted as per job_sort_formula
- inside each scheduler
- """
- self.common_setup()
- self.server.manager(MGR_CMD_SET, SERVER,
- {'job_sort_formula': 'ncpus'})
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'False'}, id="default")
- j = Job(TEST_USER1, attrs={'Resource_List.select': '1:ncpus=1'})
- jid1 = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'Q'}, id=jid1)
- j = Job(TEST_USER1, attrs={'Resource_List.select': '1:ncpus=2'})
- jid2 = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="default")
- self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'False'}, id="sc3")
- j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
- 'Resource_List.select': '1:ncpus=1'})
- jid3 = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'Q'}, id=jid3)
- j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
- 'Resource_List.select': '1:ncpus=2'})
- jid4 = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'Q'}, id=jid4)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc3")
- self.server.expect(JOB, {'job_state': 'R'}, id=jid4)
- def test_qrun_job(self):
- """
- Test jobs can be run by qrun by a newly created scheduler.
- """
- self.setup_sc1()
- self.setup_queues_nodes()
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'False'}, id="sc1")
- j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
- 'Resource_List.select': '1:ncpus=2'})
- jid1 = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'Q'}, id=jid1)
- self.server.runjob(jid1)
- self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
- def test_run_limts_per_scheduler(self):
- """
- Test run_limits applied at server level is
- applied for every scheduler seperately.
- """
- self.common_setup()
- self.server.manager(MGR_CMD_SET, SERVER,
- {'max_run': '[u:PBS_GENERIC=1]'})
- j = Job(TEST_USER1, attrs={'Resource_List.select': '1:ncpus=1'})
- jid1 = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
- j = Job(TEST_USER1, attrs={'Resource_List.select': '1:ncpus=1'})
- jid2 = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
- j = Job(TEST_USER1, attrs={'Resource_List.select': '1:ncpus=1'})
- jc = "Not Running: User has reached server running job limit."
- self.server.expect(JOB, {'comment': jc}, id=jid2)
- j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
- 'Resource_List.select': '1:ncpus=1'})
- jid3 = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
- j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
- 'Resource_List.select': '1:ncpus=1'})
- jid4 = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'Q'}, id=jid4)
- jc = "Not Running: User has reached server running job limit."
- self.server.expect(JOB, {'comment': jc}, id=jid4)
- def test_multi_fairshare(self):
- """
- Test different schedulers have their own fairshare trees with
- their own usage
- """
- self.common_setup()
- default_shares = 10
- default_usage = 100
- sc1_shares = 20
- sc1_usage = 200
- sc2_shares = 30
- sc2_usage = 300
- sc3_shares = 40
- sc3_usage = 400
- self.scheds['default'].add_to_resource_group(TEST_USER, 10, 'root',
- default_shares)
- self.scheds['default'].set_fairshare_usage(TEST_USER, default_usage)
- self.scheds['sc1'].add_to_resource_group(TEST_USER, 10, 'root',
- sc1_shares)
- self.scheds['sc1'].set_fairshare_usage(TEST_USER, sc1_usage)
- self.scheds['sc2'].add_to_resource_group(TEST_USER, 10, 'root',
- sc2_shares)
- self.scheds['sc2'].set_fairshare_usage(TEST_USER, sc2_usage)
- self.scheds['sc3'].add_to_resource_group(TEST_USER, 10, 'root',
- sc3_shares)
- self.scheds['sc3'].set_fairshare_usage(TEST_USER, sc3_usage)
- # requery fairshare info from pbsfs
- default_fs = self.scheds['default'].query_fairshare()
- sc1_fs = self.scheds['sc1'].query_fairshare()
- sc2_fs = self.scheds['sc2'].query_fairshare()
- sc3_fs = self.scheds['sc3'].query_fairshare()
- n = default_fs.get_node(id=10)
- self.assertEquals(n.nshares, default_shares)
- self.assertEquals(n.usage, default_usage)
- n = sc1_fs.get_node(id=10)
- self.assertEquals(n.nshares, sc1_shares)
- self.assertEquals(n.usage, sc1_usage)
- n = sc2_fs.get_node(id=10)
- self.assertEquals(n.nshares, sc2_shares)
- self.assertEquals(n.usage, sc2_usage)
- n = sc3_fs.get_node(id=10)
- self.assertEquals(n.nshares, sc3_shares)
- self.assertEquals(n.usage, sc3_usage)
- def test_fairshare_usage(self):
- """
- Test the schedulers fairshare usage file and
- check the usage file is updating correctly or not
- """
- self.setup_sc1()
- a = {'queue_type': 'execution',
- 'started': 'True',
- 'enabled': 'True',
- 'partition': 'P1'}
- self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq1')
- # Set resources to node
- resc = {'resources_available.ncpus': 1,
- 'partition': 'P1'}
- self.server.manager(MGR_CMD_SET, NODE, resc, self.mom.shortname)
- # Add entry to the resource group of multisched 'sc1'
- self.scheds['sc1'].add_to_resource_group('grp1', 100, 'root', 60)
- self.scheds['sc1'].add_to_resource_group('grp2', 200, 'root', 40)
- self.scheds['sc1'].add_to_resource_group(TEST_USER1,
- 101, 'grp1', 40)
- self.scheds['sc1'].add_to_resource_group(TEST_USER2,
- 102, 'grp1', 20)
- self.scheds['sc1'].add_to_resource_group(TEST_USER3,
- 201, 'grp2', 30)
- self.scheds['sc1'].add_to_resource_group(TEST_USER4,
- 202, 'grp2', 10)
- # Set scheduler iteration
- sc_attr = {'scheduler_iteration': 7,
- 'scheduling': 'False'}
- self.server.manager(MGR_CMD_SET, SCHED, sc_attr, id='sc1')
- # Update scheduler config file
- sc_config = {'fair_share': 'True',
- 'fairshare_usage_res': 'ncpus*100'}
- self.scheds['sc1'].set_sched_config(sc_config)
- # submit jobs to multisched 'sc1'
- sc1_attr = {ATTR_queue: 'wq1',
- 'Resource_List.select': '1:ncpus=1',
- 'Resource_List.walltime': 10}
- sc1_J1 = Job(TEST_USER1, attrs=sc1_attr)
- sc1_jid1 = self.server.submit(sc1_J1)
- sc1_J2 = Job(TEST_USER2, attrs=sc1_attr)
- sc1_jid2 = self.server.submit(sc1_J2)
- sc1_J3 = Job(TEST_USER3, attrs=sc1_attr)
- sc1_jid3 = self.server.submit(sc1_J3)
- self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
- id='sc1')
- # pbsuser1 job will run and other two will be queued
- self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid1)
- self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid3)
- self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid2)
- self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
- id='sc1')
- # need to delete the running job because PBS has only 1 ncpu and
- # our work is also done with the job.
- # this step will decrease the execution time as well
- self.server.delete(sc1_jid1, wait=True)
- # pbsuser3 job will run after pbsuser1
- self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid3)
- self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid2)
- self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
- id='sc1')
- # deleting the currently running job
- self.server.delete(sc1_jid3, wait=True)
- # pbsuser2 job will run in the end
- self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid2)
- self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
- id='sc1')
- # deleting the currently running job
- self.server.delete(sc1_jid2, wait=True)
- # query fairshare and check usage
- sc1_fs_user1 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER1))
- self.assertEquals(sc1_fs_user1.usage, 101)
- sc1_fs_user2 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER2))
- self.assertEquals(sc1_fs_user2.usage, 101)
- sc1_fs_user3 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER3))
- self.assertEquals(sc1_fs_user3.usage, 101)
- sc1_fs_user4 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER4))
- self.assertEquals(sc1_fs_user4.usage, 1)
- # Restart the scheduler
- self.scheds['sc1'].restart()
- # Check the multisched 'sc1' usage file whether it's updating or not
- self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'False'},
- id='sc1')
- sc1_J1 = Job(TEST_USER1, attrs=sc1_attr)
- sc1_jid1 = self.server.submit(sc1_J1)
- sc1_J2 = Job(TEST_USER2, attrs=sc1_attr)
- sc1_jid2 = self.server.submit(sc1_J2)
- sc1_J4 = Job(TEST_USER4, attrs=sc1_attr)
- sc1_jid4 = self.server.submit(sc1_J4)
- self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
- id='sc1')
- # pbsuser4 job will run and other two will be queued
- self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid4)
- self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid1)
- self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid2)
- self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
- id='sc1')
- # deleting the currently running job
- self.server.delete(sc1_jid4, wait=True)
- # pbsuser1 job will run after pbsuser4
- self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid1)
- self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid2)
- self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
- id='sc1')
- # deleting the currently running job
- self.server.delete(sc1_jid1, wait=True)
- # pbsuser2 job will run in the end
- self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid2)
- self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
- id='sc1')
- # deleting the currently running job
- self.server.delete(sc1_jid2, wait=True)
- # query fairshare and check usage
- sc1_fs_user1 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER1))
- self.assertEquals(sc1_fs_user1.usage, 201)
- sc1_fs_user2 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER2))
- self.assertEquals(sc1_fs_user2.usage, 201)
- sc1_fs_user3 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER3))
- self.assertEquals(sc1_fs_user3.usage, 101)
- sc1_fs_user4 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER4))
- self.assertEquals(sc1_fs_user4.usage, 101)
- def test_sched_priv_change(self):
- """
- Test that when the sched_priv directory changes, all of the
- PTL internal scheduler objects (e.g. fairshare tree) are reread
- """
- new_sched_priv = os.path.join(self.server.pbs_conf['PBS_HOME'],
- 'sched_priv2')
- if os.path.exists(new_sched_priv):
- self.du.rm(path=new_sched_priv, recursive=True,
- sudo=True, force=True)
- dflt_sched_priv = os.path.join(self.server.pbs_conf['PBS_HOME'],
- 'sched_priv')
- self.du.run_copy(src=dflt_sched_priv, dest=new_sched_priv,
- recursive=True, sudo=True)
- self.setup_sc3()
- s = self.server.status(SCHED, id='sc3')
- old_sched_priv = s[0]['sched_priv']
- self.scheds['sc3'].add_to_resource_group(TEST_USER, 10, 'root', 20)
- self.scheds['sc3'].holidays_set_year(new_year="3000")
- self.scheds['sc3'].set_sched_config({'fair_share': 'True ALL'})
- self.server.manager(MGR_CMD_SET, SCHED,
- {'sched_priv': new_sched_priv}, id='sc3')
- n = self.scheds['sc3'].fairshare_tree.get_node(id=10)
- self.assertFalse(n)
- y = self.scheds['sc3'].holidays_get_year()
- self.assertNotEquals(y, "3000")
- self.assertTrue(self.scheds['sc3'].
- sched_config['fair_share'].startswith('false'))
- # clean up: revert_to_defaults() will remove the new sched_priv. We
- # need to remove the old one
- self.du.rm(path=old_sched_priv, sudo=True, recursive=True, force=True)
- def test_fairshare_decay(self):
- """
- Test pbsfs's fairshare decay for multisched
- """
- self.setup_sc3()
- self.scheds['sc3'].add_to_resource_group(TEST_USER, 10, 'root', 20)
- self.scheds['sc3'].set_fairshare_usage(name=TEST_USER, usage=10)
- self.scheds['sc3'].decay_fairshare_tree()
- n = self.scheds['sc3'].fairshare_tree.get_node(id=10)
- self.assertTrue(n.usage, 5)
- def test_cmp_fairshare(self):
- """
- Test pbsfs's compare fairshare functionality for multisched
- """
- self.setup_sc3()
- self.scheds['sc3'].add_to_resource_group(TEST_USER, 10, 'root', 20)
- self.scheds['sc3'].set_fairshare_usage(name=TEST_USER, usage=10)
- self.scheds['sc3'].add_to_resource_group(TEST_USER2, 20, 'root', 20)
- self.scheds['sc3'].set_fairshare_usage(name=TEST_USER2, usage=100)
- user = self.scheds['sc3'].cmp_fairshare_entities(TEST_USER, TEST_USER2)
- self.assertEquals(user, str(TEST_USER))
- def test_pbsfs_invalid_sched(self):
- """
- Test pbsfs -I <sched_name> where sched_name does not exist
- """
- sched_name = 'foo'
- pbsfs_cmd = os.path.join(self.server.pbs_conf['PBS_EXEC'],
- 'sbin', 'pbsfs') + ' -I ' + sched_name
- ret = self.du.run_cmd(cmd=pbsfs_cmd, sudo=True)
- err_msg = 'Scheduler %s does not exist' % sched_name
- self.assertEquals(err_msg, ret['err'][0])
- def test_pbsfs_no_fairshare_data(self):
- """
- Test pbsfs -I <sched_name> where sched_priv_<sched_name> dir
- does not exist
- """
- a = {'partition': 'P5',
- 'sched_host': self.server.hostname,
- 'sched_port': '15050'}
- self.server.manager(MGR_CMD_CREATE, SCHED, a, id="sc5")
- err_msg = 'Unable to access fairshare data: No such file or directory'
- try:
- # Only a scheduler object is created. Corresponding sched_priv
- # dir not created yet. Try to query fairshare data.
- self.scheds['sc5'].query_fairshare()
- except PbsFairshareError as e:
- self.assertTrue(err_msg in e.msg)
- def test_pbsfs_server_restart(self):
- """
- Verify that server restart has no impact on fairshare data
- """
- self.setup_sc1()
- self.scheds['sc1'].add_to_resource_group(TEST_USER, 20, 'root', 50)
- self.scheds['sc1'].set_fairshare_usage(name=TEST_USER, usage=25)
- n = self.scheds['sc1'].query_fairshare().get_node(name=str(TEST_USER))
- self.assertTrue(n.usage, 25)
- self.server.restart()
- n = self.scheds['sc1'].query_fairshare().get_node(name=str(TEST_USER))
- self.assertTrue(n.usage, 25)
- def test_pbsfs_revert_to_defaults(self):
- """
- Test if revert_to_defaults() works properly with multi scheds.
- revert_to_defaults() removes entities from resource_group file and
- removes their usage(with pbsfs -e)
- """
- self.setup_sc1()
- a = {'queue_type': 'execution',
- 'started': 'True',
- 'enabled': 'True',
- 'partition': 'P1'}
- self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq1', expect=True)
- a = {'partition': 'P1', 'resources_available.ncpus': 2}
- self.server.manager(MGR_CMD_SET, NODE, a,
- id=self.mom.shortname, expect=True)
- self.scheds['sc1'].add_to_resource_group(TEST_USER,
- 11, 'root', 10)
- self.scheds['sc1'].add_to_resource_group(TEST_USER1,
- 12, 'root', 10)
- self.scheds['sc1'].set_sched_config({'fair_share': 'True'})
- self.scheds['sc1'].set_fairshare_usage(TEST_USER, 100)
- self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'False'},
- id='sc1')
- j1 = Job(TEST_USER, attrs={ATTR_queue: 'wq1'})
- jid1 = self.server.submit(j1)
- j2 = Job(TEST_USER1, attrs={ATTR_queue: 'wq1'})
- jid2 = self.server.submit(j2)
- t_start = int(time.time())
- self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
- id='sc1')
- self.scheds['sc1'].log_match(
- 'Leaving Scheduling Cycle', starttime=t_start)
- t_end = int(time.time())
- job_list = self.scheds['sc1'].log_match(
- 'Considering job to run', starttime=t_start,
- allmatch=True, endtime=t_end)
- # job 1 runs second as it's run by an entity with usage = 100
- self.assertTrue(jid1 in job_list[0][1])
- self.server.deljob(id=jid1, wait=True)
- self.server.deljob(id=jid2, wait=True)
- # revert_to_defaults() does a pbsfs -I <sched_name> -e and cleans up
- # the resource_group file
- self.scheds['sc1'].revert_to_defaults()
- # Fairshare tree is trimmed now. TEST_USER1 is the only entity with
- # usage set. So its job, job2 will run second. If trimming was not
- # successful TEST_USER would still have usage=100 and job1 would run
- # second
- self.scheds['sc1'].add_to_resource_group(TEST_USER,
- 15, 'root', 10)
- self.scheds['sc1'].add_to_resource_group(TEST_USER1,
- 16, 'root', 10)
- self.scheds['sc1'].set_sched_config({'fair_share': 'True'})
- self.scheds['sc1'].set_fairshare_usage(TEST_USER1, 50)
- self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'False'},
- id='sc1')
- j1 = Job(TEST_USER, attrs={ATTR_queue: 'wq1'})
- jid1 = self.server.submit(j1)
- j2 = Job(TEST_USER1, attrs={ATTR_queue: 'wq1'})
- jid2 = self.server.submit(j2)
- t_start = int(time.time())
- self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
- id='sc1')
- self.scheds['sc1'].log_match(
- 'Leaving Scheduling Cycle', starttime=t_start)
- t_end = int(time.time())
- job_list = self.scheds['sc1'].log_match(
- 'Considering job to run', starttime=t_start,
- allmatch=True, endtime=t_end)
- self.assertTrue(jid2 in job_list[0][1])
- def submit_jobs(self, num_jobs=1, attrs=None, user=TEST_USER):
- """
- Submit num_jobs number of jobs with attrs attributes for user.
- Return a list of job ids
- """
- if attrs is None:
- attrs = {'Resource_List.select': '1:ncpus=2'}
- ret_jids = []
- for _ in range(num_jobs):
- J = Job(user, attrs)
- jid = self.server.submit(J)
- ret_jids += [jid]
- return ret_jids
- def test_equiv_partition(self):
- """
- Test the basic behavior of job equivalence classes: submit two
- different types of jobs into 2 partitions and see they are
- in four different equivalence classes
- """
- self.setup_sc1()
- self.setup_queues_nodes()
- t = int(time.time())
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'False'}, id="sc1")
- # Eat up all the resources with the first job to each queue
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
- self.submit_jobs(4, a)
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
- self.submit_jobs(4, a)
- a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq1'}
- self.submit_jobs(3, a)
- a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq4'}
- self.submit_jobs(3, a)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc1")
- self.scheds['sc1'].log_match("Number of job equivalence classes: 4",
- max_attempts=10, starttime=t)
- def test_equiv_multisched(self):
- """
- Test the basic behavior of job equivalence classes: submit two
- different types of jobs into 2 different schedulers and see they
- are in two different classes in each scheduler
- """
- self.setup_sc1()
- self.setup_sc2()
- self.setup_queues_nodes()
- self.scheds['sc2'].set_sched_config({'log_filter': 2048})
- t = int(time.time())
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'False'}, id="sc1")
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'False'}, id="sc2")
- # Eat up all the resources with the first job to each queue
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
- self.submit_jobs(4, a)
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq2'}
- self.submit_jobs(4, a)
- a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq1'}
- self.submit_jobs(3, a)
- a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq2'}
- self.submit_jobs(3, a)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc1")
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc2")
- self.scheds['sc1'].log_match("Number of job equivalence classes: 2",
- max_attempts=10, starttime=t)
- self.scheds['sc2'].log_match("Number of job equivalence classes: 2",
- max_attempts=10, starttime=t)
- def test_select_partition(self):
- """
- Test to see if jobs with select resources not in the resources line
- fall into the same equivalence class and jobs in different partition
- fall into different equivalence classes
- """
- self.server.manager(MGR_CMD_CREATE, RSC,
- {'type': 'long', 'flag': 'nh'}, id='foo')
- self.setup_sc1()
- self.setup_queues_nodes()
- t = int(time.time())
- # Eat up all the resources
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
- J = Job(TEST_USER, attrs=a)
- self.server.submit(J)
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
- J = Job(TEST_USER, attrs=a)
- self.server.submit(J)
- a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq1'}
- self.submit_jobs(3, a)
- a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq4'}
- self.submit_jobs(3, a)
- a = {'Resource_List.select': '1:ncpus=1:foo=8', ATTR_queue: 'wq1'}
- self.submit_jobs(3, a)
- a = {'Resource_List.select': '1:ncpus=1:foo=8', ATTR_queue: 'wq4'}
- self.submit_jobs(3, a)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc1")
- # Four equivalence classes: two for the resource eating job in each
- # partition and two for the other jobs in each partition. While jobs
- # have different amount of the foo resources which isn't in the
- # resources line
- self.scheds['sc1'].log_match("Number of job equivalence classes: 4",
- max_attempts=10, starttime=t)
- def test_select_res_partition(self):
- """
- Test to see if jobs with select resources in the resources line and
- in different partitions fall into the different equivalence class
- """
- self.server.manager(MGR_CMD_CREATE, RSC,
- {'type': 'long', 'flag': 'nh'}, id='foo')
- self.setup_sc1()
- self.setup_queues_nodes()
- self.scheds['sc1'].add_resource("foo")
- t = int(time.time())
- # Eat up all the resources
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
- J = Job(TEST_USER, attrs=a)
- self.server.submit(J)
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
- J = Job(TEST_USER, attrs=a)
- self.server.submit(J)
- a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq1'}
- self.submit_jobs(3, a)
- a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq4'}
- self.submit_jobs(3, a)
- a = {'Resource_List.select': '1:ncpus=1:foo=8', ATTR_queue: 'wq1'}
- self.submit_jobs(3, a)
- a = {'Resource_List.select': '1:ncpus=1:foo=8', ATTR_queue: 'wq4'}
- self.submit_jobs(3, a)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc1")
- # Six equivalence classes: two for the resource eating jobs in each
- # partition and 4 for the other jobs requesting different amounts of
- # the foo resource in each partition.
- self.scheds['sc1'].log_match("Number of job equivalence classes: 6",
- max_attempts=10, starttime=t)
- def test_multiple_res_partition(self):
- """
- Test to see if jobs with select resources in the resources line
- with multiple custom resources fall into the different equiv class
- and jobs in different partitions fall into different equiv classes
- """
- self.server.manager(MGR_CMD_CREATE, RSC,
- {'type': 'long', 'flag': 'nh'}, id='foo')
- self.server.manager(MGR_CMD_CREATE, RSC,
- {'type': 'string', 'flag': 'h'}, id='colour')
- self.setup_sc1()
- self.setup_queues_nodes()
- self.scheds['sc1'].add_resource("foo")
- self.scheds['sc1'].add_resource("colour")
- t = int(time.time())
- # Eat up all the resources
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
- J = Job(TEST_USER, attrs=a)
- self.server.submit(J)
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
- J = Job(TEST_USER, attrs=a)
- self.server.submit(J)
- a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq1'}
- self.submit_jobs(3, a)
- a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq4'}
- self.submit_jobs(3, a)
- a = {'Resource_List.select': '1:ncpus=1:colour=blue',
- ATTR_queue: 'wq1'}
- self.submit_jobs(3, a)
- a = {'Resource_List.select': '1:ncpus=1:colour=blue',
- ATTR_queue: 'wq4'}
- self.submit_jobs(3, a)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc1")
- # Six equivalence classes: two for the resource eating job in each
- # partition and four for the other jobs. While jobs have different
- # resource requests two for each resource in different partitions
- self.scheds['sc1'].log_match("Number of job equivalence classes: 6",
- max_attempts=10, starttime=t)
- def test_place_partition(self):
- """
- Test to see if jobs with different place statements and different
- partitions fall into the different equivalence classes
- """
- self.setup_sc1()
- self.setup_queues_nodes()
- t = int(time.time())
- # Eat up all the resources
- a = {'Resource_List.select': '1:ncpus=2',
- ATTR_queue: 'wq1'}
- J = Job(TEST_USER, attrs=a)
- self.server.submit(J)
- a = {'Resource_List.select': '1:ncpus=2',
- ATTR_queue: 'wq4'}
- J = Job(TEST_USER, attrs=a)
- self.server.submit(J)
- a = {'Resource_List.select': '1:ncpus=1',
- 'Resource_List.place': 'free',
- ATTR_queue: 'wq1'}
- self.submit_jobs(3, a)
- a = {'Resource_List.select': '1:ncpus=1',
- 'Resource_List.place': 'free',
- ATTR_queue: 'wq4'}
- self.submit_jobs(3, a)
- a = {'Resource_List.select': '1:ncpus=1',
- 'Resource_List.place': 'excl',
- ATTR_queue: 'wq1'}
- self.submit_jobs(3, a)
- a = {'Resource_List.select': '1:ncpus=1',
- 'Resource_List.place': 'excl',
- ATTR_queue: 'wq4'}
- self.submit_jobs(3, a)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc1")
- # Six equivalence classes: two for the resource eating job in
- # each partition and one for each place statement in each partition
- self.scheds['sc1'].log_match("Number of job equivalence classes: 6",
- max_attempts=10, starttime=t)
- def test_nolimits_partition(self):
- """
- Test to see that jobs from different users, groups, and projects
- all fall into the same equivalence class when there are no limits
- but fall into different equivalence classes for each partition
- """
- self.setup_sc1()
- self.setup_queues_nodes()
- t = int(time.time())
- # Eat up all the resources
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
- J = Job(TEST_USER, attrs=a)
- self.server.submit(J)
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
- J = Job(TEST_USER, attrs=a)
- self.server.submit(J)
- a = {ATTR_queue: 'wq1'}
- self.submit_jobs(3, a, user=TEST_USER)
- self.submit_jobs(3, a, user=TEST_USER2)
- a = {ATTR_queue: 'wq4'}
- self.submit_jobs(3, a, user=TEST_USER)
- self.submit_jobs(3, a, user=TEST_USER2)
- b = {'group_list': TSTGRP1, ATTR_queue: 'wq1'}
- self.submit_jobs(3, b, TEST_USER1)
- b = {'group_list': TSTGRP2, ATTR_queue: 'wq1'}
- self.submit_jobs(3, b, TEST_USER1)
- b = {'group_list': TSTGRP1, ATTR_queue: 'wq4'}
- self.submit_jobs(3, b, TEST_USER1)
- b = {'group_list': TSTGRP2, ATTR_queue: 'wq4'}
- self.submit_jobs(3, b, TEST_USER1)
- b = {'project': 'p1', ATTR_queue: 'wq1'}
- self.submit_jobs(3, b)
- b = {'project': 'p2', ATTR_queue: 'wq1'}
- self.submit_jobs(3, b)
- b = {'project': 'p1', ATTR_queue: 'wq4'}
- self.submit_jobs(3, b)
- b = {'project': 'p2', ATTR_queue: 'wq4'}
- self.submit_jobs(3, b)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc1")
- # Four equivalence classes: two for the resource eating job in each
- # partition and two for the rest. Since there are no limits, user,
- # group, nor project are taken into account
- self.scheds['sc1'].log_match("Number of job equivalence classes: 4",
- max_attempts=10, starttime=t)
- def test_limits_partition(self):
- """
- Test to see that jobs from different users fall into different
- equivalence classes with queue hard limits and partitions
- """
- self.setup_sc1()
- self.setup_queues_nodes()
- t = int(time.time())
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'False'}, id="sc1")
- self.server.manager(MGR_CMD_SET, QUEUE,
- {'max_run': '[u:PBS_GENERIC=1]'}, id='wq1')
- self.server.manager(MGR_CMD_SET, QUEUE,
- {'max_run': '[u:PBS_GENERIC=1]'}, id='wq4')
- # Eat up all the resources
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
- J = Job(TEST_USER, attrs=a)
- self.server.submit(J)
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
- J = Job(TEST_USER, attrs=a)
- self.server.submit(J)
- a = {ATTR_queue: 'wq1'}
- self.submit_jobs(3, a, user=TEST_USER1)
- self.submit_jobs(3, a, user=TEST_USER2)
- a = {ATTR_queue: 'wq4'}
- self.submit_jobs(3, a, user=TEST_USER1)
- self.submit_jobs(3, a, user=TEST_USER2)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc1")
- # Six equivalence classes. Two for the resource eating job in
- # different partitions and one for each user per partition.
- self.scheds['sc1'].log_match("Number of job equivalence classes: 6",
- max_attempts=10, starttime=t)
    def test_job_array_partition(self):
        """
        Test that various job types will fall into single equivalence
        class with same type of request and will only fall into different
        equivalence class if partition is different
        """
        self.setup_sc1()
        self.setup_queues_nodes()
        t = int(time.time())
        # Eat up all the resources
        a = {'Resource_List.select': '1:ncpus=2', 'queue': 'wq1'}
        J = Job(TEST_USER1, attrs=a)
        self.server.submit(J)
        a = {'Resource_List.select': '1:ncpus=2', 'queue': 'wq4'}
        J = Job(TEST_USER1, attrs=a)
        self.server.submit(J)
        # Submit a job array with the same request as the wq1 blocker;
        # it should share that job's equivalence class
        j = Job(TEST_USER)
        j.set_attributes(
            {ATTR_J: '1-3:1',
             'Resource_List.select': '1:ncpus=2',
             'queue': 'wq1'})
        self.server.submit(j)
        # NOTE(review): the wq4 array attributes below are set but the job
        # is never submitted — looks like a missing self.server.submit(j).
        # Confirm intent before changing: submitting it could affect the
        # expected class count asserted below.
        j.set_attributes(
            {ATTR_J: '1-3:1',
             'Resource_List.select': '1:ncpus=2',
             'queue': 'wq4'})
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        # Two equivalence class one for each partition
        self.scheds['sc1'].log_match("Number of job equivalence classes: 2",
                                     max_attempts=10, starttime=t)
- def test_equiv_suspend_jobs(self):
- """
- Test that jobs fall into different equivalence classes
- after they get suspended
- """
- self.setup_sc1()
- self.setup_queues_nodes()
- t = int(time.time())
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'False'}, id="sc1")
- # Eat up all the resources
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
- J = Job(TEST_USER, attrs=a)
- jid1 = self.server.submit(J)
- self.server.submit(J)
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
- J = Job(TEST_USER, attrs=a)
- jid3 = self.server.submit(J)
- self.server.submit(J)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc1")
- # 2 equivalence classes one for each partition
- self.scheds['sc1'].log_match("Number of job equivalence classes: 2",
- max_attempts=10, starttime=t)
- t = int(time.time())
- # Make sure that Job is in R state before issuing a signal to suspend
- self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
- self.server.sigjob(jobid=jid1, signal="suspend")
- self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
- self.server.sigjob(jobid=jid3, signal="suspend")
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc1")
- # 4 equivalance classes 2 for partition 2 for suspended jobs
- self.scheds['sc1'].log_match("Number of job equivalence classes: 4",
- max_attempts=10, starttime=t)
- def test_equiv_single_partition(self):
- """
- Test that jobs fall into same equivalence class if jobs fall
- into queues set to same partition
- """
- self.setup_sc1()
- self.setup_queues_nodes()
- t = int(time.time())
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'False'}, id="sc1")
- self.server.manager(MGR_CMD_SET, QUEUE,
- {'partition': 'P1'}, id='wq4')
- # Eat up all the resources with the first job to wq1
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
- self.submit_jobs(4, a)
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
- self.submit_jobs(3, a)
- a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq1'}
- self.submit_jobs(3, a)
- a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq4'}
- self.submit_jobs(3, a)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'scheduling': 'True'}, id="sc1")
- # 2 equivalence classes one for each with different ncpus request
- # as both queues are having same partition
- self.scheds['sc1'].log_match("Number of job equivalence classes: 2",
- max_attempts=10, starttime=t)
- def test_list_multi_sched(self):
- """
- Test to verify that qmgr list sched works when multiple
- schedulers are present
- """
- self.setup_sc1()
- self.setup_sc2()
- self.setup_sc3()
- self.server.manager(MGR_CMD_LIST, SCHED)
- self.server.manager(MGR_CMD_LIST, SCHED, id="default")
- self.server.manager(MGR_CMD_LIST, SCHED, id="sc1")
- dir_path = os.path.join(os.sep, 'var', 'spool', 'pbs', 'sched_dir')
- a = {'partition': 'P2',
- 'sched_priv': os.path.join(dir_path, 'sched_priv_sc2'),
- 'sched_log': os.path.join(dir_path, 'sched_logs_sc2'),
- 'sched_host': self.server.hostname,
- 'sched_port': '15051'}
- self.server.manager(MGR_CMD_LIST, SCHED, a, id="sc2", expect=True)
- self.server.manager(MGR_CMD_LIST, SCHED, id="sc3")
- try:
- self.server.manager(MGR_CMD_LIST, SCHED, id="invalid_scname")
- except PbsManagerError as e:
- err_msg = "Unknown Scheduler"
- self.assertTrue(err_msg in e.msg[0],
- "Error message is not expected")
- # delete sc3 sched
- self.server.manager(MGR_CMD_DELETE, SCHED, id="sc3", sudo=True)
- try:
- self.server.manager(MGR_CMD_LIST, SCHED, id="sc3")
- except PbsManagerError as e:
- err_msg = "Unknown Scheduler"
- self.assertTrue(err_msg in e.msg[0],
- "Error message is not expected")
- self.server.manager(MGR_CMD_LIST, SCHED)
- self.server.manager(MGR_CMD_LIST, SCHED, id="default")
- self.server.manager(MGR_CMD_LIST, SCHED, id="sc1")
- self.server.manager(MGR_CMD_LIST, SCHED, id="sc2")
- # delete sc1 sched
- self.server.manager(MGR_CMD_DELETE, SCHED, id="sc1")
- try:
- self.server.manager(MGR_CMD_LIST, SCHED, id="sc1")
- except PbsManagerError as e:
- err_msg = "Unknown Scheduler"
- self.assertTrue(err_msg in e.msg[0],
- "Error message is not expected")
- def test_job_sort_formula_threshold(self):
- """
- Test the scheduler attribute job_sort_formula_threshold for multisched
- """
- # Multisched setup
- self.setup_sc3()
- p3 = {'partition': 'P3'}
- a = {'queue_type': 'execution',
- 'started': 'True',
- 'enabled': 'True'}
- a.update(p3)
- self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq1', expect=True)
- a = {'resources_available.ncpus': 2}
- self.server.create_vnodes('vnode', a, 2, self.mom)
- self.server.manager(MGR_CMD_SET, NODE, p3, id='vnode[0]', expect=True)
- # Set job_sort_formula on the server
- self.server.manager(MGR_CMD_SET, SERVER, {'job_sort_formula': 'ncpus'})
- # Set job_sort_formula_threshold on the multisched
- self.server.manager(MGR_CMD_SET, SCHED,
- {'job_sort_formula_threshold': '2'},
- id="sc3", expect=True)
- # Submit job to multisched
- j1_attrs = {ATTR_queue: 'wq1', 'Resource_List.ncpus': '1'}
- J1 = Job(TEST_USER, j1_attrs)
- jid_1 = self.server.submit(J1)
- # Submit job to default scheduler
- J2 = Job(TEST_USER, attrs={'Resource_List.ncpus': '1'})
- jid_2 = self.server.submit(J2)
- msg = {'job_state': 'Q',
- 'comment': ('Not Running: Job is ' +
- 'under job_sort_formula threshold value')}
- self.server.expect(JOB, msg, id=jid_1)
- self.server.expect(JOB, {'job_state': 'R'}, id=jid_2)
- @staticmethod
- def cust_attr(name, totnodes, numnode, attrib):
- a = {}
- if numnode in range(0, 3):
- a['resources_available.switch'] = 'A'
- if numnode in range(3, 5):
- a['resources_available.switch'] = 'B'
- if numnode in range(6, 9):
- a['resources_available.switch'] = 'A'
- a['partition'] = 'P2'
- if numnode in range(9, 11):
- a['resources_available.switch'] = 'B'
- a['partition'] = 'P2'
- if numnode is 11:
- a['partition'] = 'P2'
- return dict(attrib.items() + a.items())
- def setup_placement_set(self):
- self.server.add_resource('switch', 'string_array', 'h')
- a = {'resources_available.ncpus': 2}
- self.server.create_vnodes(
- 'vnode', a, 12, self.mom, attrfunc=self.cust_attr)
- self.server.manager(MGR_CMD_SET, SERVER, {'node_group_key': 'switch'})
- self.server.manager(MGR_CMD_SET, SERVER, {'node_group_enable': 't'})
- def test_multi_sched_explicit_ps(self):
- """
- Test only_explicit_ps set to sched attr will be in affect
- and will not read from default scheduler
- """
- self.setup_placement_set()
- self.setup_sc2()
- a = {'queue_type': 'execution',
- 'started': 'True',
- 'enabled': 'True',
- 'partition': 'P2'}
- self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq2')
- a = {'Resource_List.select': '1:ncpus=2'}
- j = Job(TEST_USER, attrs=a)
- j1id = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=j1id)
- nodes = ['vnode[5]']
- self.check_vnodes(j, nodes, j1id)
- a = {'Resource_List.select': '2:ncpus=2'}
- j = Job(TEST_USER, attrs=a)
- j2id = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=j2id)
- nodes = ['vnode[3]', 'vnode[4]']
- self.check_vnodes(j, nodes, j2id)
- a = {'Resource_List.select': '3:ncpus=2'}
- j = Job(TEST_USER, attrs=a)
- j3id = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=j3id)
- nodes = ['vnode[0]', 'vnode[1]', 'vnode[2]']
- self.check_vnodes(j, nodes, j3id)
- self.server.manager(MGR_CMD_SET, SCHED,
- {'only_explicit_psets': 't'}, id='sc2')
- a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq2'}
- j = Job(TEST_USER, attrs=a)
- j4id = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=j4id)
- nodes = ['vnode[9]']
- self.check_vnodes(j, nodes, j4id)
- a = {'Resource_List.select': '2:ncpus=2', ATTR_queue: 'wq2'}
- j = Job(TEST_USER, attrs=a)
- j5id = self.server.submit(j)
- self.server.expect(JOB, {'job_state': 'R'}, id=j5id)
- nodes = ['vnode[6]', 'vnode[7]']
- self.check_vnodes(j, nodes, j5id)
- a = {'Resource_List.select': '3:ncpus=2', ATTR_queue: 'wq2'}
- j = Job(TEST_USER, attrs=a)
- j6id = self.server.submit(j)
- self.server.expect(JOB, {
- 'job_state': 'Q',
- 'comment': 'Not Running: Placement set switch=A'
- ' has too few free resources'}, id=j6id)
def test_jobs_do_not_span_ps(self):
    """
    Test do_not_span_psets set to sched attr will be in affect
    and will not read from default scheduler
    """
    self.setup_placement_set()
    self.setup_sc2()
    queue_attrs = {'queue_type': 'execution',
                   'started': 'True',
                   'enabled': 'True',
                   'partition': 'P2'}
    self.server.manager(MGR_CMD_CREATE, QUEUE, queue_attrs, id='wq2')
    # Scheduler sc2 cannot span across placement sets
    self.server.manager(MGR_CMD_SET, SCHED,
                        {'do_not_span_psets': 't'}, id='sc2')
    self.server.manager(MGR_CMD_SET, SCHED,
                        {'scheduling': 't'}, id='sc2')
    # A 4-chunk job cannot fit into any single placement set, so sc2
    # must leave it queued with a "can never run" comment.
    spanning_job = Job(TEST_USER,
                       attrs={'Resource_List.select': '4:ncpus=2',
                              ATTR_queue: 'wq2'})
    jid_sc2 = self.server.submit(spanning_job)
    expected = {'job_state': 'Q',
                'comment': 'Can Never Run: can\'t fit in '
                'the largest placement set, and can\'t span psets'}
    self.server.expect(JOB, expected, id=jid_sc2)
    # Default scheduler can span as do_not_span_psets is not set
    default_job = Job(TEST_USER,
                      attrs={'Resource_List.select': '4:ncpus=2'})
    jid_dflt = self.server.submit(default_job)
    self.server.expect(JOB, {'job_state': 'R'}, id=jid_dflt)
def test_sched_preempt_enforce_resumption(self):
    """
    Test sched_preempt_enforce_resumption can be set to a multi sched
    and that even if topjob_ineligible is set for a preempted job
    and sched_preempt_enforce_resumption is set true, the
    preempted job will be calendared.
    """
    self.setup_sc1()
    self.setup_queues_nodes()
    prio = {'Priority': 150, 'partition': 'P1'}
    self.server.manager(MGR_CMD_SET, QUEUE, prio, id='wq4')
    self.server.manager(MGR_CMD_SET, SCHED,
                        {'sched_preempt_enforce_resumption': 'true'},
                        id='sc1')
    self.server.manager(MGR_CMD_SET, SERVER, {'backfill_depth': '2'})
    # Submit a job to the default scheduler's partition
    j = Job(TEST_USER, {'Resource_List.walltime': '120',
                        'Resource_List.ncpus': '2'})
    jid1 = self.server.submit(j)
    self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
    # Submit a job to sc1's partition via queue wq1
    j = Job(TEST_USER, {'Resource_List.walltime': '120',
                        'Resource_List.ncpus': '2',
                        ATTR_queue: 'wq1'})
    jid2 = self.server.submit(j)
    self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
    self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
    # Alter topjob_ineligible for both running jobs.
    # Bug fix: the second call previously altered jid1 again instead of
    # jid2, so jid2 was never marked topjob_ineligible and the test did
    # not actually prove sched_preempt_enforce_resumption overrides it.
    self.server.alterjob(jid1, {ATTR_W: "topjob_ineligible = true"},
                         runas=ROOT_USER, logerr=True)
    self.server.alterjob(jid2, {ATTR_W: "topjob_ineligible = true"},
                         runas=ROOT_USER, logerr=True)
    # Create a high priority queue
    a = {'queue_type': 'e', 'started': 't',
         'enabled': 't', 'Priority': '150'}
    self.server.manager(MGR_CMD_CREATE, QUEUE, a, id="highp")
    # Submit 2 jobs to high priority queues; they preempt jid1 and jid2
    j = Job(TEST_USER, {'queue': 'highp', 'Resource_List.walltime': '60',
                        'Resource_List.ncpus': '2'})
    jid3 = self.server.submit(j)
    j = Job(TEST_USER, {'queue': 'wq4', 'Resource_List.walltime': '60',
                        'Resource_List.ncpus': '2'})
    jid4 = self.server.submit(j)
    self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
    self.server.expect(JOB, {'job_state': 'R'}, id=jid4)
    # Verify that job1 is not calendered (default sched does not have
    # sched_preempt_enforce_resumption set)
    self.server.expect(JOB, 'estimated.start_time',
                       op=UNSET, id=jid1)
    # Verify that job2 is calendared despite topjob_ineligible, because
    # sc1 has sched_preempt_enforce_resumption enabled
    self.server.expect(JOB, 'estimated.start_time',
                       op=SET, id=jid2)
    qstat = self.server.status(JOB, 'estimated.start_time',
                               id=jid2)
    est_time = qstat[0]['estimated.start_time']
    self.assertNotEqual(est_time, None)
    self.scheds['sc1'].log_match(jid2 + ";Job is a top job",
                                 starttime=self.server.ctime)
def set_primetime(self, ptime_start, ptime_end, scid='default'):
    """
    Configure prime time in the holidays file of the given scheduler.

    :param ptime_start: epoch at which prime time begins
    :param ptime_end: epoch at which prime time ends (non-prime start)
    :param scid: scheduler id whose holidays file is edited
    """
    # holidays entries take HHMM strings in local time
    prime_hhmm = time.strftime('%H%M', time.localtime(ptime_start))
    nonprime_hhmm = time.strftime('%H%M', time.localtime(ptime_end))
    # The same window applies to weekdays and both weekend days.
    for day in ('weekday', 'saturday', 'sunday'):
        self.scheds[scid].holidays_set_day(day, prime_hhmm, nonprime_hhmm)
def test_prime_time_backfill(self):
    """
    Test opt_backfill_fuzzy can be set to a multi sched and
    while calendaring primetime/nonprimetime will be considered
    """
    self.setup_sc2()
    self.setup_queues_nodes()
    # strict_ordering forces the scheduler to calendar the top job
    a = {'strict_ordering': "True ALL"}
    self.scheds['sc2'].set_sched_config(a)
    # set primetime which will start after 30min
    prime_start = int(time.time()) + 1800
    prime_end = int(time.time()) + 3600
    self.set_primetime(prime_start, prime_end, scid='sc2')
    self.server.manager(MGR_CMD_SET, SCHED,
                        {'opt_backfill_fuzzy': 'high'}, id='sc2')
    self.server.manager(MGR_CMD_SET, SERVER, {'backfill_depth': '2'})
    # Submit a job that consumes the wq2 partition's cpus
    j = Job(TEST_USER, {'Resource_List.walltime': '60',
                        'Resource_List.ncpus': '2',
                        ATTR_queue: 'wq2'})
    jid1 = self.server.submit(j)
    self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
    # Second job (no walltime) must wait and get a calendared start
    j = Job(TEST_USER1, {'Resource_List.ncpus': '2',
                         ATTR_queue: 'wq2'})
    jid2 = self.server.submit(j)
    self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
    # Verify that job2 is calendared to start at primetime start
    self.server.expect(JOB, 'estimated.start_time',
                       op=SET, id=jid2)
    qstat = self.server.status(JOB, 'estimated.start_time',
                               id=jid2)
    est_time = qstat[0]['estimated.start_time']
    est_epoch = est_time
    # In CLI mode the attribute comes back as a '%c'-formatted local
    # time string; convert it to an epoch for comparison
    if self.server.get_op_mode() == PTL_CLI:
        est_epoch = int(time.mktime(time.strptime(est_time, '%c')))
    prime_mod = prime_start % 60  # ignoring the seconds
    self.assertEqual((prime_start - prime_mod), est_epoch)
def test_prime_time_multisched(self):
    """
    Test prime time queue can be set partition and multi sched
    considers prime time queue for jobs submitted to the p_queue
    """
    self.setup_sc2()
    self.setup_queues_nodes()
    # set primetime which will start after 30min
    prime_start = int(time.time()) + 1800
    prime_end = int(time.time()) + 3600
    self.set_primetime(prime_start, prime_end, scid='sc2')
    self.server.manager(MGR_CMD_CREATE, QUEUE,
                        {'queue_type': 'e', 'started': 't',
                         'enabled': 't', 'partition': 'P2'},
                        id="p_queue")
    # A job in an ordinary queue of the partition runs right away
    normal_job = Job(TEST_USER1, {'Resource_List.ncpus': '1',
                                  ATTR_queue: 'wq2'})
    jid1 = self.server.submit(normal_job)
    self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
    # A job in the primetime queue must wait until primetime begins
    ptime_job = Job(TEST_USER1, {'Resource_List.ncpus': '1',
                                 ATTR_queue: 'p_queue'})
    jid2 = self.server.submit(ptime_job)
    self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
    msg = 'Job will run in primetime only'
    self.server.expect(JOB, {ATTR_comment: "Not Running: " + msg}, id=jid2)
    self.scheds['sc2'].log_match(jid2 + ";Job only runs in primetime",
                                 starttime=self.server.ctime)
def test_dedicated_time_multisched(self):
    """
    Test dedicated time queue can be set partition and multi sched
    considers dedicated time for jobs submitted to the ded_queue
    """
    self.setup_sc2()
    self.setup_queues_nodes()
    # Create a dedicated time window beginning 30 minutes from now
    ded_start = int(time.time()) + 1800
    ded_end = int(time.time()) + 3600
    self.scheds['sc2'].add_dedicated_time(start=ded_start, end=ded_end)
    self.server.manager(MGR_CMD_CREATE, QUEUE,
                        {'queue_type': 'e', 'started': 't',
                         'enabled': 't', 'partition': 'P2'},
                        id="ded_queue")
    # A job outside the dedicated-time queue runs immediately
    normal_job = Job(TEST_USER1, {'Resource_List.ncpus': '1',
                                  ATTR_queue: 'wq2'})
    jid1 = self.server.submit(normal_job)
    self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
    # A job in the dedicated-time queue must wait for the window
    ded_job = Job(TEST_USER1, {'Resource_List.ncpus': '1',
                               ATTR_queue: 'ded_queue'})
    jid2 = self.server.submit(ded_job)
    self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
    msg = 'Dedicated time conflict'
    self.server.expect(JOB, {ATTR_comment: "Not Running: " + msg}, id=jid2)
    self.scheds['sc2'].log_match(jid2 + ";Dedicated Time",
                                 starttime=self.server.ctime)
def test_auto_sched_off_due_to_fds_limit(self):
    """
    Test to make sure scheduling should be turned off automatically
    when number of open files per process are exhausted
    """
    # Changing RLIMIT_NOFILE requires root; skip on unsupported platforms
    if os.getuid() != 0 or sys.platform in ('cygwin', 'win32'):
        self.skipTest("Test need to run as root")
    try:
        # get the number of open files per process
        (open_files_soft_limit, open_files_hard_limit) =\
            resource.getrlimit(resource.RLIMIT_NOFILE)
        # set the soft limit of number of open files per process to 10
        resource.setrlimit(resource.RLIMIT_NOFILE,
                           (10, open_files_hard_limit))
    except (ValueError, resource.error):
        # self.fail() is the idiomatic unconditional failure
        # (was: assertFalse(True, ...))
        self.fail("Error in accessing system RLIMIT_ "
                  "variables, test fails.")
    # Restore the original limit even if an expectation below fails,
    # so a failing run does not leave the process crippled.
    try:
        self.setup_sc3()
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduler_iteration': 1},
                            id="sc3", expect=True)
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
                            id="sc3", expect=True)
        self.logger.info('The sleep is 15 seconds which will trigger '
                         'required number of scheduling cycles that are '
                         'needed to exhaust open files per process which '
                         'is 10 in our case')
        time.sleep(15)
        # scheduling should not go to false once all fds per process
        # are exhausted.
        self.server.expect(SCHED, {'scheduling': 'True'},
                           id='sc3', max_attempts=10)
    finally:
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE,
                               (open_files_soft_limit,
                                open_files_hard_limit))
        except (ValueError, resource.error):
            self.fail("Error in accessing system RLIMIT_ "
                      "variables, test fails.")
def test_set_msched_attr_sched_log_with_sched_off(self):
    """
    Test to set Multisched attributes even when its scheduling is off
    and check whether they are actually be effective
    """
    self.setup_sc3()
    self.scheds['sc3'].set_sched_config({'log_filter': 2048})
    self.server.manager(MGR_CMD_SET, SCHED,
                        {'scheduling': 'False'}, id="sc3")
    log_dir = os.path.join(self.server.pbs_conf['PBS_HOME'],
                           'sc3_new_logs')
    # Start from a clean log directory
    if os.path.exists(log_dir):
        self.du.rm(path=log_dir, recursive=True,
                   sudo=True, force=True)
    self.du.mkdir(path=log_dir, sudo=True)
    self.server.manager(MGR_CMD_SET, SCHED,
                        {'sched_log': log_dir},
                        id="sc3", expect=True)
    self.server.expect(SCHED, {'sched_log': log_dir},
                       id='sc3', max_attempts=10)
    # This is required since we need to call log_match only when
    # the new log file is created.
    time.sleep(1)
    self.scheds['sc3'].log_match(
        "scheduler log directory is changed to " + log_dir,
        max_attempts=10, starttime=self.server.ctime)
def test_set_msched_attr_sched_priv_with_sched_off(self):
    """
    Test to set Multisched attributes even when its scheduling is off
    and check whether they are actually be effective
    """
    self.setup_sc3()
    self.scheds['sc3'].set_sched_config({'log_filter': 2048})
    self.server.manager(MGR_CMD_SET, SCHED,
                        {'scheduling': 'False'}, id="sc3")
    # create and set-up a new priv directory for sc3, seeded from the
    # default scheduler's sched_priv
    priv_dir = os.path.join(self.server.pbs_conf['PBS_HOME'],
                            'sched_priv_new')
    if os.path.exists(priv_dir):
        self.du.rm(path=priv_dir, recursive=True,
                   sudo=True, force=True)
    dflt_priv_dir = os.path.join(self.server.pbs_conf['PBS_HOME'],
                                 'sched_priv')
    self.du.run_copy(src=dflt_priv_dir, dest=priv_dir,
                     recursive=True, sudo=True)
    self.server.manager(MGR_CMD_SET, SCHED, {'sched_priv': priv_dir},
                        id="sc3", expect=True)
    self.server.expect(SCHED, {'sched_priv': priv_dir},
                       id='sc3', max_attempts=10)
    # This is required since we need to call log_match only when
    # the new log file is created.
    time.sleep(1)
    self.scheds['sc3'].log_match(
        "scheduler priv directory has changed to " + priv_dir,
        max_attempts=10, starttime=self.server.ctime)
def test_set_msched_update_inbuilt_attrs_accrue_type(self):
    """
    Test to make sure Multisched is able to update any one of the builtin
    attributes like accrue_type
    """
    # Eligible-time accounting must be on for accrue_type to change
    self.server.manager(MGR_CMD_SET, SERVER,
                        {'eligible_time_enable': 'True'})
    self.setup_sc3()
    self.setup_queues_nodes()
    job_attrs = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq3'}
    first_job = Job(TEST_USER1, attrs=job_attrs)
    second_job = Job(TEST_USER1, attrs=job_attrs)
    jid1 = self.server.submit(first_job)
    self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)
    jid2 = self.server.submit(second_job)
    self.server.expect(JOB, {ATTR_state: 'Q'}, id=jid2)
    # accrue_type = 2 is eligible_time
    self.server.expect(JOB, {ATTR_accrue_type: 2}, id=jid2)
    self.server.delete(jid1, wait=True)
    self.server.expect(JOB, {ATTR_state: 'R'}, id=jid2)
    # This makes sure that accrue_type is indeed getting changed
    self.server.expect(JOB, {ATTR_accrue_type: 3}, id=jid2)
def test_multisched_not_crash(self):
    """
    Test to make sure Multisched does not crash when all nodes in partition
    are not associated with the corresponding queue
    """
    self.setup_sc1()
    self.setup_queues_nodes()
    # Assign a queue with the partition P1. This queue association is not
    # required as per the current Multisched feature. But this is just to
    # verify even if we associate a queue to one of the nodes in partition
    # the scheduler won't crash.
    # Ex: Here we are associating wq1 to vnode[0] but vnode[4] has no
    # queue associated to it. Expectation is in this case scheduler won't
    # crash
    a = {ATTR_queue: 'wq1'}
    # Consistency fix: pass the boolean True for the expect flag, as every
    # other manager() call in this file does (was: expect="True").
    self.server.manager(MGR_CMD_SET, NODE, a, id='vnode[0]', expect=True)
    self.scheds['sc1'].terminate()
    self.scheds['sc1'].start()
    # Ideally the following statement is not required. start() method
    # itself should take care of updating the PID in its cache. I have
    # created a new bug to fix in this framework. For the time being
    # the following statement is required as a work around.
    self.scheds['sc1']._update_pid(self.scheds['sc1'])
    j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
                               'Resource_List.select': '1:ncpus=1'})
    jid1 = self.server.submit(j)
    # If job goes to R state means scheduler is still alive.
    self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
def test_multi_sched_job_sort_key(self):
    """
    Test to make sure that jobs are sorted as per
    job_sort_key in a multi sched
    """
    self.setup_sc1()
    self.setup_queues_nodes()
    self.scheds['sc1'].set_sched_config({'job_sort_key': '"ncpus LOW"'})
    # Pause scheduling so both jobs are queued before sorting happens
    self.server.manager(MGR_CMD_SET, SCHED,
                        {'scheduling': 'False'}, id="sc1")
    two_cpu_job = Job(TEST_USER, {'Resource_List.ncpus': '2',
                                  ATTR_queue: 'wq1'})
    jid1 = self.server.submit(two_cpu_job)
    one_cpu_job = Job(TEST_USER, {'Resource_List.ncpus': '1',
                                  ATTR_queue: 'wq1'})
    jid2 = self.server.submit(one_cpu_job)
    self.server.manager(MGR_CMD_SET, SCHED,
                        {'scheduling': 'True'}, id="sc1")
    # "ncpus LOW" runs the 1-cpu job first; the 2-cpu job stays queued
    self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
    self.server.expect(JOB, {'job_state': 'Q'}, id=jid1)
def test_multi_sched_node_sort_key(self):
    """
    Test to make sure nodes are sorted in the order
    as per node_sort_key in a multi sched
    """
    self.setup_sc1()
    self.setup_queues_nodes()
    self.server.manager(MGR_CMD_SET, NODE, {'partition': 'P1'},
                        id='@default', expect=True)
    self.scheds['sc1'].set_sched_config(
        {'node_sort_key': '"ncpus HIGH " ALL'})
    # Give vnode[0]..vnode[3] 1..4 ncpus respectively
    for idx in range(4):
        self.server.manager(MGR_CMD_SET, NODE,
                            {'resources_available.ncpus': idx + 1},
                            id='vnode[%d]' % idx, expect=True)
    # Offlining the node as we do not need for the test
    self.server.manager(MGR_CMD_SET, NODE, {'state': 'offline'},
                        id='vnode[4]', expect=True)
    job_attrs = {'Resource_List.select': '1:ncpus=1',
                 'Resource_List.place': 'excl',
                 ATTR_queue: 'wq1'}
    # With "ncpus HIGH", successive exclusive jobs must land on nodes
    # in decreasing order of ncpus: vnode[3] first, vnode[0] last.
    for expected_vnode in ('vnode[3]', 'vnode[2]', 'vnode[1]', 'vnode[0]'):
        j = Job(TEST_USER1, job_attrs)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.check_vnodes(j, [expected_vnode], jid)
def test_multi_sched_priority_sockets(self):
    """
    Test scheduler socket connections from all the schedulers
    are processed on priority
    """
    self.common_setup()
    self.server.manager(MGR_CMD_SET, SERVER, {'log_events': 2047})
    # Pause every scheduler so job submission does not trigger a cycle
    for name in self.scheds:
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'False'}, id=name, expect=True)
    # For each scheduler: queue a job, re-enable scheduling, and verify
    # the server handles its connection on the priority socket.
    for queue, sched in (('wq1', 'sc1'), ('wq2', 'sc2')):
        attrs = {ATTR_queue: queue,
                 'Resource_List.select': '1:ncpus=2',
                 'Resource_List.walltime': 60}
        self.server.submit(Job(TEST_USER1, attrs=attrs))
        t = int(time.time())
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id=sched, expect=True)
        self.server.log_match("processing priority socket", starttime=t)
|