# coding: utf-8
# Copyright (C) 1994-2018 Altair Engineering, Inc.
# For more information, contact Altair at www.altair.com.
#
# This file is part of the PBS Professional ("PBS Pro") software.
#
# Open Source License Information:
#
# PBS Pro is free software. You can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.
# See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Commercial License Information:
#
# For a copy of the commercial license terms and conditions,
# go to: (http://www.pbspro.com/UserArea/agreement.html)
# or contact the Altair Legal Department.
#
# Altair’s dual-license business model allows companies, individuals, and
# organizations to create proprietary derivative works of PBS Pro and
# distribute them - whether embedded or bundled with other software -
# under a commercial license agreement.
#
# Use of Altair’s trademarks, including but not limited to "PBS™",
# "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
# trademark licensing policies.

from tests.functional import *
import resource


class TestMultipleSchedulers(TestFunctional):

    """
    Test suite to test different scheduler interfaces
    """

    def setup_sc1(self):
        a = {'partition': 'P1,P4',
             'sched_host': self.server.hostname,
             'sched_port': '15050'}
        self.server.manager(MGR_CMD_CREATE, SCHED,
                            a, id="sc1")
        self.scheds['sc1'].create_scheduler()
        self.scheds['sc1'].start()
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1", expect=True)
        self.scheds['sc1'].set_sched_config({'log_filter': 2048})
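
    # Note: unlike sc1 and sc3, which use the default sched_priv/sched_log
    # layout under PBS_HOME, sc2 below is configured with its sched_priv and
    # sched_log under a separate directory (/var/spool/pbs/sched_dir) to
    # exercise non-default scheduler paths.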

    def setup_sc2(self):
        dir_path = os.path.join(os.sep, 'var', 'spool', 'pbs', 'sched_dir')
        if not os.path.exists(dir_path):
            self.du.mkdir(path=dir_path, sudo=True)
        a = {'partition': 'P2',
             'sched_priv': os.path.join(dir_path, 'sched_priv_sc2'),
             'sched_log': os.path.join(dir_path, 'sched_logs_sc2'),
             'sched_host': self.server.hostname,
             'sched_port': '15051'}
        self.server.manager(MGR_CMD_CREATE, SCHED,
                            a, id="sc2")
        self.scheds['sc2'].create_scheduler(dir_path)
        self.scheds['sc2'].start(dir_path)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc2", expect=True)

    def setup_sc3(self):
        a = {'partition': 'P3',
             'sched_host': self.server.hostname,
             'sched_port': '15052'}
        self.server.manager(MGR_CMD_CREATE, SCHED,
                            a, id="sc3")
        self.scheds['sc3'].create_scheduler()
        self.scheds['sc3'].start()
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc3", expect=True)
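
    # setup_queues_nodes() below builds the shared topology used by most
    # tests: execution queues wq1-wq4 tied to partitions P1-P4, and five
    # 2-ncpus vnodes with vnode[0]-vnode[3] assigned to P1-P4 respectively
    # (vnode[4] is left without a partition).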

    def setup_queues_nodes(self):
        a = {'queue_type': 'execution',
             'started': 'True',
             'enabled': 'True'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq1')
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq2')
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq3')
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq4')
        p1 = {'partition': 'P1'}
        self.server.manager(MGR_CMD_SET, QUEUE, p1, id='wq1', expect=True)
        p2 = {'partition': 'P2'}
        self.server.manager(MGR_CMD_SET, QUEUE, p2, id='wq2', expect=True)
        p3 = {'partition': 'P3'}
        self.server.manager(MGR_CMD_SET, QUEUE, p3, id='wq3', expect=True)
        p4 = {'partition': 'P4'}
        self.server.manager(MGR_CMD_SET, QUEUE, p4, id='wq4', expect=True)
        a = {'resources_available.ncpus': 2}
        self.server.create_vnodes('vnode', a, 5, self.mom)
        self.server.manager(MGR_CMD_SET, NODE, p1, id='vnode[0]', expect=True)
        self.server.manager(MGR_CMD_SET, NODE, p2, id='vnode[1]', expect=True)
        self.server.manager(MGR_CMD_SET, NODE, p3, id='vnode[2]', expect=True)
        self.server.manager(MGR_CMD_SET, NODE, p4, id='vnode[3]', expect=True)

    def common_setup(self):
        self.setup_sc1()
        self.setup_sc2()
        self.setup_sc3()
        self.setup_queues_nodes()

    def check_vnodes(self, j, vnodes, jid):
        """
        Verify that job jid ran on each of the expected vnodes
        """
        self.server.status(JOB, 'exec_vnode', id=jid)
        nodes = j.get_vnodes(j.exec_vnode)
        for vnode in vnodes:
            if vnode not in nodes:
                self.assertFalse(True, str(vnode) +
                                 " is not in exec_vnode list as expected")

    def test_set_sched_priv(self):
        """
        Test sched_priv can only be set to valid paths
        and check for appropriate comments
        """
        self.setup_sc1()
        if not os.path.exists('/var/sched_priv_do_not_exist'):
            self.server.manager(MGR_CMD_SET, SCHED,
                                {'sched_priv': '/var/sched_priv_do_not_exist'},
                                id="sc1")
        msg = 'PBS failed validation checks for sched_priv directory'
        a = {'sched_priv': '/var/sched_priv_do_not_exist',
             'comment': msg,
             'scheduling': 'False'}
        self.server.expect(SCHED, a, id='sc1', attrop=PTL_AND, max_attempts=10)
        pbs_home = self.server.pbs_conf['PBS_HOME']
        self.du.run_copy(self.server.hostname,
                         os.path.join(pbs_home, 'sched_priv'),
                         os.path.join(pbs_home, 'sc1_new_priv'),
                         recursive=True)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'sched_priv': '/var/spool/pbs/sc1_new_priv'},
                            id="sc1")
        a = {'sched_priv': '/var/spool/pbs/sc1_new_priv'}
        self.server.expect(SCHED, a, id='sc1', max_attempts=10)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        # Blocked by PP-1202; will revisit once it's fixed
        # self.server.expect(SCHED, 'comment', id='sc1', op=UNSET)

    def test_set_sched_log(self):
        """
        Test sched_log can only be set to valid paths
        and check for appropriate comments
        """
        self.setup_sc1()
        if not os.path.exists('/var/sched_log_do_not_exist'):
            self.server.manager(MGR_CMD_SET, SCHED,
                                {'sched_log': '/var/sched_log_do_not_exist'},
                                id="sc1")
        a = {'sched_log': '/var/sched_log_do_not_exist',
             'comment': 'Unable to change the sched_log directory',
             'scheduling': 'False'}
        self.server.expect(SCHED, a, id='sc1', max_attempts=10)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        pbs_home = self.server.pbs_conf['PBS_HOME']
        self.du.mkdir(path=os.path.join(pbs_home, 'sc1_new_logs'),
                      sudo=True)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'sched_log': '/var/spool/pbs/sc1_new_logs'},
                            id="sc1")
        a = {'sched_log': '/var/spool/pbs/sc1_new_logs'}
        self.server.expect(SCHED, a, id='sc1', max_attempts=10)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        # Blocked by PP-1202; will revisit once it's fixed
        # self.server.expect(SCHED, 'comment', id='sc1', op=UNSET)

    def test_start_scheduler(self):
        """
        Test that scheduler won't start without appropriate folders created.
        Scheduler will log a message if started without partition. Test
        scheduler states down, idle, scheduling.
        """
        self.setup_queues_nodes()
        pbs_home = self.server.pbs_conf['PBS_HOME']
        self.server.manager(MGR_CMD_CREATE, SCHED,
                            id="sc5")
        a = {'sched_host': self.server.hostname,
             'sched_port': '15055',
             'scheduling': 'True'}
        self.server.manager(MGR_CMD_SET, SCHED, a, id="sc5")
        # Try starting without sched_priv and sched_logs
        ret = self.scheds['sc5'].start()
        self.server.expect(SCHED, {'state': 'down'}, id='sc5', max_attempts=10)
        msg = "sched_priv dir is not present for scheduler"
        self.assertTrue(ret['rc'], msg)
        self.du.run_copy(self.server.hostname,
                         os.path.join(pbs_home, 'sched_priv'),
                         os.path.join(pbs_home, 'sched_priv_sc5'),
                         recursive=True, sudo=True)
        ret = self.scheds['sc5'].start()
        msg = "sched_logs dir is not present for scheduler"
        self.assertTrue(ret['rc'], msg)
        self.du.run_copy(self.server.hostname,
                         os.path.join(pbs_home, 'sched_logs'),
                         os.path.join(pbs_home, 'sched_logs_sc5'),
                         recursive=True, sudo=True)
        ret = self.scheds['sc5'].start()
        self.scheds['sc5'].log_match(
            "Scheduler does not contain a partition",
            max_attempts=10, starttime=self.server.ctime)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'partition': 'P3'}, id="sc5")
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc5")
        self.server.expect(SCHED, {'state': 'idle'}, id='sc5', max_attempts=10)
        a = {'resources_available.ncpus': 100}
        self.server.manager(MGR_CMD_SET, NODE, a, id='vnode[2]', expect=True)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'False'}, id="sc5")
        # Queue up enough jobs that the next cycle is long enough to
        # observe the scheduler in the 'scheduling' state
        for _ in xrange(500):
            j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3'})
            self.server.submit(j)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc5")
        self.server.expect(SCHED, {'state': 'scheduling'},
                           id='sc5', max_attempts=10)
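
    # self.scheds includes the default scheduler as well as the multischeds
    # created by common_setup(), so the reconfigure check below covers all
    # of them.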

    def test_resource_sched_reconfigure(self):
        """
        Test all schedulers will reconfigure while creating,
        setting or deleting a resource
        """
        self.common_setup()
        t = int(time.time())
        self.server.manager(MGR_CMD_CREATE, RSC, id='foo')
        for name in self.scheds:
            self.scheds[name].log_match(
                "Scheduler is reconfiguring",
                max_attempts=10, starttime=t)
        # sleeping to make sure we are not checking for the
        # same scheduler reconfiguring message again
        time.sleep(1)
        t = int(time.time())
        attr = {ATTR_RESC_TYPE: 'long'}
        self.server.manager(MGR_CMD_SET, RSC, attr, id='foo')
        for name in self.scheds:
            self.scheds[name].log_match(
                "Scheduler is reconfiguring",
                max_attempts=10, starttime=t)
        # sleeping to make sure we are not checking for the
        # same scheduler reconfiguring message again
        time.sleep(1)
        t = int(time.time())
        self.server.manager(MGR_CMD_DELETE, RSC, id='foo')
        for name in self.scheds:
            self.scheds[name].log_match(
                "Scheduler is reconfiguring",
                max_attempts=10, starttime=t)

    def test_remove_partition_sched(self):
        """
        Test that removing all the partitions from a scheduler
        unsets the partition attribute on the scheduler and updates
        the scheduler logs.
        """
        self.setup_sc1()
        # self.setup_sc2()
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'partition': (DECR, 'P1')}, id="sc1")
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'partition': (DECR, 'P4')}, id="sc1")
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
                            id="sc1", expect=True)
        log_msg = "Scheduler does not contain a partition"
        self.scheds['sc1'].log_match(log_msg, max_attempts=10,
                                     starttime=self.server.ctime)
        # Blocked by PP-1202; will revisit once it's fixed
        # self.server.manager(MGR_CMD_UNSET, SCHED, 'partition',
        #                     id="sc2", expect=True)

    def test_job_queue_partition(self):
        """
        Test that a job submitted to a queue associated with a partition
        will land on a node associated with that partition.
        """
        self.common_setup()
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
                                   'Resource_List.select': '1:ncpus=2'})
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.check_vnodes(j, ['vnode[0]'], jid)
        self.scheds['sc1'].log_match(
            jid + ';Job run', max_attempts=10,
            starttime=self.server.ctime)
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq2',
                                   'Resource_List.select': '1:ncpus=2'})
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.check_vnodes(j, ['vnode[1]'], jid)
        self.scheds['sc2'].log_match(
            jid + ';Job run', max_attempts=10,
            starttime=self.server.ctime)
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
                                   'Resource_List.select': '1:ncpus=2'})
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.check_vnodes(j, ['vnode[2]'], jid)
        self.scheds['sc3'].log_match(
            jid + ';Job run', max_attempts=10,
            starttime=self.server.ctime)

    def test_multiple_partition_same_sched(self):
        """
        Test that a scheduler will serve jobs from different partitions
        and run them on nodes assigned to the respective partitions.
        """
        self.setup_sc1()
        self.setup_queues_nodes()
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
                                   'Resource_List.select': '1:ncpus=1'})
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        self.check_vnodes(j, ['vnode[0]'], jid1)
        self.scheds['sc1'].log_match(
            jid1 + ';Job run', max_attempts=10,
            starttime=self.server.ctime)
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq4',
                                   'Resource_List.select': '1:ncpus=1'})
        jid2 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
        self.check_vnodes(j, ['vnode[3]'], jid2)
        self.scheds['sc1'].log_match(
            jid2 + ';Job run', max_attempts=10,
            starttime=self.server.ctime)
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
                                   'Resource_List.select': '1:ncpus=1'})
        jid3 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
        self.check_vnodes(j, ['vnode[0]'], jid3)
        self.scheds['sc1'].log_match(
            jid3 + ';Job run', max_attempts=10,
            starttime=self.server.ctime)

    def test_multiple_queue_same_partition(self):
        """
        Test that multiple queues associated with the same partition
        are serviced by the same scheduler
        """
        self.setup_sc1()
        self.setup_queues_nodes()
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
                                   'Resource_List.select': '1:ncpus=1'})
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.check_vnodes(j, ['vnode[0]'], jid)
        self.scheds['sc1'].log_match(
            jid + ';Job run', max_attempts=10,
            starttime=self.server.ctime)
        p1 = {'partition': 'P1'}
        self.server.manager(MGR_CMD_SET, QUEUE, p1, id='wq4')
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq4',
                                   'Resource_List.select': '1:ncpus=1'})
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.check_vnodes(j, ['vnode[0]'], jid)
        self.scheds['sc1'].log_match(
            jid + ';Job run', max_attempts=10,
            starttime=self.server.ctime)

    def test_preemption_highp_queue(self):
        """
        Test that preemption occurs only within queues which are assigned
        to the same partition and check for equivalence classes
        """
        self.common_setup()
        prio = {'Priority': 150, 'partition': 'P1'}
        self.server.manager(MGR_CMD_SET, QUEUE, prio, id='wq4')
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
                                   'Resource_List.select': '1:ncpus=2'})
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        t = int(time.time())
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq4',
                                   'Resource_List.select': '1:ncpus=2'})
        jid2 = self.server.submit(j)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        self.scheds['sc1'].log_match("Number of job equivalence classes: 1",
                                     max_attempts=10, starttime=t)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq4',
                                   'Resource_List.select': '1:ncpus=2'})
        jid3 = self.server.submit(j)
        self.server.expect(JOB, ATTR_comment, op=SET, id=jid3)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid3)
        self.server.expect(JOB, {'job_state': 'S'}, id=jid1)
        self.scheds['sc1'].log_match(
            jid1 + ';Job preempted by suspension',
            max_attempts=10, starttime=t)
        # Two equivalence classes: one for the suspended job and one for
        # the remaining jobs
        self.scheds['sc1'].log_match("Number of job equivalence classes: 2",
                                     max_attempts=10, starttime=t)
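
    # In the next test strict_ordering is enabled only in sc2's sched_config,
    # so only sc2 should backfill and log the 'top job' estimate; sc3 handles
    # an identical workload but must not log it (checked with existence=False).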

    def test_backfill_per_scheduler(self):
        """
        Test that backfilling is applied only per scheduler
        """
        self.common_setup()
        t = int(time.time())
        self.scheds['sc2'].set_sched_config(
            {'strict_ordering': 'True ALL'})
        a = {ATTR_queue: 'wq2',
             'Resource_List.select': '1:ncpus=2',
             'Resource_List.walltime': 60}
        j = Job(TEST_USER1, attrs=a)
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        j = Job(TEST_USER1, attrs=a)
        jid2 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
        self.scheds['sc2'].log_match(
            jid2 + ';Job is a top job and will run at',
            starttime=t)
        a['queue'] = 'wq3'
        j = Job(TEST_USER1, attrs=a)
        jid3 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
        j = Job(TEST_USER1, attrs=a)
        jid4 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid4)
        self.scheds['sc3'].log_match(
            jid4 + ';Job is a top job and will run at',
            max_attempts=5, starttime=t, existence=False)
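
    # The gpus resource below is created server-wide but added to the
    # resources line of sc3 only, so sc3 enforces it while sc2 schedules
    # wq2 jobs as if gpus were unlimited.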

    def test_resource_per_scheduler(self):
        """
        Test that a resource will be considered only by the scheduler
        to which the resource is added in sched_config
        """
        self.common_setup()
        a = {'type': 'float', 'flag': 'nh'}
        self.server.manager(MGR_CMD_CREATE, RSC, a, id='gpus')
        self.scheds['sc3'].add_resource("gpus")
        a = {'resources_available.gpus': 2}
        self.server.manager(MGR_CMD_SET, NODE, a, id='@default', expect=True)
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
                                   'Resource_List.select': '1:gpus=2',
                                   'Resource_List.walltime': 60})
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
                                   'Resource_List.select': '1:gpus=2',
                                   'Resource_List.walltime': 60})
        jid2 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
        job_comment = "Not Running: Insufficient amount of resource: "
        job_comment += "gpus (R: 2 A: 0 T: 2)"
        self.server.expect(JOB, {'comment': job_comment}, id=jid2)
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq2',
                                   'Resource_List.select': '1:gpus=2'})
        jid3 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq2',
                                   'Resource_List.select': '1:gpus=2'})
        jid4 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid4)

    def test_restart_server(self):
        """
        Test that sched attributes persist after a server restart
        """
        self.setup_sc1()
        sched_priv = os.path.join(
            self.server.pbs_conf['PBS_HOME'], 'sched_priv_sc1')
        sched_logs = os.path.join(
            self.server.pbs_conf['PBS_HOME'], 'sched_logs_sc1')
        a = {'sched_port': 15050,
             'sched_host': self.server.hostname,
             'sched_priv': sched_priv,
             'sched_log': sched_logs,
             'scheduling': 'True',
             'scheduler_iteration': 600,
             'state': 'idle',
             'sched_cycle_length': '00:20:00'}
        self.server.expect(SCHED, a, id='sc1',
                           attrop=PTL_AND, max_attempts=10)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduler_iteration': 300,
                             'sched_cycle_length': '00:10:00'},
                            id='sc1')
        self.server.restart()
        a['scheduler_iteration'] = 300
        a['sched_cycle_length'] = '00:10:00'
        self.server.expect(SCHED, a, id='sc1',
                           attrop=PTL_AND, max_attempts=10)

    def test_resv_default_sched(self):
        """
        Test that reservations will only go to the default scheduler
        """
        self.setup_queues_nodes()
        t = int(time.time())
        r = Reservation(TEST_USER)
        a = {'Resource_List.select': '2:ncpus=1'}
        r.set_attributes(a)
        rid = self.server.submit(r)
        a = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
        self.server.expect(RESV, a, rid)
        self.scheds['default'].log_match(
            rid + ';Reservation Confirmed',
            max_attempts=10, starttime=t)

    def test_job_sorted_per_scheduler(self):
        """
        Test jobs are sorted as per job_sort_formula
        inside each scheduler
        """
        self.common_setup()
        self.server.manager(MGR_CMD_SET, SERVER,
                            {'job_sort_formula': 'ncpus'})
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'False'}, id="default")
        j = Job(TEST_USER1, attrs={'Resource_List.select': '1:ncpus=1'})
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid1)
        j = Job(TEST_USER1, attrs={'Resource_List.select': '1:ncpus=2'})
        jid2 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="default")
        self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'False'}, id="sc3")
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
                                   'Resource_List.select': '1:ncpus=1'})
        jid3 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid3)
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
                                   'Resource_List.select': '1:ncpus=2'})
        jid4 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid4)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc3")
        self.server.expect(JOB, {'job_state': 'R'}, id=jid4)

    def test_qrun_job(self):
        """
        Test jobs can be run by qrun by a newly created scheduler.
        """
        self.setup_sc1()
        self.setup_queues_nodes()
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'False'}, id="sc1")
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
                                   'Resource_List.select': '1:ncpus=2'})
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid1)
        self.server.runjob(jid1)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)

    def test_run_limts_per_scheduler(self):
        """
        Test that run limits applied at the server level are
        applied for every scheduler separately.
        """
        self.common_setup()
        self.server.manager(MGR_CMD_SET, SERVER,
                            {'max_run': '[u:PBS_GENERIC=1]'})
        j = Job(TEST_USER1, attrs={'Resource_List.select': '1:ncpus=1'})
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        j = Job(TEST_USER1, attrs={'Resource_List.select': '1:ncpus=1'})
        jid2 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
        j = Job(TEST_USER1, attrs={'Resource_List.select': '1:ncpus=1'})
        jc = "Not Running: User has reached server running job limit."
        self.server.expect(JOB, {'comment': jc}, id=jid2)
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
                                   'Resource_List.select': '1:ncpus=1'})
        jid3 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
                                   'Resource_List.select': '1:ncpus=1'})
        jid4 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid4)
        jc = "Not Running: User has reached server running job limit."
        self.server.expect(JOB, {'comment': jc}, id=jid4)
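
    # Each scheduler keeps its own fairshare tree under its own sched_priv,
    # so shares and usage set through one scheduler's resource_group/pbsfs
    # should not be visible to the others; that is what the next test checks.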

    def test_multi_fairshare(self):
        """
        Test different schedulers have their own fairshare trees with
        their own usage
        """
        self.common_setup()
        default_shares = 10
        default_usage = 100
        sc1_shares = 20
        sc1_usage = 200
        sc2_shares = 30
        sc2_usage = 300
        sc3_shares = 40
        sc3_usage = 400
        self.scheds['default'].add_to_resource_group(TEST_USER, 10, 'root',
                                                     default_shares)
        self.scheds['default'].set_fairshare_usage(TEST_USER, default_usage)
        self.scheds['sc1'].add_to_resource_group(TEST_USER, 10, 'root',
                                                 sc1_shares)
        self.scheds['sc1'].set_fairshare_usage(TEST_USER, sc1_usage)
        self.scheds['sc2'].add_to_resource_group(TEST_USER, 10, 'root',
                                                 sc2_shares)
        self.scheds['sc2'].set_fairshare_usage(TEST_USER, sc2_usage)
        self.scheds['sc3'].add_to_resource_group(TEST_USER, 10, 'root',
                                                 sc3_shares)
        self.scheds['sc3'].set_fairshare_usage(TEST_USER, sc3_usage)
        # requery fairshare info from pbsfs
        default_fs = self.scheds['default'].query_fairshare()
        sc1_fs = self.scheds['sc1'].query_fairshare()
        sc2_fs = self.scheds['sc2'].query_fairshare()
        sc3_fs = self.scheds['sc3'].query_fairshare()
        n = default_fs.get_node(id=10)
        self.assertEquals(n.nshares, default_shares)
        self.assertEquals(n.usage, default_usage)
        n = sc1_fs.get_node(id=10)
        self.assertEquals(n.nshares, sc1_shares)
        self.assertEquals(n.usage, sc1_usage)
        n = sc2_fs.get_node(id=10)
        self.assertEquals(n.nshares, sc2_shares)
        self.assertEquals(n.usage, sc2_usage)
        n = sc3_fs.get_node(id=10)
        self.assertEquals(n.nshares, sc3_shares)
        self.assertEquals(n.usage, sc3_usage)
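
    # In the next test fairshare_usage_res is set to ncpus*100, so every
    # completed 1-ncpus job adds 100 to its owner's usage on top of the
    # initial usage of 1; that is where the expected 101/201 values come from.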

    def test_fairshare_usage(self):
        """
        Test the scheduler's fairshare usage file and
        check that the usage file is updated correctly
        """
        self.setup_sc1()
        a = {'queue_type': 'execution',
             'started': 'True',
             'enabled': 'True',
             'partition': 'P1'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq1')
        # Set resources to node
        resc = {'resources_available.ncpus': 1,
                'partition': 'P1'}
        self.server.manager(MGR_CMD_SET, NODE, resc, self.mom.shortname)
        # Add entry to the resource group of multisched 'sc1'
        self.scheds['sc1'].add_to_resource_group('grp1', 100, 'root', 60)
        self.scheds['sc1'].add_to_resource_group('grp2', 200, 'root', 40)
        self.scheds['sc1'].add_to_resource_group(TEST_USER1,
                                                 101, 'grp1', 40)
        self.scheds['sc1'].add_to_resource_group(TEST_USER2,
                                                 102, 'grp1', 20)
        self.scheds['sc1'].add_to_resource_group(TEST_USER3,
                                                 201, 'grp2', 30)
        self.scheds['sc1'].add_to_resource_group(TEST_USER4,
                                                 202, 'grp2', 10)
        # Set scheduler iteration
        sc_attr = {'scheduler_iteration': 7,
                   'scheduling': 'False'}
        self.server.manager(MGR_CMD_SET, SCHED, sc_attr, id='sc1')
        # Update scheduler config file
        sc_config = {'fair_share': 'True',
                     'fairshare_usage_res': 'ncpus*100'}
        self.scheds['sc1'].set_sched_config(sc_config)
        # Submit jobs to multisched 'sc1'
        sc1_attr = {ATTR_queue: 'wq1',
                    'Resource_List.select': '1:ncpus=1',
                    'Resource_List.walltime': 10}
        sc1_J1 = Job(TEST_USER1, attrs=sc1_attr)
        sc1_jid1 = self.server.submit(sc1_J1)
        sc1_J2 = Job(TEST_USER2, attrs=sc1_attr)
        sc1_jid2 = self.server.submit(sc1_J2)
        sc1_J3 = Job(TEST_USER3, attrs=sc1_attr)
        sc1_jid3 = self.server.submit(sc1_J3)
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
                            id='sc1')
        # pbsuser1's job will run and the other two will be queued
        self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid1)
        self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid3)
        self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid2)
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
                            id='sc1')
        # Delete the running job because the node has only 1 ncpu and
        # we are done with the job; this also shortens the test's run time
        self.server.delete(sc1_jid1, wait=True)
        # pbsuser3's job will run after pbsuser1's
        self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid3)
        self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid2)
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
                            id='sc1')
        # deleting the currently running job
        self.server.delete(sc1_jid3, wait=True)
        # pbsuser2's job will run in the end
        self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid2)
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
                            id='sc1')
        # deleting the currently running job
        self.server.delete(sc1_jid2, wait=True)
        # query fairshare and check usage
        sc1_fs_user1 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER1))
        self.assertEquals(sc1_fs_user1.usage, 101)
        sc1_fs_user2 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER2))
        self.assertEquals(sc1_fs_user2.usage, 101)
        sc1_fs_user3 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER3))
        self.assertEquals(sc1_fs_user3.usage, 101)
        sc1_fs_user4 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER4))
        self.assertEquals(sc1_fs_user4.usage, 1)
        # Restart the scheduler
        self.scheds['sc1'].restart()
        # Check the multisched 'sc1' usage file whether it's updating or not
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'False'},
                            id='sc1')
        sc1_J1 = Job(TEST_USER1, attrs=sc1_attr)
        sc1_jid1 = self.server.submit(sc1_J1)
        sc1_J2 = Job(TEST_USER2, attrs=sc1_attr)
        sc1_jid2 = self.server.submit(sc1_J2)
        sc1_J4 = Job(TEST_USER4, attrs=sc1_attr)
        sc1_jid4 = self.server.submit(sc1_J4)
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
                            id='sc1')
        # pbsuser4's job will run and the other two will be queued
        self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid4)
        self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid1)
        self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid2)
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
                            id='sc1')
        # deleting the currently running job
        self.server.delete(sc1_jid4, wait=True)
        # pbsuser1's job will run after pbsuser4's
        self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid1)
        self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid2)
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
                            id='sc1')
        # deleting the currently running job
        self.server.delete(sc1_jid1, wait=True)
        # pbsuser2's job will run in the end
        self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid2)
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
                            id='sc1')
        # deleting the currently running job
        self.server.delete(sc1_jid2, wait=True)
        # query fairshare and check usage
        sc1_fs_user1 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER1))
        self.assertEquals(sc1_fs_user1.usage, 201)
        sc1_fs_user2 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER2))
        self.assertEquals(sc1_fs_user2.usage, 201)
        sc1_fs_user3 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER3))
        self.assertEquals(sc1_fs_user3.usage, 101)
        sc1_fs_user4 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER4))
        self.assertEquals(sc1_fs_user4.usage, 101)

    def test_sched_priv_change(self):
        """
        Test that when the sched_priv directory changes, all of the
        PTL internal scheduler objects (e.g. fairshare tree) are reread
        """
        new_sched_priv = os.path.join(self.server.pbs_conf['PBS_HOME'],
                                      'sched_priv2')
        if os.path.exists(new_sched_priv):
            self.du.rm(path=new_sched_priv, recursive=True,
                       sudo=True, force=True)
        dflt_sched_priv = os.path.join(self.server.pbs_conf['PBS_HOME'],
                                       'sched_priv')
        self.du.run_copy(src=dflt_sched_priv, dest=new_sched_priv,
                         recursive=True, sudo=True)
        self.setup_sc3()
        s = self.server.status(SCHED, id='sc3')
        old_sched_priv = s[0]['sched_priv']
        self.scheds['sc3'].add_to_resource_group(TEST_USER, 10, 'root', 20)
        self.scheds['sc3'].holidays_set_year(new_year="3000")
        self.scheds['sc3'].set_sched_config({'fair_share': 'True ALL'})
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'sched_priv': new_sched_priv}, id='sc3')
        n = self.scheds['sc3'].fairshare_tree.get_node(id=10)
        self.assertFalse(n)
        y = self.scheds['sc3'].holidays_get_year()
        self.assertNotEquals(y, "3000")
        self.assertTrue(self.scheds['sc3'].
                        sched_config['fair_share'].startswith('false'))
        # clean up: revert_to_defaults() will remove the new sched_priv. We
        # need to remove the old one
        self.du.rm(path=old_sched_priv, sudo=True, recursive=True, force=True)
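
    # decay_fairshare_tree() runs pbsfs -d against this scheduler's usage
    # database; with pbsfs's default decay factor an initial usage of 10 is
    # expected to drop to 5, which the next test checks.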

    def test_fairshare_decay(self):
        """
        Test pbsfs's fairshare decay for multisched
        """
        self.setup_sc3()
        self.scheds['sc3'].add_to_resource_group(TEST_USER, 10, 'root', 20)
        self.scheds['sc3'].set_fairshare_usage(name=TEST_USER, usage=10)
        self.scheds['sc3'].decay_fairshare_tree()
        n = self.scheds['sc3'].fairshare_tree.get_node(id=10)
        self.assertTrue(n.usage, 5)

    def test_cmp_fairshare(self):
        """
        Test pbsfs's compare fairshare functionality for multisched
        """
        self.setup_sc3()
        self.scheds['sc3'].add_to_resource_group(TEST_USER, 10, 'root', 20)
        self.scheds['sc3'].set_fairshare_usage(name=TEST_USER, usage=10)
        self.scheds['sc3'].add_to_resource_group(TEST_USER2, 20, 'root', 20)
        self.scheds['sc3'].set_fairshare_usage(name=TEST_USER2, usage=100)
        user = self.scheds['sc3'].cmp_fairshare_entities(TEST_USER, TEST_USER2)
        self.assertEquals(user, str(TEST_USER))

    def test_pbsfs_invalid_sched(self):
        """
        Test pbsfs -I <sched_name> where sched_name does not exist
        """
        sched_name = 'foo'
        pbsfs_cmd = os.path.join(self.server.pbs_conf['PBS_EXEC'],
                                 'sbin', 'pbsfs') + ' -I ' + sched_name
        ret = self.du.run_cmd(cmd=pbsfs_cmd, sudo=True)
        err_msg = 'Scheduler %s does not exist' % sched_name
        self.assertEquals(err_msg, ret['err'][0])

    def test_pbsfs_no_fairshare_data(self):
        """
        Test pbsfs -I <sched_name> where sched_priv_<sched_name> dir
        does not exist
        """
        a = {'partition': 'P5',
             'sched_host': self.server.hostname,
             'sched_port': '15050'}
        self.server.manager(MGR_CMD_CREATE, SCHED, a, id="sc5")
        err_msg = 'Unable to access fairshare data: No such file or directory'
        try:
            # Only a scheduler object is created. Corresponding sched_priv
            # dir not created yet. Try to query fairshare data.
            self.scheds['sc5'].query_fairshare()
        except PbsFairshareError as e:
            self.assertTrue(err_msg in e.msg)

    def test_pbsfs_server_restart(self):
        """
        Verify that server restart has no impact on fairshare data
        """
        self.setup_sc1()
        self.scheds['sc1'].add_to_resource_group(TEST_USER, 20, 'root', 50)
        self.scheds['sc1'].set_fairshare_usage(name=TEST_USER, usage=25)
        n = self.scheds['sc1'].query_fairshare().get_node(name=str(TEST_USER))
        self.assertTrue(n.usage, 25)
        self.server.restart()
        n = self.scheds['sc1'].query_fairshare().get_node(name=str(TEST_USER))
        self.assertTrue(n.usage, 25)

    def test_pbsfs_revert_to_defaults(self):
        """
        Test if revert_to_defaults() works properly with multi scheds.
        revert_to_defaults() removes entities from the resource_group file
        and removes their usage (with pbsfs -e)
        """
        self.setup_sc1()
        a = {'queue_type': 'execution',
             'started': 'True',
             'enabled': 'True',
             'partition': 'P1'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq1', expect=True)
        a = {'partition': 'P1', 'resources_available.ncpus': 2}
        self.server.manager(MGR_CMD_SET, NODE, a,
                            id=self.mom.shortname, expect=True)
        self.scheds['sc1'].add_to_resource_group(TEST_USER,
                                                 11, 'root', 10)
        self.scheds['sc1'].add_to_resource_group(TEST_USER1,
                                                 12, 'root', 10)
        self.scheds['sc1'].set_sched_config({'fair_share': 'True'})
        self.scheds['sc1'].set_fairshare_usage(TEST_USER, 100)
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'False'},
                            id='sc1')
        j1 = Job(TEST_USER, attrs={ATTR_queue: 'wq1'})
        jid1 = self.server.submit(j1)
        j2 = Job(TEST_USER1, attrs={ATTR_queue: 'wq1'})
        jid2 = self.server.submit(j2)
        t_start = int(time.time())
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
                            id='sc1')
        self.scheds['sc1'].log_match(
            'Leaving Scheduling Cycle', starttime=t_start)
        t_end = int(time.time())
        job_list = self.scheds['sc1'].log_match(
            'Considering job to run', starttime=t_start,
            allmatch=True, endtime=t_end)
        # job 1 runs second as it's run by an entity with usage = 100
        self.assertTrue(jid1 in job_list[0][1])
        self.server.deljob(id=jid1, wait=True)
        self.server.deljob(id=jid2, wait=True)
        # revert_to_defaults() does a pbsfs -I <sched_name> -e and cleans up
        # the resource_group file
        self.scheds['sc1'].revert_to_defaults()
        # Fairshare tree is trimmed now. TEST_USER1 is the only entity with
        # usage set. So its job, job2, will run second. If trimming was not
        # successful TEST_USER would still have usage=100 and job1 would run
        # second
        self.scheds['sc1'].add_to_resource_group(TEST_USER,
                                                 15, 'root', 10)
        self.scheds['sc1'].add_to_resource_group(TEST_USER1,
                                                 16, 'root', 10)
        self.scheds['sc1'].set_sched_config({'fair_share': 'True'})
        self.scheds['sc1'].set_fairshare_usage(TEST_USER1, 50)
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'False'},
                            id='sc1')
        j1 = Job(TEST_USER, attrs={ATTR_queue: 'wq1'})
        jid1 = self.server.submit(j1)
        j2 = Job(TEST_USER1, attrs={ATTR_queue: 'wq1'})
        jid2 = self.server.submit(j2)
        t_start = int(time.time())
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
                            id='sc1')
        self.scheds['sc1'].log_match(
            'Leaving Scheduling Cycle', starttime=t_start)
        t_end = int(time.time())
        job_list = self.scheds['sc1'].log_match(
            'Considering job to run', starttime=t_start,
            allmatch=True, endtime=t_end)
        self.assertTrue(jid2 in job_list[0][1])

    def submit_jobs(self, num_jobs=1, attrs=None, user=TEST_USER):
        """
        Submit num_jobs number of jobs with attrs attributes for user.
        Return a list of job ids
        """
        if attrs is None:
            attrs = {'Resource_List.select': '1:ncpus=2'}
        ret_jids = []
        for _ in range(num_jobs):
            J = Job(user, attrs)
            jid = self.server.submit(J)
            ret_jids += [jid]
        return ret_jids
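
    # The equivalence-class tests below read the "Number of job equivalence
    # classes" line from the scheduler log. setup_sc1() lowers sc1's
    # log_filter (and test_equiv_multisched does the same for sc2), which
    # appears to be what lets that debug-level message show up in the log.
    # Jobs requesting the same resources, place and partition collapse into
    # a single class.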

    def test_equiv_partition(self):
        """
        Test the basic behavior of job equivalence classes: submit two
        different types of jobs into 2 partitions and see they are
        in four different equivalence classes
        """
        self.setup_sc1()
        self.setup_queues_nodes()
        t = int(time.time())
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'False'}, id="sc1")
        # Eat up all the resources with the first job to each queue
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
        self.submit_jobs(4, a)
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
        self.submit_jobs(4, a)
        a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq1'}
        self.submit_jobs(3, a)
        a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq4'}
        self.submit_jobs(3, a)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        self.scheds['sc1'].log_match("Number of job equivalence classes: 4",
                                     max_attempts=10, starttime=t)

    def test_equiv_multisched(self):
        """
        Test the basic behavior of job equivalence classes: submit two
        different types of jobs into 2 different schedulers and see they
        are in two different classes in each scheduler
        """
        self.setup_sc1()
        self.setup_sc2()
        self.setup_queues_nodes()
        self.scheds['sc2'].set_sched_config({'log_filter': 2048})
        t = int(time.time())
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'False'}, id="sc1")
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'False'}, id="sc2")
        # Eat up all the resources with the first job to each queue
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
        self.submit_jobs(4, a)
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq2'}
        self.submit_jobs(4, a)
        a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq1'}
        self.submit_jobs(3, a)
        a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq2'}
        self.submit_jobs(3, a)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc2")
        self.scheds['sc1'].log_match("Number of job equivalence classes: 2",
                                     max_attempts=10, starttime=t)
        self.scheds['sc2'].log_match("Number of job equivalence classes: 2",
                                     max_attempts=10, starttime=t)

    def test_select_partition(self):
        """
        Test to see if jobs with select resources not in the resources line
        fall into the same equivalence class and jobs in different partitions
        fall into different equivalence classes
        """
        self.server.manager(MGR_CMD_CREATE, RSC,
                            {'type': 'long', 'flag': 'nh'}, id='foo')
        self.setup_sc1()
        self.setup_queues_nodes()
        t = int(time.time())
        # Eat up all the resources
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
        J = Job(TEST_USER, attrs=a)
        self.server.submit(J)
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
        J = Job(TEST_USER, attrs=a)
        self.server.submit(J)
        a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq1'}
        self.submit_jobs(3, a)
        a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq4'}
        self.submit_jobs(3, a)
        a = {'Resource_List.select': '1:ncpus=1:foo=8', ATTR_queue: 'wq1'}
        self.submit_jobs(3, a)
        a = {'Resource_List.select': '1:ncpus=1:foo=8', ATTR_queue: 'wq4'}
        self.submit_jobs(3, a)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        # Four equivalence classes: two for the resource eating job in each
        # partition and two for the other jobs in each partition. Although
        # the jobs request different amounts of foo, foo is not on the
        # resources line, so they collapse into one class per partition.
        self.scheds['sc1'].log_match("Number of job equivalence classes: 4",
                                     max_attempts=10, starttime=t)
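
    # Same workload as above, but here 'foo' is added to sc1's resources line
    # via add_resource(), so different foo requests no longer collapse and
    # the class count rises from 4 to 6.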

    def test_select_res_partition(self):
        """
        Test to see if jobs with select resources in the resources line and
        in different partitions fall into different equivalence classes
        """
        self.server.manager(MGR_CMD_CREATE, RSC,
                            {'type': 'long', 'flag': 'nh'}, id='foo')
        self.setup_sc1()
        self.setup_queues_nodes()
        self.scheds['sc1'].add_resource("foo")
        t = int(time.time())
        # Eat up all the resources
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
        J = Job(TEST_USER, attrs=a)
        self.server.submit(J)
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
        J = Job(TEST_USER, attrs=a)
        self.server.submit(J)
        a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq1'}
        self.submit_jobs(3, a)
        a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq4'}
        self.submit_jobs(3, a)
        a = {'Resource_List.select': '1:ncpus=1:foo=8', ATTR_queue: 'wq1'}
        self.submit_jobs(3, a)
        a = {'Resource_List.select': '1:ncpus=1:foo=8', ATTR_queue: 'wq4'}
        self.submit_jobs(3, a)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        # Six equivalence classes: two for the resource eating jobs in each
        # partition and 4 for the other jobs requesting different amounts of
        # the foo resource in each partition.
        self.scheds['sc1'].log_match("Number of job equivalence classes: 6",
                                     max_attempts=10, starttime=t)

    def test_multiple_res_partition(self):
        """
        Test to see if jobs with select resources in the resources line
        with multiple custom resources fall into different equiv classes
        and jobs in different partitions fall into different equiv classes
        """
        self.server.manager(MGR_CMD_CREATE, RSC,
                            {'type': 'long', 'flag': 'nh'}, id='foo')
        self.server.manager(MGR_CMD_CREATE, RSC,
                            {'type': 'string', 'flag': 'h'}, id='colour')
        self.setup_sc1()
        self.setup_queues_nodes()
        self.scheds['sc1'].add_resource("foo")
        self.scheds['sc1'].add_resource("colour")
        t = int(time.time())
        # Eat up all the resources
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
        J = Job(TEST_USER, attrs=a)
        self.server.submit(J)
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
        J = Job(TEST_USER, attrs=a)
        self.server.submit(J)
        a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq1'}
        self.submit_jobs(3, a)
        a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq4'}
        self.submit_jobs(3, a)
        a = {'Resource_List.select': '1:ncpus=1:colour=blue',
             ATTR_queue: 'wq1'}
        self.submit_jobs(3, a)
        a = {'Resource_List.select': '1:ncpus=1:colour=blue',
             ATTR_queue: 'wq4'}
        self.submit_jobs(3, a)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        # Six equivalence classes: two for the resource eating job in each
        # partition and four for the other jobs, one per requested custom
        # resource in each partition.
        self.scheds['sc1'].log_match("Number of job equivalence classes: 6",
                                     max_attempts=10, starttime=t)

    def test_place_partition(self):
        """
        Test to see if jobs with different place statements and different
        partitions fall into the different equivalence classes
        """
        self.setup_sc1()
        self.setup_queues_nodes()
        t = int(time.time())
        # Eat up all the resources
        a = {'Resource_List.select': '1:ncpus=2',
             ATTR_queue: 'wq1'}
        J = Job(TEST_USER, attrs=a)
        self.server.submit(J)
        a = {'Resource_List.select': '1:ncpus=2',
             ATTR_queue: 'wq4'}
        J = Job(TEST_USER, attrs=a)
        self.server.submit(J)
        a = {'Resource_List.select': '1:ncpus=1',
             'Resource_List.place': 'free',
             ATTR_queue: 'wq1'}
        self.submit_jobs(3, a)
        a = {'Resource_List.select': '1:ncpus=1',
             'Resource_List.place': 'free',
             ATTR_queue: 'wq4'}
        self.submit_jobs(3, a)
        a = {'Resource_List.select': '1:ncpus=1',
             'Resource_List.place': 'excl',
             ATTR_queue: 'wq1'}
        self.submit_jobs(3, a)
        a = {'Resource_List.select': '1:ncpus=1',
             'Resource_List.place': 'excl',
             ATTR_queue: 'wq4'}
        self.submit_jobs(3, a)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        # Six equivalence classes: two for the resource eating job in
        # each partition and one for each place statement in each partition
        self.scheds['sc1'].log_match("Number of job equivalence classes: 6",
                                     max_attempts=10, starttime=t)

    def test_nolimits_partition(self):
        """
        Test to see that jobs from different users, groups, and projects
        all fall into the same equivalence class when there are no limits
        but fall into different equivalence classes for each partition
        """
        self.setup_sc1()
        self.setup_queues_nodes()
        t = int(time.time())
        # Eat up all the resources
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
        J = Job(TEST_USER, attrs=a)
        self.server.submit(J)
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
        J = Job(TEST_USER, attrs=a)
        self.server.submit(J)
        a = {ATTR_queue: 'wq1'}
        self.submit_jobs(3, a, user=TEST_USER)
        self.submit_jobs(3, a, user=TEST_USER2)
        a = {ATTR_queue: 'wq4'}
        self.submit_jobs(3, a, user=TEST_USER)
        self.submit_jobs(3, a, user=TEST_USER2)
        b = {'group_list': TSTGRP1, ATTR_queue: 'wq1'}
        self.submit_jobs(3, b, TEST_USER1)
        b = {'group_list': TSTGRP2, ATTR_queue: 'wq1'}
        self.submit_jobs(3, b, TEST_USER1)
        b = {'group_list': TSTGRP1, ATTR_queue: 'wq4'}
        self.submit_jobs(3, b, TEST_USER1)
        b = {'group_list': TSTGRP2, ATTR_queue: 'wq4'}
        self.submit_jobs(3, b, TEST_USER1)
        b = {'project': 'p1', ATTR_queue: 'wq1'}
        self.submit_jobs(3, b)
        b = {'project': 'p2', ATTR_queue: 'wq1'}
        self.submit_jobs(3, b)
        b = {'project': 'p1', ATTR_queue: 'wq4'}
        self.submit_jobs(3, b)
        b = {'project': 'p2', ATTR_queue: 'wq4'}
        self.submit_jobs(3, b)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        # Four equivalence classes: two for the resource eating job in each
        # partition and two for the rest. Since there are no limits, neither
        # user, group, nor project is taken into account
        self.scheds['sc1'].log_match("Number of job equivalence classes: 4",
                                     max_attempts=10, starttime=t)

    def test_limits_partition(self):
        """
        Test to see that jobs from different users fall into different
        equivalence classes with queue hard limits and partitions
        """
        self.setup_sc1()
        self.setup_queues_nodes()
        t = int(time.time())
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'False'}, id="sc1")
        self.server.manager(MGR_CMD_SET, QUEUE,
                            {'max_run': '[u:PBS_GENERIC=1]'}, id='wq1')
        self.server.manager(MGR_CMD_SET, QUEUE,
                            {'max_run': '[u:PBS_GENERIC=1]'}, id='wq4')
        # Eat up all the resources
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
        J = Job(TEST_USER, attrs=a)
        self.server.submit(J)
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
        J = Job(TEST_USER, attrs=a)
        self.server.submit(J)
        a = {ATTR_queue: 'wq1'}
        self.submit_jobs(3, a, user=TEST_USER1)
        self.submit_jobs(3, a, user=TEST_USER2)
        a = {ATTR_queue: 'wq4'}
        self.submit_jobs(3, a, user=TEST_USER1)
        self.submit_jobs(3, a, user=TEST_USER2)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        # Six equivalence classes: two for the resource-eating jobs (one per
        # partition) and one for each user per partition
        self.scheds['sc1'].log_match("Number of job equivalence classes: 6",
                                     max_attempts=10, starttime=t)

    def test_job_array_partition(self):
        """
        Test that jobs of various types with the same resource request fall
        into a single equivalence class and only fall into different
        equivalence classes when their partitions differ
        """
        self.setup_sc1()
        self.setup_queues_nodes()
        t = int(time.time())
        # Eat up all the resources
        a = {'Resource_List.select': '1:ncpus=2', 'queue': 'wq1'}
        J = Job(TEST_USER1, attrs=a)
        self.server.submit(J)
        a = {'Resource_List.select': '1:ncpus=2', 'queue': 'wq4'}
        J = Job(TEST_USER1, attrs=a)
        self.server.submit(J)
        # Submit a job array to each partition
        j = Job(TEST_USER)
        j.set_attributes(
            {ATTR_J: '1-3:1',
             'Resource_List.select': '1:ncpus=2',
             'queue': 'wq1'})
        self.server.submit(j)
        j.set_attributes(
            {ATTR_J: '1-3:1',
             'Resource_List.select': '1:ncpus=2',
             'queue': 'wq4'})
        self.server.submit(j)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        # Two equivalence classes, one for each partition
        self.scheds['sc1'].log_match("Number of job equivalence classes: 2",
                                     max_attempts=10, starttime=t)

    def test_equiv_suspend_jobs(self):
        """
        Test that jobs fall into different equivalence classes
        after they get suspended
        """
        self.setup_sc1()
        self.setup_queues_nodes()
        t = int(time.time())
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'False'}, id="sc1")
        # Eat up all the resources
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
        J = Job(TEST_USER, attrs=a)
        jid1 = self.server.submit(J)
        self.server.submit(J)
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
        J = Job(TEST_USER, attrs=a)
        jid3 = self.server.submit(J)
        self.server.submit(J)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        # Two equivalence classes, one for each partition
        self.scheds['sc1'].log_match("Number of job equivalence classes: 2",
                                     max_attempts=10, starttime=t)
        t = int(time.time())
        # Make sure the jobs are in R state before issuing the suspend signal
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        self.server.sigjob(jobid=jid1, signal="suspend")
        self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
        self.server.sigjob(jobid=jid3, signal="suspend")
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        # Four equivalence classes: two for the partitions and two for the
        # suspended jobs
        self.scheds['sc1'].log_match("Number of job equivalence classes: 4",
                                     max_attempts=10, starttime=t)

    def test_equiv_single_partition(self):
        """
        Test that jobs fall into the same equivalence class when they are
        submitted to queues assigned to the same partition
        """
        self.setup_sc1()
        self.setup_queues_nodes()
        t = int(time.time())
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'False'}, id="sc1")
        self.server.manager(MGR_CMD_SET, QUEUE,
                            {'partition': 'P1'}, id='wq4')
        # Eat up all the resources with the first job to wq1
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
        self.submit_jobs(4, a)
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
        self.submit_jobs(3, a)
        a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq1'}
        self.submit_jobs(3, a)
        a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq4'}
        self.submit_jobs(3, a)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        # Two equivalence classes, one for each distinct ncpus request,
        # since both queues are in the same partition
        self.scheds['sc1'].log_match("Number of job equivalence classes: 2",
                                     max_attempts=10, starttime=t)

    def test_list_multi_sched(self):
        """
        Test to verify that qmgr list sched works when multiple
        schedulers are present
        """
        self.setup_sc1()
        self.setup_sc2()
        self.setup_sc3()
        self.server.manager(MGR_CMD_LIST, SCHED)
        self.server.manager(MGR_CMD_LIST, SCHED, id="default")
        self.server.manager(MGR_CMD_LIST, SCHED, id="sc1")
        dir_path = os.path.join(os.sep, 'var', 'spool', 'pbs', 'sched_dir')
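        # These attribute values are expected to match the sc2 configuration
        # created in setup_sc2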
        a = {'partition': 'P2',
             'sched_priv': os.path.join(dir_path, 'sched_priv_sc2'),
             'sched_log': os.path.join(dir_path, 'sched_logs_sc2'),
             'sched_host': self.server.hostname,
             'sched_port': '15051'}
        self.server.manager(MGR_CMD_LIST, SCHED, a, id="sc2", expect=True)
        self.server.manager(MGR_CMD_LIST, SCHED, id="sc3")
        try:
            self.server.manager(MGR_CMD_LIST, SCHED, id="invalid_scname")
        except PbsManagerError as e:
            err_msg = "Unknown Scheduler"
            self.assertTrue(err_msg in e.msg[0],
                            "Error message is not as expected")
        # delete sc3 sched
        self.server.manager(MGR_CMD_DELETE, SCHED, id="sc3", sudo=True)
        try:
            self.server.manager(MGR_CMD_LIST, SCHED, id="sc3")
        except PbsManagerError as e:
            err_msg = "Unknown Scheduler"
            self.assertTrue(err_msg in e.msg[0],
                            "Error message is not as expected")
        self.server.manager(MGR_CMD_LIST, SCHED)
        self.server.manager(MGR_CMD_LIST, SCHED, id="default")
        self.server.manager(MGR_CMD_LIST, SCHED, id="sc1")
        self.server.manager(MGR_CMD_LIST, SCHED, id="sc2")
        # delete sc1 sched
        self.server.manager(MGR_CMD_DELETE, SCHED, id="sc1")
        try:
            self.server.manager(MGR_CMD_LIST, SCHED, id="sc1")
        except PbsManagerError as e:
            err_msg = "Unknown Scheduler"
            self.assertTrue(err_msg in e.msg[0],
                            "Error message is not as expected")

    def test_job_sort_formula_threshold(self):
        """
        Test the scheduler attribute job_sort_formula_threshold for multisched
        """
        # Multisched setup
        self.setup_sc3()
        p3 = {'partition': 'P3'}
        a = {'queue_type': 'execution',
             'started': 'True',
             'enabled': 'True'}
        a.update(p3)
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq1', expect=True)
        a = {'resources_available.ncpus': 2}
        self.server.create_vnodes('vnode', a, 2, self.mom)
        self.server.manager(MGR_CMD_SET, NODE, p3, id='vnode[0]', expect=True)
        # Set job_sort_formula on the server
        self.server.manager(MGR_CMD_SET, SERVER, {'job_sort_formula': 'ncpus'})
        # Set job_sort_formula_threshold on the multisched
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'job_sort_formula_threshold': '2'},
                            id="sc3", expect=True)
        # Submit job to multisched
        j1_attrs = {ATTR_queue: 'wq1', 'Resource_List.ncpus': '1'}
        J1 = Job(TEST_USER, j1_attrs)
        jid_1 = self.server.submit(J1)
        # Submit job to default scheduler
        J2 = Job(TEST_USER, attrs={'Resource_List.ncpus': '1'})
        jid_2 = self.server.submit(J2)
        msg = {'job_state': 'Q',
               'comment': ('Not Running: Job is ' +
                           'under job_sort_formula threshold value')}
        self.server.expect(JOB, msg, id=jid_1)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid_2)

    @staticmethod
    def cust_attr(name, totnodes, numnode, attrib):
        a = {}
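        # vnode layout: 0-2 switch A and 3-4 switch B in the default
        # partition; 6-8 switch A and 9-10 switch B in partition P2;
        # vnode 5 has no switch, vnode 11 is in P2 with no switch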
        if numnode in range(0, 3):
            a['resources_available.switch'] = 'A'
        if numnode in range(3, 5):
            a['resources_available.switch'] = 'B'
        if numnode in range(6, 9):
            a['resources_available.switch'] = 'A'
            a['partition'] = 'P2'
        if numnode in range(9, 11):
            a['resources_available.switch'] = 'B'
            a['partition'] = 'P2'
        if numnode == 11:
            a['partition'] = 'P2'
        return dict(list(attrib.items()) + list(a.items()))

    def setup_placement_set(self):
        self.server.add_resource('switch', 'string_array', 'h')
        a = {'resources_available.ncpus': 2}
        self.server.create_vnodes(
            'vnode', a, 12, self.mom, attrfunc=self.cust_attr)
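        # Group nodes into placement sets keyed on the 'switch' resource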
        self.server.manager(MGR_CMD_SET, SERVER, {'node_group_key': 'switch'})
        self.server.manager(MGR_CMD_SET, SERVER, {'node_group_enable': 't'})

    def test_multi_sched_explicit_ps(self):
        """
        Test that only_explicit_psets set on a multi sched takes effect
        and is not read from the default scheduler
        """
        self.setup_placement_set()
        self.setup_sc2()
        a = {'queue_type': 'execution',
             'started': 'True',
             'enabled': 'True',
             'partition': 'P2'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq2')
        a = {'Resource_List.select': '1:ncpus=2'}
        j = Job(TEST_USER, attrs=a)
        j1id = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=j1id)
        nodes = ['vnode[5]']
        self.check_vnodes(j, nodes, j1id)
        a = {'Resource_List.select': '2:ncpus=2'}
        j = Job(TEST_USER, attrs=a)
        j2id = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=j2id)
        nodes = ['vnode[3]', 'vnode[4]']
        self.check_vnodes(j, nodes, j2id)
        a = {'Resource_List.select': '3:ncpus=2'}
        j = Job(TEST_USER, attrs=a)
        j3id = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=j3id)
        nodes = ['vnode[0]', 'vnode[1]', 'vnode[2]']
        self.check_vnodes(j, nodes, j3id)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'only_explicit_psets': 't'}, id='sc2')
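        # With only_explicit_psets set, sc2 does not create the implicit
        # placement set of all its nodes, so jobs must fit within a single
        # switch placement set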
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq2'}
        j = Job(TEST_USER, attrs=a)
        j4id = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=j4id)
        nodes = ['vnode[9]']
        self.check_vnodes(j, nodes, j4id)
        a = {'Resource_List.select': '2:ncpus=2', ATTR_queue: 'wq2'}
        j = Job(TEST_USER, attrs=a)
        j5id = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=j5id)
        nodes = ['vnode[6]', 'vnode[7]']
        self.check_vnodes(j, nodes, j5id)
        a = {'Resource_List.select': '3:ncpus=2', ATTR_queue: 'wq2'}
        j = Job(TEST_USER, attrs=a)
        j6id = self.server.submit(j)
        self.server.expect(JOB, {
            'job_state': 'Q',
            'comment': 'Not Running: Placement set switch=A'
                       ' has too few free resources'}, id=j6id)

    def test_jobs_do_not_span_ps(self):
        """
        Test that do_not_span_psets set on a multi sched takes effect
        and is not read from the default scheduler
        """
        self.setup_placement_set()
        self.setup_sc2()
        a = {'queue_type': 'execution',
             'started': 'True',
             'enabled': 'True',
             'partition': 'P2'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq2')
        # Scheduler sc2 cannot span across placement sets
        self.server.manager(MGR_CMD_SET, SCHED, {
            'do_not_span_psets': 't'}, id='sc2')
        self.server.manager(MGR_CMD_SET, SCHED, {
            'scheduling': 't'}, id='sc2')
        a = {'Resource_List.select': '4:ncpus=2', ATTR_queue: 'wq2'}
        j = Job(TEST_USER, attrs=a)
        j1id = self.server.submit(j)
        self.server.expect(
            JOB, {'job_state': 'Q', 'comment': 'Can Never Run: can\'t fit in '
                  'the largest placement set, and can\'t span psets'}, id=j1id)
        # Default scheduler can span as do_not_span_psets is not set
        a = {'Resource_List.select': '4:ncpus=2'}
        j = Job(TEST_USER, attrs=a)
        j2id = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=j2id)

    def test_sched_preempt_enforce_resumption(self):
        """
        Test that sched_preempt_enforce_resumption can be set on a multi
        sched and that, even if topjob_ineligible is set for a preempted
        job, the preempted job is still calendared when
        sched_preempt_enforce_resumption is true
        """
        self.setup_sc1()
        self.setup_queues_nodes()
        prio = {'Priority': 150, 'partition': 'P1'}
        self.server.manager(MGR_CMD_SET, QUEUE, prio, id='wq4')
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'sched_preempt_enforce_resumption': 'true'},
                            id='sc1')
        self.server.manager(MGR_CMD_SET, SERVER, {'backfill_depth': '2'})
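        # backfill_depth of 2 lets the scheduler calendar up to two top jobs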
        # Submit a job
        j = Job(TEST_USER, {'Resource_List.walltime': '120',
                            'Resource_List.ncpus': '2'})
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        j = Job(TEST_USER, {'Resource_List.walltime': '120',
                            'Resource_List.ncpus': '2',
                            ATTR_queue: 'wq1'})
        jid2 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
        # Alter topjob_ineligible for the running jobs
        self.server.alterjob(jid1, {ATTR_W: "topjob_ineligible = true"},
                             runas=ROOT_USER, logerr=True)
        self.server.alterjob(jid2, {ATTR_W: "topjob_ineligible = true"},
                             runas=ROOT_USER, logerr=True)
        # Create a high priority queue
        a = {'queue_type': 'e', 'started': 't',
             'enabled': 't', 'Priority': '150'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id="highp")
        # Submit 2 jobs to the high priority queues
        j = Job(TEST_USER, {'queue': 'highp', 'Resource_List.walltime': '60',
                            'Resource_List.ncpus': '2'})
        jid3 = self.server.submit(j)
        j = Job(TEST_USER, {'queue': 'wq4', 'Resource_List.walltime': '60',
                            'Resource_List.ncpus': '2'})
        jid4 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid4)
        # Verify that job1 is not calendared
        self.server.expect(JOB, 'estimated.start_time',
                           op=UNSET, id=jid1)
        # Verify that job2 is calendared
        self.server.expect(JOB, 'estimated.start_time',
                           op=SET, id=jid2)
        qstat = self.server.status(JOB, 'estimated.start_time',
                                   id=jid2)
        est_time = qstat[0]['estimated.start_time']
        self.assertNotEqual(est_time, None)
        self.scheds['sc1'].log_match(jid2 + ";Job is a top job",
                                     starttime=self.server.ctime)

    def set_primetime(self, ptime_start, ptime_end, scid='default'):
        """
        Set prime time in the holidays file of the given scheduler
        """
        p_day = 'weekday'
        p_hhmm = time.strftime('%H%M', time.localtime(ptime_start))
        np_hhmm = time.strftime('%H%M', time.localtime(ptime_end))
        self.scheds[scid].holidays_set_day(p_day, p_hhmm, np_hhmm)
        p_day = 'saturday'
        self.scheds[scid].holidays_set_day(p_day, p_hhmm, np_hhmm)
        p_day = 'sunday'
        self.scheds[scid].holidays_set_day(p_day, p_hhmm, np_hhmm)

    def test_prime_time_backfill(self):
        """
        Test that opt_backfill_fuzzy can be set on a multi sched and that
        primetime/nonprimetime is considered while calendaring
        """
        self.setup_sc2()
        self.setup_queues_nodes()
        a = {'strict_ordering': "True ALL"}
        self.scheds['sc2'].set_sched_config(a)
        # Set prime time to start 30 minutes from now
        prime_start = int(time.time()) + 1800
        prime_end = int(time.time()) + 3600
        self.set_primetime(prime_start, prime_end, scid='sc2')
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'opt_backfill_fuzzy': 'high'}, id='sc2')
        self.server.manager(MGR_CMD_SET, SERVER, {'backfill_depth': '2'})
        # Submit a job
        j = Job(TEST_USER, {'Resource_List.walltime': '60',
                            'Resource_List.ncpus': '2',
                            ATTR_queue: 'wq2'})
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        j = Job(TEST_USER1, {'Resource_List.ncpus': '2',
                             ATTR_queue: 'wq2'})
        jid2 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
        # Verify that job2 is calendared to start at primetime start
        self.server.expect(JOB, 'estimated.start_time',
                           op=SET, id=jid2)
        qstat = self.server.status(JOB, 'estimated.start_time',
                                   id=jid2)
        est_time = qstat[0]['estimated.start_time']
        est_epoch = est_time
        if self.server.get_op_mode() == PTL_CLI:
            est_epoch = int(time.mktime(time.strptime(est_time, '%c')))
        prime_mod = prime_start % 60  # ignore the seconds
        self.assertEqual((prime_start - prime_mod), est_epoch)

    def test_prime_time_multisched(self):
        """
        Test that a prime time queue can be assigned a partition and that
        the multi sched honors prime time for jobs submitted to p_queue
        """
        self.setup_sc2()
        self.setup_queues_nodes()
        # Set prime time to start 30 minutes from now
        prime_start = int(time.time()) + 1800
        prime_end = int(time.time()) + 3600
        self.set_primetime(prime_start, prime_end, scid='sc2')
        a = {'queue_type': 'e', 'started': 't',
             'enabled': 't', 'partition': 'P2'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id="p_queue")
        j = Job(TEST_USER1, {'Resource_List.ncpus': '1',
                             ATTR_queue: 'wq2'})
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        j = Job(TEST_USER1, {'Resource_List.ncpus': '1',
                             ATTR_queue: 'p_queue'})
        jid2 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
        msg = 'Job will run in primetime only'
        self.server.expect(JOB, {ATTR_comment: "Not Running: " + msg}, id=jid2)
        self.scheds['sc2'].log_match(jid2 + ";Job only runs in primetime",
                                     starttime=self.server.ctime)

    def test_dedicated_time_multisched(self):
        """
        Test that a dedicated time queue can be assigned a partition and
        that the multi sched honors dedicated time for jobs submitted to
        ded_queue
        """
        self.setup_sc2()
        self.setup_queues_nodes()
        # Add dedicated time and create a dedicated time queue
        ded_start = int(time.time()) + 1800
        ded_end = int(time.time()) + 3600
        self.scheds['sc2'].add_dedicated_time(start=ded_start, end=ded_end)
        a = {'queue_type': 'e', 'started': 't',
             'enabled': 't', 'partition': 'P2'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id="ded_queue")
        j = Job(TEST_USER1, {'Resource_List.ncpus': '1',
                             ATTR_queue: 'wq2'})
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        j = Job(TEST_USER1, {'Resource_List.ncpus': '1',
                             ATTR_queue: 'ded_queue'})
        jid2 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
        msg = 'Dedicated time conflict'
        self.server.expect(JOB, {ATTR_comment: "Not Running: " + msg}, id=jid2)
        self.scheds['sc2'].log_match(jid2 + ";Dedicated Time",
                                     starttime=self.server.ctime)

    def test_auto_sched_off_due_to_fds_limit(self):
        """
        Test to make sure scheduling is not turned off automatically
        when the number of open files per process is exhausted
        """
        if os.getuid() != 0 or sys.platform in ('cygwin', 'win32'):
            self.skipTest("Test needs to run as root")
        try:
            # get the number of open files per process
            (open_files_soft_limit, open_files_hard_limit) =\
                resource.getrlimit(resource.RLIMIT_NOFILE)
            # set the soft limit of number of open files per process to 10
            resource.setrlimit(resource.RLIMIT_NOFILE,
                               (10, open_files_hard_limit))
        except (ValueError, resource.error):
            self.assertFalse(True, "Error in accessing system RLIMIT_ "
                             "variables, test fails.")
        self.setup_sc3()
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduler_iteration': 1},
                            id="sc3", expect=True)
        self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
                            id="sc3", expect=True)
        self.logger.info('The 15 second sleep triggers enough scheduling '
                         'cycles to exhaust the open files per process, '
                         'which is 10 in our case')
        time.sleep(15)
        # scheduling should not go to false once all fds per process
        # are exhausted.
        self.server.expect(SCHED, {'scheduling': 'True'},
                           id='sc3', max_attempts=10)
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (open_files_soft_limit,
                               open_files_hard_limit))
        except (ValueError, resource.error):
            self.assertFalse(True, "Error in accessing system RLIMIT_ "
                             "variables, test fails.")

    def test_set_msched_attr_sched_log_with_sched_off(self):
        """
        Test that Multisched attributes can be set even when its scheduling
        is off and check whether they actually take effect
        """
        self.setup_sc3()
        self.scheds['sc3'].set_sched_config({'log_filter': 2048})
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'False'}, id="sc3")
        new_sched_log = os.path.join(self.server.pbs_conf['PBS_HOME'],
                                     'sc3_new_logs')
        if os.path.exists(new_sched_log):
            self.du.rm(path=new_sched_log, recursive=True,
                       sudo=True, force=True)
        self.du.mkdir(path=new_sched_log, sudo=True)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'sched_log': new_sched_log},
                            id="sc3", expect=True)
        a = {'sched_log': new_sched_log}
        self.server.expect(SCHED, a, id='sc3', max_attempts=10)
        # This sleep is required so that log_match is called only after
        # the new log file is created.
        time.sleep(1)
        self.scheds['sc3'].log_match(
            "scheduler log directory is changed to " + new_sched_log,
            max_attempts=10, starttime=self.server.ctime)

    def test_set_msched_attr_sched_priv_with_sched_off(self):
        """
        Test that Multisched attributes can be set even when its scheduling
        is off and check whether they actually take effect
        """
        self.setup_sc3()
        self.scheds['sc3'].set_sched_config({'log_filter': 2048})
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'False'}, id="sc3")
        # create and set up a new priv directory for sc3
        new_sched_priv = os.path.join(self.server.pbs_conf['PBS_HOME'],
                                      'sched_priv_new')
        if os.path.exists(new_sched_priv):
            self.du.rm(path=new_sched_priv, recursive=True,
                       sudo=True, force=True)
        dflt_sched_priv = os.path.join(self.server.pbs_conf['PBS_HOME'],
                                       'sched_priv')
        self.du.run_copy(src=dflt_sched_priv, dest=new_sched_priv,
                         recursive=True, sudo=True)
        self.server.manager(MGR_CMD_SET, SCHED, {'sched_priv': new_sched_priv},
                            id="sc3", expect=True)
        a = {'sched_priv': new_sched_priv}
        self.server.expect(SCHED, a, id='sc3', max_attempts=10)
        # This sleep is required so that log_match is called only after
        # the change has been logged.
        time.sleep(1)
        self.scheds['sc3'].log_match(
            "scheduler priv directory has changed to " + new_sched_priv,
            max_attempts=10, starttime=self.server.ctime)

    def test_set_msched_update_inbuilt_attrs_accrue_type(self):
        """
        Test to make sure Multisched is able to update any one of the
        built-in attributes like accrue_type
        """
        a = {'eligible_time_enable': 'True'}
        self.server.manager(MGR_CMD_SET, SERVER, a)
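        # With eligible_time_enable on, a queued job accrues eligible_time
        # (accrue_type 2) and a running job accrues run time (accrue_type 3)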
        self.setup_sc3()
        self.setup_queues_nodes()
        a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq3'}
        J1 = Job(TEST_USER1, attrs=a)
        J2 = Job(TEST_USER1, attrs=a)
        jid1 = self.server.submit(J1)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)
        jid2 = self.server.submit(J2)
        self.server.expect(JOB, {ATTR_state: 'Q'}, id=jid2)
        # accrue_type = 2 is eligible_time
        self.server.expect(JOB, {ATTR_accrue_type: 2}, id=jid2)
        self.server.delete(jid1, wait=True)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid2)
        # This makes sure that accrue_type is indeed getting changed
        self.server.expect(JOB, {ATTR_accrue_type: 3}, id=jid2)

    def test_multisched_not_crash(self):
        """
        Test to make sure Multisched does not crash when all nodes in a
        partition are not associated with the corresponding queue
        """
        self.setup_sc1()
        self.setup_queues_nodes()
        # Associate a queue with one node of partition P1. This queue
        # association is not required by the current Multisched feature,
        # but it verifies that the scheduler does not crash even if a queue
        # is associated with only some of the nodes in a partition.
        # Ex: wq1 is associated with vnode[0] while vnode[4] has no queue
        # associated with it; the scheduler is expected not to crash.
        a = {ATTR_queue: 'wq1'}
        self.server.manager(MGR_CMD_SET, NODE, a, id='vnode[0]', expect=True)
        self.scheds['sc1'].terminate()
        self.scheds['sc1'].start()
        # Ideally the following statement is not required. start() itself
        # should take care of updating the PID in its cache. A bug has been
        # filed to fix this in the framework; for the time being the
        # following statement is required as a workaround.
        self.scheds['sc1']._update_pid(self.scheds['sc1'])
        j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
                                   'Resource_List.select': '1:ncpus=1'})
        jid1 = self.server.submit(j)
        # If the job goes to R state, the scheduler is still alive.
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)

    def test_multi_sched_job_sort_key(self):
        """
        Test to make sure that jobs are sorted as per
        job_sort_key in a multi sched
        """
        self.setup_sc1()
        self.setup_queues_nodes()
        a = {'job_sort_key': '"ncpus LOW"'}
        self.scheds['sc1'].set_sched_config(a)
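        # "ncpus LOW" sorts jobs so that those requesting fewer ncpus
        # are considered first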
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'False'}, id="sc1")
        j = Job(TEST_USER, {'Resource_List.ncpus': '2',
                            ATTR_queue: 'wq1'})
        jid1 = self.server.submit(j)
        j = Job(TEST_USER, {'Resource_List.ncpus': '1',
                            ATTR_queue: 'wq1'})
        jid2 = self.server.submit(j)
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc1")
        self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
        self.server.expect(JOB, {'job_state': 'Q'}, id=jid1)

    def test_multi_sched_node_sort_key(self):
        """
        Test to make sure nodes are sorted in the order
        as per node_sort_key in a multi sched
        """
        self.setup_sc1()
        self.setup_queues_nodes()
        a = {'partition': 'P1'}
        self.server.manager(MGR_CMD_SET, NODE, a, id='@default', expect=True)
        a = {'node_sort_key': '"ncpus HIGH " ALL'}
        self.scheds['sc1'].set_sched_config(a)
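        # "ncpus HIGH" sorts nodes so that those with the most available
        # ncpus are used first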
        a = {'resources_available.ncpus': 1}
        self.server.manager(MGR_CMD_SET, NODE, a, id='vnode[0]', expect=True)
        a = {'resources_available.ncpus': 2}
        self.server.manager(MGR_CMD_SET, NODE, a, id='vnode[1]', expect=True)
        a = {'resources_available.ncpus': 3}
        self.server.manager(MGR_CMD_SET, NODE, a, id='vnode[2]', expect=True)
        a = {'resources_available.ncpus': 4}
        self.server.manager(MGR_CMD_SET, NODE, a, id='vnode[3]', expect=True)
        # Offline the node as it is not needed for the test
        a = {'state': 'offline'}
        self.server.manager(MGR_CMD_SET, NODE, a, id='vnode[4]', expect=True)
        a = {'Resource_List.select': '1:ncpus=1',
             'Resource_List.place': 'excl',
             ATTR_queue: 'wq1'}
        j = Job(TEST_USER1, a)
        jid = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid)
        self.check_vnodes(j, ['vnode[3]'], jid)
        j = Job(TEST_USER1, a)
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
        self.check_vnodes(j, ['vnode[2]'], jid1)
        j = Job(TEST_USER1, a)
        jid2 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
        self.check_vnodes(j, ['vnode[1]'], jid2)
        j = Job(TEST_USER1, a)
        jid3 = self.server.submit(j)
        self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
        self.check_vnodes(j, ['vnode[0]'], jid3)

    def test_multi_sched_priority_sockets(self):
        """
        Test that scheduler socket connections from all the schedulers
        are processed with priority
        """
        self.common_setup()
        self.server.manager(MGR_CMD_SET, SERVER, {'log_events': 2047})
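        # log_events 2047 enables all server log event classes so the
        # "processing priority socket" message gets logged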
        for name in self.scheds:
            self.server.manager(MGR_CMD_SET, SCHED,
                                {'scheduling': 'False'}, id=name, expect=True)
        a = {ATTR_queue: 'wq1',
             'Resource_List.select': '1:ncpus=2',
             'Resource_List.walltime': 60}
        j = Job(TEST_USER1, attrs=a)
        self.server.submit(j)
        t = int(time.time())
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id='sc1', expect=True)
        self.server.log_match("processing priority socket", starttime=t)
        a = {ATTR_queue: 'wq2',
             'Resource_List.select': '1:ncpus=2',
             'Resource_List.walltime': 60}
        j = Job(TEST_USER1, attrs=a)
        self.server.submit(j)
        t = int(time.time())
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id='sc2', expect=True)
        self.server.log_match("processing priority socket", starttime=t)