pbs_release_limited_res_suspend.py 39 KB


  1. # coding: utf-8
  2. # Copyright (C) 1994-2018 Altair Engineering, Inc.
  3. # For more information, contact Altair at www.altair.com.
  4. #
  5. # This file is part of the PBS Professional ("PBS Pro") software.
  6. #
  7. # Open Source License Information:
  8. #
  9. # PBS Pro is free software. You can redistribute it and/or modify it under the
  10. # terms of the GNU Affero General Public License as published by the Free
  11. # Software Foundation, either version 3 of the License, or (at your option) any
  12. # later version.
  13. #
  14. # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
  15. # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  16. # FOR A PARTICULAR PURPOSE.
  17. # See the GNU Affero General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Affero General Public License
  20. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. #
  22. # Commercial License Information:
  23. #
  24. # For a copy of the commercial license terms and conditions,
  25. # go to: (http://www.pbspro.com/UserArea/agreement.html)
  26. # or contact the Altair Legal Department.
  27. #
  28. # Altair’s dual-license business model allows companies, individuals, and
  29. # organizations to create proprietary derivative works of PBS Pro and
  30. # distribute them - whether embedded or bundled with other software -
  31. # under a commercial license agreement.
  32. #
  33. # Use of Altair’s trademarks, including but not limited to "PBS™",
  34. # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
  35. # trademark licensing policies.
  36. import time
  37. from tests.functional import *
  38. class TestReleaseLimitedResOnSuspend(TestFunctional):
  39. """
  40. Test that based on admin's input only limited number of resources are
  41. released when suspending a running job.
  42. """
  def setUp(self):
      """
      Run the base class setUp, then set ncpus=4 and mem=2gb as the
      resources available on the default mom and create the "expressq"
      queue that the tests use as the express queue.
      """
      TestFunctional.setUp(self)

      # Resources available on the default mom
      node_attrs = {
          ATTR_rescavail + '.ncpus': 4,
          ATTR_rescavail + '.mem': '2gb',
      }
      self.server.manager(MGR_CMD_SET, NODE, node_attrs, self.mom.shortname)

      # Express queue
      queue_attrs = {
          ATTR_qtype: 'Execution',
          ATTR_enable: 'True',
          ATTR_start: 'True',
          ATTR_p: '200',
      }
      self.server.manager(MGR_CMD_CREATE, QUEUE, queue_attrs, "expressq")
  def test_do_not_release_mem_sched_susp(self):
      """
      With only ncpus listed in restrict_res_to_release_on_suspend, let an
      expressq job preempt a running job by suspension and check the
      node's assigned resources: ncpus of the suspended job must be
      released while its memory must stay assigned.
      """
      ras_ncpus = ATTR_rescassn + '.ncpus'
      ras_mem = ATTR_rescassn + '.mem'

      # Restrict the resources released on suspension to ncpus
      restrict = {ATTR_restrict_res_to_release_on_suspend: 'ncpus'}
      self.server.manager(MGR_CMD_SET, SERVER, restrict, expect=True)

      # Low priority job
      low_job = Job(TEST_USER)
      low_job.set_attributes({ATTR_l + '.select': '1:ncpus=4:mem=512mb'})
      low_jid = self.server.submit(low_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=low_jid)

      # High priority job
      express_attrs = {
          ATTR_l + '.select': '1:ncpus=2:mem=512mb',
          ATTR_q: 'expressq',
      }
      express_job = Job(TEST_USER)
      express_job.set_attributes(express_attrs)
      express_jid = self.server.submit(express_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=express_jid)
      self.server.expect(JOB, {ATTR_state: 'S', ATTR_substate: 45},
                         id=low_jid)

      # Both jobs' memory, but only the express job's ncpus, are assigned
      node_stat = self.server.status(
          NODE, [ras_ncpus, ras_mem], id=self.mom.shortname)
      self.assertNotEqual(node_stat, None)
      self.assertEqual(node_stat[0][ras_mem], "1048576kb",
                       msg="pbs should not release memory")
      self.assertEqual(node_stat[0][ras_ncpus], "2",
                       msg="pbs did not release ncpus")
  def test_do_not_release_mem_qsig_susp(self):
      """
      With only ncpus listed in restrict_res_to_release_on_suspend,
      suspend a running job through a suspend signal and check on the
      node that its ncpus were released while its memory stays assigned.
      """
      ras_ncpus = ATTR_rescassn + '.ncpus'
      ras_mem = ATTR_rescassn + '.mem'

      # Restrict the resources released on suspension to ncpus
      restrict = {ATTR_restrict_res_to_release_on_suspend: 'ncpus'}
      self.server.manager(MGR_CMD_SET, SERVER, restrict, expect=True)

      # Job that will be suspended
      job = Job(TEST_USER)
      job.set_attributes({ATTR_l + '.select': '1:ncpus=4:mem=512mb'})
      jid = self.server.submit(job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)

      # Suspend it
      self.server.sigjob(jobid=jid, signal="suspend")

      node_stat = self.server.status(
          NODE, [ras_ncpus, ras_mem], id=self.mom.shortname)
      self.assertNotEqual(node_stat, None)
      self.assertEqual(node_stat[0][ras_mem], "524288kb",
                       msg="pbs should not release memory")
      self.assertEqual(node_stat[0][ras_ncpus], "0",
                       msg="pbs did not release ncpus")
  def test_change_in_res_to_release_on_suspend(self):
      """
      Set restrict_res_to_release_on_suspend to only ncpus and have a job
      suspended by preemption. While the job is suspended, change
      restrict_res_to_release_on_suspend to release only memory, delete
      the preempting job, and check that the suspended job resumes and
      that memory is not accounted for twice on the node.
      """
      # Set restrict_res_to_release_on_suspend server attribute
      a = {ATTR_restrict_res_to_release_on_suspend: 'ncpus'}
      self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)

      # Submit a low priority job
      j1 = Job(TEST_USER)
      j1.set_attributes({ATTR_l + '.select': '1:ncpus=4:mem=512mb'})
      jid1 = self.server.submit(j1)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)

      # Submit a high priority job
      j2 = Job(TEST_USER)
      j2.set_attributes(
          {ATTR_l + '.select': '1:ncpus=2:mem=256mb',
           ATTR_q: 'expressq'})
      jid2 = self.server.submit(j2)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid2)
      self.server.expect(JOB, {ATTR_state: 'S', ATTR_substate: 45}, id=jid1)

      # Change restrict_res_to_release_on_suspend server attribute
      a = {ATTR_restrict_res_to_release_on_suspend: 'mem'}
      self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)

      # Delete the high priority job. A failed deletion must fail the
      # test: the previous handler asserted on a return code that was
      # never assigned when deljob raised, so the error was swallowed.
      try:
          self.server.deljob(jid2, wait=True)
      except PbsDeljobError as e:
          self.fail(e.msg[0])

      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)

      ras_mem = ATTR_rescassn + '.mem'
      ras_ncpus = ATTR_rescassn + '.ncpus'

      rv = self.server.status(
          NODE, [ras_ncpus, ras_mem], id=self.mom.shortname)
      self.assertNotEqual(rv, None)

      self.assertEqual(rv[0][ras_mem], "524288kb",
                       msg="pbs did not account for memory correctly")
      self.assertEqual(rv[0][ras_ncpus], "4",
                       msg="pbs did not account for ncpus correctly")
  def test_res_released_sched_susp(self):
      """
      With only ncpus listed in restrict_res_to_release_on_suspend, have
      a job suspended by preemption and check the value of its
      resources_released attribute.
      """
      # Restrict the resources released on suspension to ncpus
      restrict = {ATTR_restrict_res_to_release_on_suspend: 'ncpus'}
      self.server.manager(MGR_CMD_SET, SERVER, restrict, expect=True)

      # Low priority job
      low_job = Job(TEST_USER)
      low_job.set_attributes({ATTR_l + '.select': '1:ncpus=4:mem=512mb'})
      low_jid = self.server.submit(low_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=low_jid)

      # High priority job
      express_attrs = {
          ATTR_l + '.select': '1:ncpus=2:mem=512mb',
          ATTR_q: 'expressq',
      }
      express_job = Job(TEST_USER)
      express_job.set_attributes(express_attrs)
      express_jid = self.server.submit(express_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=express_jid)
      self.server.expect(JOB, {ATTR_state: 'S', ATTR_substate: 45},
                         id=low_jid)

      job_stat = self.server.status(JOB, id=low_jid)
      expected = "(%s:ncpus=4)" % self.mom.shortname
      self.assertEqual(job_stat[0][ATTR_released], expected,
                       msg="resources_released incorrect")
  def test_res_released_sched_susp_multi_vnode(self):
      """
      With only ncpus listed in restrict_res_to_release_on_suspend, have
      a job that spans two vnodes suspended by preemption and check the
      value of its resources_released attribute.
      """
      # Set restrict_res_to_release_on_suspend server attribute
      a = {ATTR_restrict_res_to_release_on_suspend: 'ncpus'}
      self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)

      vn_attrs = {ATTR_rescavail + '.ncpus': 8,
                  ATTR_rescavail + '.mem': '1024mb'}
      self.server.create_vnodes("vnode1", vn_attrs, 1,
                                self.mom, fname="vnodedef1")
      # Append a vnode
      vn_attrs = {ATTR_rescavail + '.ncpus': 6,
                  ATTR_rescavail + '.mem': '1024mb'}
      self.server.create_vnodes("vnode2", vn_attrs, 1,
                                self.mom, additive=True, fname="vnodedef2")

      # Submit a low priority job
      j1 = Job(TEST_USER)
      j1.set_attributes({ATTR_l + '.select':
                         '1:ncpus=8:mem=512mb+1:ncpus=6:mem=256mb'})
      jid1 = self.server.submit(j1)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)

      # Submit a high priority job
      j2 = Job(TEST_USER)
      j2.set_attributes(
          {ATTR_l + '.select': '1:ncpus=8:mem=256mb',
           ATTR_q: 'expressq'})
      jid2 = self.server.submit(j2)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid2)
      self.server.expect(JOB, {ATTR_state: 'S', ATTR_substate: 45}, id=jid1)

      job = self.server.status(JOB, id=jid1)

      rr = "(vnode1[0]:ncpus=8)+(vnode2[0]:ncpus=6)"
      # Call form of print: with a single argument it behaves the same on
      # Python 2 and Python 3, whereas the statement form is a syntax
      # error on Python 3.
      print(job[0][ATTR_released])
      self.assertEqual(job[0][ATTR_released], rr,
                       msg="resources_released incorrect")
  def test_res_released_sched_susp_arrayjob(self):
      """
      With only ncpus listed in restrict_res_to_release_on_suspend, have
      a subjob of an array job suspended by preemption and check the
      value of the subjob's resources_released attribute.
      """
      # Restrict the resources released on suspension to ncpus
      restrict = {ATTR_restrict_res_to_release_on_suspend: 'ncpus'}
      self.server.manager(MGR_CMD_SET, SERVER, restrict, expect=True)

      # Low priority array job
      array_attrs = {
          ATTR_l + '.select': '1:ncpus=4:mem=512mb',
          ATTR_J: '1-3',
      }
      array_job = Job(TEST_USER)
      array_job.set_attributes(array_attrs)
      array_jid = self.server.submit(array_job)

      # Entry 1 of the extended status listing is the subjob under test
      listing = self.server.status(JOB, id=array_jid, extend='t')
      subjob_id = listing[1]['id']
      self.server.expect(JOB, {ATTR_state: 'R'}, id=subjob_id)

      # High priority job
      express_attrs = {
          ATTR_l + '.select': '1:ncpus=2:mem=512mb',
          ATTR_q: 'expressq',
      }
      express_job = Job(TEST_USER)
      express_job.set_attributes(express_attrs)
      express_jid = self.server.submit(express_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=express_jid)
      self.server.expect(JOB, {ATTR_state: 'S', ATTR_substate: 45},
                         id=subjob_id)

      subjob_stat = self.server.status(JOB, id=subjob_id)
      expected = "(%s:ncpus=4)" % self.mom.shortname
      self.assertEqual(subjob_stat[0][ATTR_released], expected,
                       msg="resources_released incorrect")
  def test_res_released_list_sched_susp_arrayjob(self):
      """
      With ncpus and mem listed in restrict_res_to_release_on_suspend,
      have a subjob of an array job suspended by preemption and check the
      ncpus and mem entries of the subjob's resources_released_list.
      """
      # Allow both ncpus and mem to be released on suspension
      restrict = {ATTR_restrict_res_to_release_on_suspend: 'ncpus,mem'}
      self.server.manager(MGR_CMD_SET, SERVER, restrict, expect=True)

      # Low priority array job
      array_attrs = {
          ATTR_l + '.select': '1:ncpus=4:mem=512mb',
          ATTR_J: '1-3',
      }
      array_job = Job(TEST_USER)
      array_job.set_attributes(array_attrs)
      array_jid = self.server.submit(array_job)

      # Entry 1 of the extended status listing is the subjob under test
      listing = self.server.status(JOB, id=array_jid, extend='t')
      subjob_id = listing[1]['id']
      self.server.expect(JOB, {ATTR_state: 'R'}, id=subjob_id)

      # High priority job
      express_attrs = {
          ATTR_l + '.select': '1:ncpus=2:mem=256mb',
          ATTR_q: 'expressq',
      }
      express_job = Job(TEST_USER)
      express_job.set_attributes(express_attrs)
      express_jid = self.server.submit(express_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=express_jid)
      self.server.expect(JOB, {ATTR_state: 'S', ATTR_substate: 45},
                         id=subjob_id)

      subjob_stat = self.server.status(JOB, id=subjob_id)

      released_ncpus = subjob_stat[0][ATTR_rel_list + ".ncpus"]
      self.assertEqual(released_ncpus, "4", msg="ncpus not released")

      released_mem = subjob_stat[0][ATTR_rel_list + ".mem"]
      self.assertEqual(released_mem, "524288kb",
                       msg="memory not released")
  def test_res_released_list_sched_susp(self):
      """
      With ncpus and mem listed in restrict_res_to_release_on_suspend,
      have a job suspended by preemption and check the ncpus and mem
      entries of its resources_released_list attribute.
      """
      # Allow both ncpus and mem to be released on suspension
      restrict = {ATTR_restrict_res_to_release_on_suspend: 'ncpus,mem'}
      self.server.manager(MGR_CMD_SET, SERVER, restrict, expect=True)

      # Low priority job
      low_job = Job(TEST_USER)
      low_job.set_attributes({ATTR_l + '.select': '1:ncpus=4:mem=512mb'})
      low_jid = self.server.submit(low_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=low_jid)

      # High priority job
      express_attrs = {
          ATTR_l + '.select': '1:ncpus=2:mem=256mb',
          ATTR_q: 'expressq',
      }
      express_job = Job(TEST_USER)
      express_job.set_attributes(express_attrs)
      express_jid = self.server.submit(express_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=express_jid)
      self.server.expect(JOB, {ATTR_state: 'S', ATTR_substate: 45},
                         id=low_jid)

      job_stat = self.server.status(JOB, id=low_jid)

      released_ncpus = job_stat[0][ATTR_rel_list + ".ncpus"]
      self.assertEqual(released_ncpus, "4", msg="ncpus not released")

      released_mem = job_stat[0][ATTR_rel_list + ".mem"]
      self.assertEqual(released_mem, "524288kb",
                       msg="memory not released")
  def test_res_released_list_sched_susp_multi_vnode(self):
      """
      With ncpus and mem listed in restrict_res_to_release_on_suspend,
      have a job that spans two vnodes suspended by preemption and check
      the ncpus and mem entries of its resources_released_list attribute.
      """
      # Allow both ncpus and mem to be released on suspension
      restrict = {ATTR_restrict_res_to_release_on_suspend: 'ncpus,mem'}
      self.server.manager(MGR_CMD_SET, SERVER, restrict, expect=True)

      # First vnode
      first_vnode = {
          ATTR_rescavail + '.ncpus': 8,
          ATTR_rescavail + '.mem': '1024mb',
      }
      self.server.create_vnodes("vnode1", first_vnode, 1,
                                self.mom, fname="vnodedef1")

      # Second vnode, appended to the first one
      second_vnode = {
          ATTR_rescavail + '.ncpus': 6,
          ATTR_rescavail + '.mem': '1024mb',
      }
      self.server.create_vnodes("vnode2", second_vnode, 1,
                                self.mom, additive=True, fname="vnodedef2")

      # Low priority job with one chunk per vnode
      low_select = '1:ncpus=8:mem=512mb+1:ncpus=6:mem=256mb'
      low_job = Job(TEST_USER)
      low_job.set_attributes({ATTR_l + '.select': low_select})
      low_jid = self.server.submit(low_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=low_jid)

      # High priority job
      express_attrs = {
          ATTR_l + '.select': '1:ncpus=8:mem=256mb',
          ATTR_q: 'expressq',
      }
      express_job = Job(TEST_USER)
      express_job.set_attributes(express_attrs)
      express_jid = self.server.submit(express_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=express_jid)
      self.server.expect(JOB, {ATTR_state: 'S', ATTR_substate: 45},
                         id=low_jid)

      job_stat = self.server.status(JOB, id=low_jid)

      released_ncpus = job_stat[0][ATTR_rel_list + ".ncpus"]
      self.assertEqual(released_ncpus, "14", msg="ncpus not released")

      # NOTE(review): this only checks that the value differs from
      # "2097152kb"; an exact expected value (presumably the 512mb+256mb
      # the job requested) would be a stronger check - TODO confirm.
      released_mem = job_stat[0][ATTR_rel_list + ".mem"]
      self.assertNotEqual(released_mem, "2097152kb",
                          msg="memory not released")
  def test_node_res_after_deleting_suspended_job(self):
      """
      With only ncpus listed in restrict_res_to_release_on_suspend,
      suspend a running job with a suspend signal and check that the node
      keeps its memory assigned but not its ncpus. Then delete the
      suspended job and check that both assigned values on the node are
      back to zero.
      """
      # Set restrict_res_to_release_on_suspend server attribute
      a = {ATTR_restrict_res_to_release_on_suspend: 'ncpus'}
      self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)

      # Submit a low priority job
      j1 = Job(TEST_USER)
      j1.set_attributes({ATTR_l + '.select': '1:ncpus=4:mem=512mb'})
      jid1 = self.server.submit(j1)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)

      # suspend job
      self.server.sigjob(jobid=jid1, signal="suspend")
      self.server.expect(JOB, {ATTR_state: 'S', ATTR_substate: 43}, id=jid1)

      ras_mem = ATTR_rescassn + '.mem'
      ras_ncpus = ATTR_rescassn + '.ncpus'

      rv = self.server.status(
          NODE, [ras_ncpus, ras_mem], id=self.mom.shortname)
      self.assertNotEqual(rv, None)

      self.assertEqual(
          rv[0][ras_mem], "524288kb",
          msg="pbs did not retain memory correctly on the node")
      self.assertEqual(
          rv[0][ras_ncpus], "0",
          msg="pbs did not release ncpus correctly on the node")

      # Delete the suspended job. A failed deletion must fail the test:
      # the previous handler asserted on a return code that was never
      # assigned when deljob raised, so the error was swallowed.
      try:
          self.server.deljob(jid1, wait=True)
      except PbsDeljobError as e:
          self.fail(e.msg[0])

      rv = self.server.status(
          NODE, [ras_ncpus, ras_mem], id=self.mom.shortname)
      self.assertNotEqual(rv, None)

      self.assertEqual(
          rv[0][ras_mem], "0kb",
          msg="pbs did not reassign memory correctly on the node")
      self.assertEqual(
          rv[0][ras_ncpus], "0",
          msg="pbs did not reassign ncpus correctly on the node")
  def test_default_restrict_res_released_on_suspend(self):
      """
      Leave restrict_res_to_release_on_suspend unset, have a job
      suspended by preemption and check that only the preempting job's
      ncpus and memory remain assigned on the node, i.e. the suspended
      job released all of its resources.
      """
      ras_ncpus = ATTR_rescassn + '.ncpus'
      ras_mem = ATTR_rescassn + '.mem'

      # Low priority job
      low_job = Job(TEST_USER)
      low_job.set_attributes({ATTR_l + '.select': '1:ncpus=4:mem=512mb'})
      low_jid = self.server.submit(low_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=low_jid)

      # High priority job
      express_attrs = {
          ATTR_l + '.select': '1:ncpus=2:mem=256mb',
          ATTR_q: 'expressq',
      }
      express_job = Job(TEST_USER)
      express_job.set_attributes(express_attrs)
      express_jid = self.server.submit(express_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=express_jid)
      self.server.expect(JOB, {ATTR_state: 'S', ATTR_substate: 45},
                         id=low_jid)

      node_stat = self.server.status(
          NODE, [ras_ncpus, ras_mem], id=self.mom.shortname)
      self.assertNotEqual(node_stat, None)
      self.assertEqual(node_stat[0][ras_mem], "262144kb",
                       msg="pbs did not release memory")
      self.assertEqual(node_stat[0][ras_ncpus], "2",
                       msg="pbs did not release ncpus")
  def test_setting_unknown_resc(self):
      """
      Set a non existing resource in restrict_res_to_release_on_suspend
      and expect an "Unknown resource" error. The test fails if the
      server accepts the value without raising an error.
      """
      # Set restrict_res_to_release_on_suspend server attribute
      a = {ATTR_restrict_res_to_release_on_suspend: 'ncpus,abc'}
      try:
          self.server.manager(MGR_CMD_SET, SERVER, a)
      except PbsManagerError as e:
          self.assertIn("Unknown resource", e.msg[0])
      else:
          # Without this branch the test passed when no error was raised
          self.fail("Setting an unknown resource in "
                    "restrict_res_to_release_on_suspend did not fail")
  def test_delete_res_busy_on_res_to_release_list(self):
      """
      Create two custom resources, set them in
      restrict_res_to_release_on_suspend, then try to delete each of them
      and expect a "Resource busy on server" error. The test fails if a
      deletion goes through without raising an error.
      """
      # create the custom resources
      attr = {ATTR_RESC_TYPE: 'long'}
      self.server.manager(MGR_CMD_CREATE, RSC, attr, id='foo')
      self.server.manager(MGR_CMD_CREATE, RSC, attr, id='bar')

      # Set restrict_res_to_release_on_suspend server attribute
      a = {ATTR_restrict_res_to_release_on_suspend: 'ncpus,foo,bar'}
      self.server.manager(MGR_CMD_SET, SERVER, a)

      # delete the custom resources
      for resc in ('foo', 'bar'):
          try:
              self.server.manager(MGR_CMD_DELETE, RSC, id=resc)
          except PbsManagerError as e:
              self.assertIn("Resource busy on server", e.msg[0])
          else:
              # Without this branch the test passed when the deletion
              # unexpectedly succeeded
              self.fail("Resource %s was deleted while set in "
                        "restrict_res_to_release_on_suspend" % resc)
  def test_queue_res_release_upon_suspension(self):
      """
      Create two custom resources, foo and bar, and make them available
      on workq. List only foo (with ncpus) in
      restrict_res_to_release_on_suspend, suspend a job that requests
      both, and check on the queue that foo was released while bar is
      still assigned.
      """
      ras_foo = ATTR_rescassn + '.foo'
      ras_bar = ATTR_rescassn + '.bar'

      # Custom resources
      resc_def = {
          ATTR_RESC_TYPE: 'long',
          ATTR_RESC_FLAG: 'q',
      }
      for resc_name in ('foo', 'bar'):
          self.server.manager(MGR_CMD_CREATE, RSC, resc_def, id=resc_name)

      # Only ncpus and foo may be released on suspension
      restrict = {ATTR_restrict_res_to_release_on_suspend: 'ncpus,foo'}
      self.server.manager(MGR_CMD_SET, SERVER, restrict)

      # Make both resources available on the queue
      queue_avail = {
          ATTR_rescavail + ".foo": '100',
          ATTR_rescavail + ".bar": '100',
      }
      self.server.manager(MGR_CMD_SET, QUEUE, queue_avail, id="workq")

      # Job requesting both resources
      request = {
          ATTR_l + '.ncpus': '4',
          ATTR_l + '.foo': '30',
          ATTR_l + '.bar': '40',
      }
      job = Job(TEST_USER)
      job.set_attributes(request)
      jid = self.server.submit(job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)

      # Suspend it
      self.server.sigjob(jobid=jid, signal="suspend")

      queue_stat = self.server.status(
          QUEUE, [ras_foo, ras_bar], id="workq")
      self.assertNotEqual(queue_stat, None)
      self.assertEqual(queue_stat[0][ras_foo], "0",
                       msg="pbs did not release resource foo")
      self.assertEqual(queue_stat[0][ras_bar], "40",
                       msg="pbs should not release resource bar")
  def test_server_res_release_upon_suspension_using_qsig(self):
      """
      Create two custom resources, foo and bar, and make them available
      on the server. List only foo (with ncpus) in
      restrict_res_to_release_on_suspend, suspend a job that requests
      both with a suspend signal, and check on the server that foo was
      released while bar is still assigned.
      """
      ras_foo = ATTR_rescassn + '.foo'
      ras_bar = ATTR_rescassn + '.bar'

      # Custom resources
      resc_def = {
          ATTR_RESC_TYPE: 'long',
          ATTR_RESC_FLAG: 'q',
      }
      for resc_name in ('foo', 'bar'):
          self.server.manager(MGR_CMD_CREATE, RSC, resc_def, id=resc_name)

      # Only ncpus and foo may be released on suspension
      restrict = {ATTR_restrict_res_to_release_on_suspend: 'ncpus,foo'}
      self.server.manager(MGR_CMD_SET, SERVER, restrict)

      # Make both resources available on the server
      server_avail = {
          ATTR_rescavail + ".foo": '100',
          ATTR_rescavail + ".bar": '100',
      }
      self.server.manager(MGR_CMD_SET, SERVER, server_avail)

      # Job requesting both resources
      request = {
          ATTR_l + '.ncpus': '4',
          ATTR_l + '.foo': '30',
          ATTR_l + '.bar': '40',
      }
      job = Job(TEST_USER)
      job.set_attributes(request)
      jid = self.server.submit(job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)

      # Suspend it
      self.server.sigjob(jobid=jid, signal="suspend")

      server_stat = self.server.status(SERVER, [ras_foo, ras_bar])
      self.assertNotEqual(server_stat, None)
      self.assertEqual(server_stat[0][ras_foo], "0",
                       msg="pbs did not release resource foo")
      self.assertEqual(server_stat[0][ras_bar], "40",
                       msg="pbs should not release resource bar")
  def test_server_res_release_upon_suspension_using_preemption(self):
      """
      Create two custom resources, foo and bar, and make them available
      on the server. List only foo (with ncpus) in
      restrict_res_to_release_on_suspend, run two jobs that request both
      and let an expressq job preempt them. Check on the server that the
      suspended jobs released foo while bar is still assigned for all
      three jobs.
      """
      ras_foo = ATTR_rescassn + '.foo'
      ras_bar = ATTR_rescassn + '.bar'

      # Custom resources
      resc_def = {
          ATTR_RESC_TYPE: 'long',
          ATTR_RESC_FLAG: 'q',
      }
      for resc_name in ('foo', 'bar'):
          self.server.manager(MGR_CMD_CREATE, RSC, resc_def, id=resc_name)

      # Only ncpus and foo may be released on suspension
      restrict = {ATTR_restrict_res_to_release_on_suspend: 'ncpus,foo'}
      self.server.manager(MGR_CMD_SET, SERVER, restrict)

      # Append foo and bar to the scheduler's "resources" setting; the
      # last character of the current value is dropped and re-added
      # after the two new names
      sched_resources = self.scheduler.sched_config['resources']
      sched_resources = sched_resources[:-1] + ', foo, bar\"'
      self.scheduler.set_sched_config({'resources': sched_resources})

      # Make both resources available on the server
      server_avail = {
          ATTR_rescavail + ".foo": '100',
          ATTR_rescavail + ".bar": '100',
      }
      self.server.manager(MGR_CMD_SET, SERVER, server_avail)

      # Two normal priority jobs
      normal_job1 = Job(TEST_USER)
      normal_job1.set_attributes({
          ATTR_l + '.ncpus': '1',
          ATTR_l + '.foo': '40',
          ATTR_l + '.bar': '20',
      })
      normal_jid1 = self.server.submit(normal_job1)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=normal_jid1)

      normal_job2 = Job(TEST_USER)
      normal_job2.set_attributes({
          ATTR_l + '.ncpus': '1',
          ATTR_l + '.foo': '40',
          ATTR_l + '.bar': '20',
      })
      normal_jid2 = self.server.submit(normal_job2)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=normal_jid2)

      # High priority job
      express_job = Job(TEST_USER)
      express_job.set_attributes({
          ATTR_l + '.ncpus': '1',
          ATTR_l + '.foo': '70',
          ATTR_l + '.bar': '20',
          ATTR_q: 'expressq',
      })
      express_jid = self.server.submit(express_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=express_jid)
      self.server.expect(JOB, {ATTR_state: 'S'}, id=normal_jid2)
      self.server.expect(JOB, {ATTR_state: 'S'}, id=normal_jid1)

      server_stat = self.server.status(SERVER, [ras_foo, ras_bar])
      self.assertNotEqual(server_stat, None)
      self.assertEqual(server_stat[0][ras_foo], "70",
                       msg="pbs did not release resource foo")
      self.assertEqual(server_stat[0][ras_bar], "60",
                       msg="pbs should not release resource bar")
  def test_node_custom_res_release_upon_suspension(self):
      """
      Create two custom resources, foo and bar, and make them available
      on the node. List only foo (with ncpus) in
      restrict_res_to_release_on_suspend, suspend a job that requests
      both, and check on the node that foo was released while bar is
      still assigned.
      """
      ras_foo = ATTR_rescassn + '.foo'
      ras_bar = ATTR_rescassn + '.bar'

      # Custom resources
      resc_def = {
          ATTR_RESC_TYPE: 'long',
          ATTR_RESC_FLAG: 'nh',
      }
      for resc_name in ('foo', 'bar'):
          self.server.manager(MGR_CMD_CREATE, RSC, resc_def, id=resc_name)

      # Only ncpus and foo may be released on suspension
      restrict = {ATTR_restrict_res_to_release_on_suspend: 'ncpus,foo'}
      self.server.manager(MGR_CMD_SET, SERVER, restrict)

      self.scheduler.add_resource("foo,bar")

      # Make both resources available on the node
      node_avail = {
          ATTR_rescavail + ".foo": '100',
          ATTR_rescavail + ".bar": '100',
      }
      self.server.manager(MGR_CMD_SET, NODE, node_avail,
                          id=self.mom.shortname)

      # Job requesting both resources
      request = {
          ATTR_l + '.ncpus': '4',
          ATTR_l + '.foo': '30',
          ATTR_l + '.bar': '40',
      }
      job = Job(TEST_USER)
      job.set_attributes(request)
      jid = self.server.submit(job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)

      # Suspend it
      self.server.sigjob(jobid=jid, signal="suspend")

      node_stat = self.server.status(
          NODE, [ras_foo, ras_bar], id=self.mom.shortname)
      self.assertNotEqual(node_stat, None)
      self.assertEqual(node_stat[0][ras_foo], "0",
                       msg="pbs did not release resource foo")
      self.assertEqual(node_stat[0][ras_bar], "40",
                       msg="pbs should not release resource bar")
  def test_resuming_with_no_res_released(self):
      """
      List only mem in restrict_res_to_release_on_suspend and run a job
      that requests no memory. Suspend it with a suspend signal, check
      its resources_released attribute, then send a resume signal and
      check that the job runs again.
      """
      # Only mem may be released on suspension
      restrict = {ATTR_restrict_res_to_release_on_suspend: 'mem'}
      self.server.manager(MGR_CMD_SET, SERVER, restrict)

      # Job that does not request mem
      job = Job(TEST_USER)
      job.set_attributes({ATTR_l + '.ncpus': '4'})
      jid = self.server.submit(job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)

      # Suspend it
      self.server.sigjob(jobid=jid, signal="suspend")

      job_stat = self.server.status(JOB, id=jid)
      expected = "(%s:ncpus=0)" % self.mom.shortname
      self.assertEqual(job_stat[0][ATTR_released], expected,
                       msg="resources_released incorrect")

      # Resume it
      self.server.sigjob(jobid=jid, signal="resume")
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)
  def test_resuming_with_no_res_released_multi_vnode(self):
      """
      List only mem in restrict_res_to_release_on_suspend and run a job
      over two vnodes that requests no memory. Suspend it with a suspend
      signal, check its resources_released attribute, then send a resume
      signal and check that the job runs again.
      """
      # Only mem may be released on suspension
      restrict = {ATTR_restrict_res_to_release_on_suspend: 'mem'}
      self.server.manager(MGR_CMD_SET, SERVER, restrict)

      # First vnode
      first_vnode = {
          ATTR_rescavail + '.ncpus': 2,
          ATTR_rescavail + '.mem': '1024mb',
      }
      self.server.create_vnodes("vnode1", first_vnode, 1,
                                self.mom, fname="vnodedef1")

      # Second vnode, appended to the first one
      second_vnode = {
          ATTR_rescavail + '.ncpus': 6,
          ATTR_rescavail + '.mem': '1024mb',
      }
      self.server.create_vnodes("vnode2", second_vnode, 1,
                                self.mom, additive=True, fname="vnodedef2")

      # Job with one chunk per vnode and no mem request
      request = {
          ATTR_l + '.select': '1:ncpus=2+1:ncpus=6',
          ATTR_l + '.place': 'vscatter',
      }
      job = Job(TEST_USER)
      job.set_attributes(request)
      jid = self.server.submit(job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)

      # Suspend it
      self.server.sigjob(jobid=jid, signal="suspend")

      job_stat = self.server.status(JOB, id=jid)
      expected = "(vnode1[0]:ncpus=0)+(vnode2[0]:ncpus=0)"
      self.assertEqual(job_stat[0][ATTR_released], expected,
                       msg="resources_released incorrect")

      # Resume it
      self.server.sigjob(jobid=jid, signal="resume")
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)
  def test_resuming_excljob_with_no_res_released(self):
      """
      List only mem in restrict_res_to_release_on_suspend and run a job
      with place=excl that requests no memory. Have it suspended by
      preemption, check its resources_released attribute, then delete
      the preempting job and check that the suspended job runs again.
      """
      # Only mem may be released on suspension
      restrict = {ATTR_restrict_res_to_release_on_suspend: 'mem'}
      self.server.manager(MGR_CMD_SET, SERVER, restrict)

      # Exclusive job that does not request mem
      excl_attrs = {
          ATTR_l + '.select': '1:ncpus=1',
          ATTR_l + '.place': 'excl',
      }
      excl_job = Job(TEST_USER)
      excl_job.set_attributes(excl_attrs)
      excl_jid = self.server.submit(excl_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=excl_jid)

      # High priority job
      express_attrs = {
          ATTR_l + '.select': '1:ncpus=2',
          ATTR_q: 'expressq',
      }
      express_job = Job(TEST_USER)
      express_job.set_attributes(express_attrs)
      express_jid = self.server.submit(express_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=express_jid)
      self.server.expect(JOB, {ATTR_state: 'S', ATTR_substate: 45},
                         id=excl_jid)

      job_stat = self.server.status(JOB, id=excl_jid)
      expected = "(%s:ncpus=0)" % self.mom.shortname
      self.assertEqual(job_stat[0][ATTR_released], expected,
                       msg="resources_released incorrect")

      # Deleting the preempting job lets the suspended job resume
      self.server.deljob(express_jid, wait=True)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=excl_jid)
  def test_normal_user_unable_to_see_res_released(self):
      """
      Suspend a running job and stat it as TEST_USER; neither
      "resources_released" nor "resource_released_list.mem" may appear
      in the status returned to that user.
      """
      # Only mem may be released on suspension
      restrict = {ATTR_restrict_res_to_release_on_suspend: 'mem'}
      self.server.manager(MGR_CMD_SET, SERVER, restrict)

      job = Job(TEST_USER)
      job.set_attributes({ATTR_l + '.select': '1:ncpus=4:mem=512mb'})
      jid = self.server.submit(job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)

      # Suspend the job
      self.server.sigjob(jobid=jid, signal="suspend")
      self.server.expect(JOB, {ATTR_state: 'S'}, id=jid)

      # Stat the job as a normal user
      user_view = self.server.status(JOB, id=jid, runas=TEST_USER)
      job_attrs = user_view[0]

      self.assertFalse(
          "resources_released" in job_attrs,
          "Normal user can see resources_released "
          "which is not expected")
      self.assertFalse(
          "resource_released_list.mem" in job_attrs,
          "Normal user can see resources_released_list "
          "which is not expected")
  def test_if_node_gets_oversubscribed(self):
      """
      Check if the node gets oversubscribed if a filler job runs
      on resources left on the node after suspension.

      Only mem is listed in restrict_res_to_release_on_suspend, so the
      suspended job keeps its ncpus. The node's assigned ncpus must not
      exceed its available ncpus and the filler job must stay queued.
      """
      # Set mem in restrict_res_to_release_on_suspend server attribute
      a = {ATTR_restrict_res_to_release_on_suspend: 'mem'}
      self.server.manager(MGR_CMD_SET, SERVER, a)

      a = {ATTR_sched_preempt_enforce_resumption: True}
      self.server.manager(MGR_CMD_SET, SCHED, a)

      j1 = Job(TEST_USER)
      j1.set_attributes({ATTR_l + '.select': '1:ncpus=2:mem=512mb'})
      jid1 = self.server.submit(j1)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)

      # Submit a filler job
      j2 = Job(TEST_USER)
      j2.set_attributes({ATTR_l + '.select': '1:ncpus=3',
                         ATTR_l + '.walltime': 50})
      jid2 = self.server.submit(j2)
      self.server.expect(JOB, {ATTR_state: 'Q'}, id=jid2)

      # Submit a high priority job
      j3 = Job(TEST_USER)
      j3.set_attributes({ATTR_l + '.select': '1:ncpus=1:mem=2gb',
                         ATTR_q: 'expressq',
                         ATTR_l + '.walltime': 100})
      jid3 = self.server.submit(j3)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid3)
      self.server.expect(JOB, {ATTR_state: 'S'}, id=jid1)

      # Check that resources_assigned is not exceeding resources_available
      ras_ncpus = ATTR_rescassn + '.ncpus'
      rav_ncpus = ATTR_rescavail + '.ncpus'

      rv = self.server.status(
          NODE, [ras_ncpus, rav_ncpus], id=self.mom.shortname)
      self.assertNotEqual(rv, None)

      # The other tests in this class compare status values to strings,
      # so convert to int here: comparing the raw strings would be
      # lexicographic ("10" <= "4" is true).
      self.assertLessEqual(int(rv[0][ras_ncpus]), int(rv[0][rav_ncpus]),
                           msg="pbs released resource ncpus incorrectly")

      # Expect filler job to be in queued state because
      # suspended job did not release any ncpus
      self.server.expect(JOB, {ATTR_state: 'Q'}, id=jid2)
  def test_suspended_job_gets_calendered(self):
      """
      Verify that a job which releases only a limited set of resources
      is calendared in the same cycle in which it gets suspended.
      """
      # Only mem is listed in restrict_res_to_release_on_suspend
      self.server.manager(
          MGR_CMD_SET, SERVER,
          {ATTR_restrict_res_to_release_on_suspend: 'mem'})
      self.server.manager(
          MGR_CMD_SET, SCHED,
          {ATTR_sched_preempt_enforce_resumption: True})

      # Make 5 ncpus available on the node
      self.server.manager(
          MGR_CMD_SET, NODE,
          {ATTR_rescavail + '.ncpus': 5}, self.mom.shortname)

      # Job that is expected to be suspended later
      low_job = Job(TEST_USER)
      low_job.set_attributes({ATTR_l + '.select': '1:ncpus=3:mem=1512mb'})
      low_jid = self.server.submit(low_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=low_jid)

      # High priority job submitted to the express queue
      express_attrs = {ATTR_l + '.select': '1:ncpus=2:mem=2gb',
                       ATTR_q: 'expressq',
                       ATTR_l + '.walltime': 100}
      express_job = Job(TEST_USER)
      express_job.set_attributes(express_attrs)
      express_jid = self.server.submit(express_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=express_jid)
      self.server.expect(JOB, {ATTR_state: 'S'}, id=low_jid)

      # The scheduler log must not report a missing start time estimate
      # for the suspended job
      not_calendared_msg = low_jid + ";Can't find start time estimate"
      self.scheduler.log_match(
          not_calendared_msg, existence=False, max_attempts=2)
  def helper_test_preempt_release_all(self, preempt_method):
      """
      Shared body for the tests checking that all resources are released
      during preemption simulation when preempting by requeue ("R") or
      checkpoint ("C").

      :param preempt_method: preempt_order value to configure, "R" or "C"
      :raises Exception: if preempt_method is neither "R" nor "C"
      """
      # Reject unsupported methods before touching any configuration
      if preempt_method not in ("R", "C"):
          raise Exception("Unexpected value of argument preempt_method: %s"
                          % preempt_method)
      expected_log = ("Job preempted by requeuing"
                      if preempt_method == "R"
                      else "Job preempted by checkpointing")

      self.server.manager(
          MGR_CMD_SET, SERVER,
          {ATTR_restrict_res_to_release_on_suspend: 'mem'})
      self.scheduler.set_sched_config({'preempt_order': preempt_method})

      # Set 2 ncpus available on the node
      self.server.manager(
          MGR_CMD_SET, NODE,
          {ATTR_rescavail + '.ncpus': "2"}, self.mom.shortname)

      # Low priority job that takes up all of the ncpus
      low_job = Job(TEST_USER)
      low_job.set_attributes({ATTR_l + '.select': '1:ncpus=2'})
      low_jid = self.server.submit(low_job)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=low_jid)

      # High priority job which requests 1 ncpus
      high_job = Job(TEST_USER)
      high_job.set_attributes({ATTR_l + '.select': '1:ncpus=1',
                               ATTR_q: 'expressq'})
      high_jid = self.server.submit(high_job)

      # The server only releases mem on suspend, but for requeue and
      # checkpointing ncpus should be released as well, so the low
      # priority job must have been preempted
      self.server.expect(JOB, {ATTR_state: 'R'}, id=high_jid)
      self.scheduler.log_match(low_jid + ";" + expected_log)
  def test_preempt_requeue_release_all(self):
      """
      Verify that when jobs are preempted via requeue, all resources
      are released during the preemption simulation.
      """
      self.helper_test_preempt_release_all(preempt_method="R")
  def test_preempt_checkpoint_release_all(self):
      """
      Verify that when jobs are preempted via checkpointing, all resources
      are released during the preemption simulation.
      """
      # Checkpoint action script: kills the id passed as its first argument
      chk_script = ("#!/bin/bash\n"
                    "kill $1\n"
                    "exit 0\n")
      self.chk_file = self.du.create_temp_file(body=chk_script)
      self.du.chmod(path=self.chk_file, mode=0o755)
      self.du.chown(path=self.chk_file, uid=0, gid=0, sudo=True)

      # Register the script as the mom's checkpoint_abort action
      action = 'checkpoint_abort 30 !' + self.chk_file + ' %sid'
      self.mom.add_config({'$action': action})

      self.helper_test_preempt_release_all(preempt_method="C")