pbs_admin_suspend.py 34 KB


  1. # coding: utf-8
  2. # Copyright (C) 1994-2018 Altair Engineering, Inc.
  3. # For more information, contact Altair at www.altair.com.
  4. #
  5. # This file is part of the PBS Professional ("PBS Pro") software.
  6. #
  7. # Open Source License Information:
  8. #
  9. # PBS Pro is free software. You can redistribute it and/or modify it under the
  10. # terms of the GNU Affero General Public License as published by the Free
  11. # Software Foundation, either version 3 of the License, or (at your option) any
  12. # later version.
  13. #
  14. # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
  15. # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  16. # FOR A PARTICULAR PURPOSE.
  17. # See the GNU Affero General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Affero General Public License
  20. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. #
  22. # Commercial License Information:
  23. #
  24. # For a copy of the commercial license terms and conditions,
  25. # go to: (http://www.pbspro.com/UserArea/agreement.html)
  26. # or contact the Altair Legal Department.
  27. #
  28. # Altair’s dual-license business model allows companies, individuals, and
  29. # organizations to create proprietary derivative works of PBS Pro and
  30. # distribute them - whether embedded or bundled with other software -
  31. # under a commercial license agreement.
  32. #
  33. # Use of Altair’s trademarks, including but not limited to "PBS™",
  34. # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
  35. # trademark licensing policies.
  36. import time
  37. from tests.functional import *
  38. class TestAdminSuspend(TestFunctional):
  39. """
  40. Test the admin-suspend/admin-resume feature for node maintenance
  41. """
  42. def setUp(self):
  43. TestFunctional.setUp(self)
  44. a = {'resources_available.ncpus': 4, 'resources_available.mem': '4gb'}
  45. self.server.create_vnodes('vn', a, 1, self.mom)
  46. def test_basic(self):
  47. """
  48. Test basic admin-suspend functionality
  49. """
  50. j1 = Job(TEST_USER)
  51. jid1 = self.server.submit(j1)
  52. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid1)
  53. j2 = Job(TEST_USER)
  54. jid2 = self.server.submit(j2)
  55. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid2)
  56. # admin-suspend job 1.
  57. self.server.sigjob(jid1, 'admin-suspend', runas=ROOT_USER)
  58. self.server.expect(JOB, {'job_state': 'S'}, id=jid1)
  59. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  60. self.server.expect(NODE, {'maintenance_jobs': jid1})
  61. # admin-suspend job 2
  62. self.server.sigjob(jid2, 'admin-suspend', runas=ROOT_USER)
  63. self.server.expect(JOB, {'job_state': 'S'}, id=jid2)
  64. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  65. self.server.expect(NODE, {'maintenance_jobs': jid1 + "," + jid2})
  66. # admin-resume job 1. Make sure the node is still in state maintenance
  67. self.server.sigjob(jid1, 'admin-resume', runas=ROOT_USER)
  68. self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
  69. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  70. self.server.expect(NODE, {'maintenance_jobs': jid2})
  71. # admin-resume job 2. Make sure the node retuns to state free
  72. self.server.sigjob(jid2, 'admin-resume', runas=ROOT_USER)
  73. self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
  74. self.server.expect(NODE, {'state': 'free'}, id='vn[0]')
  75. def test_basic_ja(self):
  76. """
  77. Test basic admin-suspend functionality for job arrays
  78. """
  79. jA = Job(TEST_USER)
  80. jA.set_attributes({'Resource_List.select': '1:ncpus=1', ATTR_J: '1-2'})
  81. jidA = self.server.submit(jA)
  82. self.server.expect(JOB, {'job_state': 'B'}, id=jidA)
  83. subjobs = self.server.status(JOB, id=jidA, extend='t')
  84. # subjobs[0] is the array itself. Need the subjobs
  85. jid1 = subjobs[1]['id']
  86. jid2 = subjobs[2]['id']
  87. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid1)
  88. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid2)
  89. # admin-suspend job 1.
  90. self.server.sigjob(jid1, 'admin-suspend', runas=ROOT_USER)
  91. self.server.expect(JOB, {'job_state': 'S'}, id=jid1)
  92. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  93. self.server.expect(NODE, {'maintenance_jobs': jid1})
  94. # admin-suspend job 2
  95. self.server.sigjob(jid2, 'admin-suspend', runas=ROOT_USER)
  96. self.server.expect(JOB, {'job_state': 'S'}, id=jid2)
  97. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  98. self.server.expect(NODE, {'maintenance_jobs': jid1 + "," + jid2})
  99. # admin-resume job 1. Make sure the node is still in state maintenance
  100. self.server.sigjob(jid1, 'admin-resume', runas=ROOT_USER)
  101. self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
  102. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  103. self.server.expect(NODE, {'maintenance_jobs': jid2})
  104. # admin-resume job 2. Make sure the node retuns to state free
  105. self.server.sigjob(jid2, 'admin-resume', runas=ROOT_USER)
  106. self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
  107. self.server.expect(NODE, {'state': 'free'}, id='vn[0]')
  108. def test_basic_restart(self):
  109. """
  110. Test basic admin-suspend functionality with server restart
  111. The restart will test if the node recovers properly in maintenance
  112. """
  113. j1 = Job(TEST_USER)
  114. jid = self.server.submit(j1)
  115. self.server.expect(
  116. JOB, {'job_state': 'R', 'substate': 42}, attrop=PTL_AND, id=jid)
  117. # admin-suspend job
  118. self.server.sigjob(jid, 'admin-suspend', runas=ROOT_USER)
  119. self.server.expect(JOB, {'job_state': 'S'}, id=jid)
  120. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  121. self.server.expect(NODE, {'maintenance_jobs': jid})
  122. self.server.restart()
  123. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  124. self.server.expect(NODE, {'maintenance_jobs': jid})
  125. # Adding sleep to avoid failure at resume since PBS licenses
  126. # might not be available and as a result resume fails
  127. time.sleep(2)
  128. # admin-resume job
  129. self.server.sigjob(jid, 'admin-resume', runas=ROOT_USER)
  130. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  131. self.server.expect(NODE, {'state': 'free'}, id='vn[0]')
  132. def test_cmd_perm(self):
  133. """
  134. Test permissions on admin-suspend, admin-resume, maintenance_jobs
  135. and the maintenace node state.
  136. """
  137. # Test to make sure we can't set the maintenance node state
  138. try:
  139. self.server.manager(
  140. MGR_CMD_SET, NODE,
  141. {'state': 'maintenance'}, id='vn[0]', runas=ROOT_USER)
  142. except PbsManagerError as e:
  143. self.assertTrue('Illegal value for node state' in e.msg[0])
  144. self.server.expect(NODE, {'state': 'free'}, id='vn[0]')
  145. # Test to make sure we can't set the 'maintenance_jobs' attribute
  146. try:
  147. self.server.manager(
  148. MGR_CMD_SET, NODE,
  149. {'maintenance_jobs': 'foo'}, id='vn[0]', runas=ROOT_USER)
  150. except PbsManagerError as e:
  151. self.assertTrue(
  152. 'Cannot set attribute, read only or insufficient permission'
  153. in e.msg[0])
  154. self.server.expect(NODE, 'maintenance_jobs', op=UNSET, id='vn[0]')
  155. # Test to make sure regular users can't admin-suspend jobs
  156. j = Job(TEST_USER)
  157. jid = self.server.submit(j)
  158. self.server.expect(
  159. JOB, {'job_state': 'R', 'substate': 42}, attrop=PTL_AND, id=jid)
  160. try:
  161. self.server.sigjob(jid, 'admin-suspend', runas=TEST_USER)
  162. except PbsSignalError as e:
  163. self.assertTrue('Unauthorized Request' in e.msg[0])
  164. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid)
  165. # Test to make sure regular users can't admin-resume jobs
  166. self.server.sigjob(jid, 'admin-suspend', runas=ROOT_USER)
  167. self.server.expect(JOB, {'job_state': 'S'}, id=jid)
  168. try:
  169. self.server.sigjob(jid, 'admin-resume', runas=TEST_USER)
  170. except PbsSignalError as e:
  171. self.assertTrue('Unauthorized Request' in e.msg[0])
  172. self.server.expect(JOB, {'job_state': 'S'}, id=jid)
  173. def test_wrong_state1(self):
  174. """
  175. Test using wrong resume signal is correctly rejected
  176. """
  177. j1 = Job(TEST_USER)
  178. jid1 = self.server.submit(j1)
  179. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid1)
  180. self.server.sigjob(jid1, "suspend", runas=ROOT_USER)
  181. self.server.expect(JOB, {'job_state': 'S'}, id=jid1)
  182. try:
  183. self.server.sigjob(jid1, "admin-resume", runas=ROOT_USER)
  184. except PbsSignalError as e:
  185. self.assertTrue(
  186. 'Job can not be resumed with the requested resume signal'
  187. in e.msg[0])
  188. self.server.expect(JOB, {'job_state': 'S'}, id=jid1)
  189. def test_wrong_state2(self):
  190. """
  191. Test using wrong resume signal is correctly rejected
  192. """
  193. j1 = Job(TEST_USER)
  194. jid1 = self.server.submit(j1)
  195. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid1)
  196. self.server.sigjob(jid1, "admin-suspend", runas=ROOT_USER)
  197. self.server.expect(JOB, {'job_state': 'S'}, id=jid1)
  198. self.server.expect(JOB, {'substate': 43}, id=jid1)
  199. try:
  200. self.server.sigjob(jid1, "resume", runas=ROOT_USER)
  201. except PbsSignalError as e:
  202. self.assertTrue(
  203. 'Job can not be resumed with the requested resume signal'
  204. in e.msg[0])
  205. # If resume had worked, the job would be in substate 45
  206. self.server.expect(JOB, {'substate': 43}, id=jid1)
  207. def test_deljob(self):
  208. """
  209. Test whether a node leaves the maintenance state when
  210. an admin-suspendedd job is deleted
  211. """
  212. j = Job(TEST_USER)
  213. jid = self.server.submit(j)
  214. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid)
  215. self.server.sigjob(jid, 'admin-suspend', runas=ROOT_USER)
  216. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  217. self.server.deljob(jid, wait=True)
  218. self.server.expect(NODE, {'state': 'free'}, id='vn[0]')
  219. def test_deljob_force(self):
  220. """
  221. Test whether a node leaves the maintenance state when
  222. an admin-suspendedd job is deleted with -Wforce
  223. """
  224. j = Job(TEST_USER)
  225. jid = self.server.submit(j)
  226. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid)
  227. self.server.sigjob(jid, 'admin-suspend', runas=ROOT_USER)
  228. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  229. self.server.deljob(jid, extend='force', wait=True)
  230. self.server.expect(NODE, {'state': 'free'}, id='vn[0]')
  231. def test_rerunjob(self):
  232. """
  233. Test whether a node leaves the maintenance state when
  234. an admin-suspended job is requeued
  235. """
  236. j = Job(TEST_USER)
  237. jid = self.server.submit(j)
  238. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid)
  239. self.server.sigjob(jid, 'admin-suspend', runas=ROOT_USER)
  240. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  241. self.server.rerunjob(jid, extend='force')
  242. # Job eventually goes to R state after being requeued for short time
  243. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  244. self.server.expect(NODE, {'state': 'free'}, id='vn[0]')
  245. def test_multivnode(self):
  246. """
  247. Submit a job to multiple vnodes. Send an admin-suspend signal
  248. and see all nodes go into maintenance
  249. """
  250. a = {'resources_available.ncpus': 4, 'resources_available.mem': '4gb'}
  251. self.server.create_vnodes('vn', a, 3, self.mom, usenatvnode=True)
  252. j = Job(TEST_USER)
  253. j.set_attributes({'Resource_List.select': '3:ncpus=1',
  254. 'Resource_List.place': 'vscatter'})
  255. jid = self.server.submit(j)
  256. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid)
  257. self.server.sigjob(jid, 'admin-suspend', runas=ROOT_USER)
  258. self.server.expect(NODE, {'state=maintenance': 3})
  259. self.server.expect(JOB, {'job_state': 'S'}, id=jid)
  260. self.server.sigjob(jid, 'admin-resume', runas=ROOT_USER)
  261. self.server.expect(NODE, {'state=free': 3})
  262. def test_multivnode2(self):
  263. """
  264. Submit a job to multiple vnodes. Send an admin-suspend signal
  265. and see all nodes go into maintenance
  266. Submit a single node job to one of the nodes. Resume the multinode
  267. Job and see the single node job's node stil in maintenance
  268. """
  269. a = {'resources_available.ncpus': 4, 'resources_available.mem': '4gb'}
  270. self.server.create_vnodes('vn', a, 3, self.mom, usenatvnode=True)
  271. # Submit multinode job 1
  272. j1 = Job(TEST_USER)
  273. j1.set_attributes({'Resource_List.select': '3:ncpus=1',
  274. 'Resource_List.place': 'vscatter'})
  275. jid1 = self.server.submit(j1)
  276. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid1)
  277. # Submit Job 2 to specific node
  278. j2 = Job(TEST_USER)
  279. j2.set_attributes({'Resource_List.select': '1:ncpus=1:vnode=vn[0]'})
  280. jid2 = self.server.submit(j2)
  281. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid2)
  282. # admin-suspend job 1 and see all three nodes go into maintenance
  283. self.server.sigjob(jid1, 'admin-suspend')
  284. self.server.expect(JOB, {'job_state': 'S'}, id=jid1)
  285. self.server.expect(NODE, {'state=maintenance': 3})
  286. # admin-suspend job 2
  287. self.server.sigjob(jid2, 'admin-suspend', runas=ROOT_USER)
  288. self.server.expect(JOB, {'job_state': 'S'}, id=jid2)
  289. # admin-resume job1 and see one node stay in maintenance
  290. self.server.sigjob(jid1, 'admin-resume', runas=ROOT_USER)
  291. self.server.expect(NODE, {'state=free': 2})
  292. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  293. def test_multivnode_excl(self):
  294. """
  295. Submit an excl job to multiple vnodes. Send an admin-suspend
  296. signal and see all nodes go into maintenance
  297. """
  298. a = {'resources_available.ncpus': 4, 'resources_available.mem': '4gb'}
  299. self.server.create_vnodes('vn', a, 3, self.mom, usenatvnode=True)
  300. j = Job(TEST_USER)
  301. j.set_attributes({'Resource_List.select': '3:ncpus=1',
  302. 'Resource_List.place': 'vscatter:excl'})
  303. jid = self.server.submit(j)
  304. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid)
  305. self.server.expect(NODE, {'state=job-exclusive': 3})
  306. self.server.sigjob(jid, 'admin-suspend', runas=ROOT_USER)
  307. self.server.expect(NODE, {'state=maintenance': 3})
  308. self.server.expect(JOB, {'job_state': 'S'}, id=jid)
  309. self.server.sigjob(jid, 'admin-resume', runas=ROOT_USER)
  310. self.server.expect(NODE, {'state=job-exclusive': 3})
  311. def test_degraded_resv(self):
  312. """
  313. Test if a reservation goes into the degraded state after its node is
  314. put into maintenance
  315. """
  316. # Submit a reservation
  317. r = Reservation(TEST_USER)
  318. r.set_attributes({'Resource_List.select': '1:ncpus=1',
  319. 'reserve_start': time.time() + 3600,
  320. 'reserve_end': time.time() + 7200})
  321. rid = self.server.submit(r)
  322. # See reservation is confirmed
  323. a = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
  324. d = self.server.expect(RESV, a, rid)
  325. # Submit a job and see it run
  326. j = Job(TEST_USER)
  327. j.set_attributes({'Resource_List.select': '1:ncpus=1',
  328. 'Resource_List.walltime': 120})
  329. jid = self.server.submit(j)
  330. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid)
  331. # Admin-suspend job
  332. self.server.sigjob(jid, 'admin-suspend', runas=ROOT_USER)
  333. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  334. # See reservation in degreaded state
  335. a = {'reserve_state': (MATCH_RE, 'RESV_DEGRADED|10')}
  336. d = self.server.expect(RESV, a, rid)
  337. @timeout(400)
  338. def test_resv_jobend(self):
  339. """
  340. Test if a node goes back to free state when reservation ends and
  341. admin-suspended job is killed
  342. """
  343. # Submit a reservation
  344. r = Reservation(TEST_USER)
  345. r.set_attributes({'Resource_List.select': '1:ncpus=1',
  346. 'reserve_start': time.time() + 30,
  347. 'reserve_end': time.time() + 60})
  348. rid = self.server.submit(r)
  349. # See reservation is confirmed
  350. a = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
  351. d = self.server.expect(RESV, a, id=rid)
  352. # Submit a job
  353. j = Job(TEST_USER)
  354. rque = rid.split(".")
  355. j.set_attributes({'queue': rque[0]})
  356. jid = self.server.submit(j)
  357. self.server.expect(JOB, {'job_state': 'Q'}, id=jid)
  358. # Wait for reservation to start
  359. a = {'reserve_state': (MATCH_RE, 'RESV_RUNNING|3')}
  360. d = self.server.expect(RESV, a, rid, offset=30)
  361. # job is running as well
  362. self.server.expect(
  363. JOB, {'job_state': 'R', 'substate': 42},
  364. id=jid, max_attempts=30)
  365. # Admin-suspend job
  366. self.server.sigjob(jid, 'admin-suspend', runas=ROOT_USER)
  367. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  368. # Submit another job outside of reservation
  369. j = Job(TEST_USER)
  370. jid2 = self.server.submit(j)
  371. self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
  372. # Wait for the reservation to get over
  373. # Job also gets deleted and node state goes back to free
  374. self.server.expect(JOB, 'queue', op=UNSET, id=jid, offset=120)
  375. self.server.expect(NODE, {'state': 'free'}, id='vn[0]')
  376. # job2 starts running
  377. self.server.expect(JOB, {'job_state': 'R'}, id=jid2, max_attempts=60)
  378. def test_que(self):
  379. """
  380. Test to check that job gets suspended on non-default queue
  381. """
  382. # create a high priority workq2 and a routeq
  383. a = {'queue_type': 'execution', 'started': 't', 'enabled': 't',
  384. 'priority': 150}
  385. self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='workq2')
  386. a = {'queue_type': 'route', 'started': 't', 'enabled': 't',
  387. 'route_destinations': 'workq2'}
  388. self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='route')
  389. # submit a normal job
  390. j = Job(TEST_USER)
  391. j.set_attributes({'Resource_List.select': '1:ncpus=3'})
  392. jid1 = self.server.submit(j)
  393. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid1)
  394. # submit a high priority job. Make sure job1 is suspended.
  395. j = Job(TEST_USER)
  396. j.set_attributes(
  397. {'Resource_List.select': '1:ncpus=3', 'queue': 'route'})
  398. jid2 = self.server.submit(j)
  399. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid2)
  400. self.server.expect(JOB, {'job_state': 'S'}, id=jid1)
  401. # Above will not cause node state to go to maintenance
  402. self.server.expect(
  403. NODE, {'state': (MATCH_RE, 'free|job-exclusive')}, id='vn[0]')
  404. # admin suspend job2
  405. self.server.sigjob(jid2, 'admin-suspend', runas=ROOT_USER)
  406. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  407. self.server.expect(JOB, {'job_state=S': 2})
  408. # Releasing job1 will fail and not change node state
  409. rv = self.server.sigjob(jid1, 'resume', runas=ROOT_USER, logerr='True')
  410. self.assertFalse(rv)
  411. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  412. # deleting job1 will not change node state either
  413. self.server.deljob(jid1, wait=True)
  414. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  415. # Admin-resume job2
  416. self.server.sigjob(jid2, 'admin-resume', runas=ROOT_USER)
  417. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid2)
  418. self.server.expect(NODE, {'state': 'free'}, id='vn[0]')
  419. # suspend the job
  420. self.server.sigjob(jid2, 'suspend', runas=ROOT_USER)
  421. self.server.expect(JOB, {'job_state': 'S'}, id=jid2)
  422. self.server.expect(
  423. NODE, {'state': (MATCH_RE, 'free|job-exclusive')}, id='vn[0]')
  424. def test_resume(self):
  425. """
  426. Test node state remains in maintenance until
  427. all jobs are not resumed
  428. """
  429. a = {'resources_available.ncpus': 4, 'resources_available.mem': '4gb'}
  430. self.server.create_vnodes('vn', a, 3, self.mom, usenatvnode=True)
  431. j = Job(TEST_USER)
  432. j.set_attributes({'Resource_List.select': '3:ncpus=1',
  433. 'Resource_List.place': 'vscatter'})
  434. jid1 = self.server.submit(j)
  435. jid2 = self.server.submit(j)
  436. jid3 = self.server.submit(j)
  437. self.server.expect(JOB, {'job_state=R': 3, 'substate=42': 3})
  438. self.server.expect(NODE, {'state=free': 3})
  439. # admin suspend first 2 jobs and let 3rd job run
  440. # First only suspend job1 and verify that it will
  441. # put all the nodes to maintenance state
  442. self.server.sigjob(jid1, 'admin-suspend', runas=ROOT_USER)
  443. self.server.expect(NODE, {'state=maintenance': 3})
  444. self.server.sigjob(jid2, 'admin-suspend', runas=ROOT_USER)
  445. self.server.expect(JOB, {'job_state=S': 2})
  446. self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
  447. # submit a new job and it will be queued
  448. j = Job(TEST_USER)
  449. jid4 = self.server.submit(j)
  450. self.server.expect(JOB, {'job_state': 'Q'}, id=jid4)
  451. # List all maintenance_jobs
  452. self.server.expect(NODE, {'maintenance_jobs': jid1 + "," + jid2})
  453. # resume 1 job that will not change node state
  454. self.server.sigjob(jid1, 'admin-resume', runas=ROOT_USER)
  455. self.server.expect(NODE, {'state=maintenance': 3})
  456. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid1)
  457. self.server.expect(JOB, {'job_state': 'S'}, id=jid2)
  458. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid3)
  459. # resume the remaining job
  460. self.server.sigjob(jid2, 'admin-resume', runas=ROOT_USER)
  461. self.server.expect(NODE, {'state=free': 3})
  462. self.server.expect(JOB, {'job_state=R': 4})
  463. def test_admin_resume_loop(self):
  464. """
  465. Test that running admin-resume in a loop will have no impact on PBS
  466. """
  467. # submit a job
  468. j = Job(TEST_USER)
  469. jid1 = self.server.submit(j)
  470. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid1)
  471. # admin suspend and resume job in a loop
  472. for x in range(15):
  473. self.server.sigjob(jid1, 'admin-suspend', runas=ROOT_USER)
  474. self.server.expect(JOB, {'job_state': 'S'}, id=jid1)
  475. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  476. # sleep for sometime
  477. time.sleep(3)
  478. # resume the job
  479. self.server.sigjob(jid1, 'admin-resume', runas=ROOT_USER)
  480. self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
  481. self.server.expect(NODE, {'state': 'free'}, id='vn[0]')
  482. def test_custom_res(self):
  483. """
  484. Test that job will not run on a node in
  485. maintenance state if explicitly asking
  486. for a resource on that node
  487. """
  488. # create multiple vnodes
  489. a = {'resources_available.ncpus': 4, 'resources_available.mem': '4gb'}
  490. self.server.create_vnodes('vn', a, 3, self.mom, usenatvnode=True)
  491. # create a node level resource
  492. self.server.manager(
  493. MGR_CMD_CREATE, RSC, {'type': 'float', 'flag': 'nh'}, id="foo",
  494. runas=ROOT_USER)
  495. # set foo on vn[1]
  496. self.server.manager(
  497. MGR_CMD_SET, NODE, {'resources_available.foo': 5}, id='vn[1]',
  498. runas=ROOT_USER)
  499. # set foo in sched_config
  500. self.scheduler.add_resource('foo')
  501. # submit a few jobs
  502. j = Job(TEST_USER)
  503. j.set_attributes({'Resource_List.select': 'vnode=vn[1]'})
  504. jid1 = self.server.submit(j)
  505. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid1)
  506. # admin suspend the job to put the node to maintenance
  507. self.server.sigjob(jid1, 'admin-suspend', runas=ROOT_USER)
  508. self.server.expect(JOB, {'job_state': 'S'}, id=jid1)
  509. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[1]')
  510. # submit other jobs asking for specific resources on vn[1]
  511. j = Job(TEST_USER)
  512. j.set_attributes({'Resource_List.foo': '2'})
  513. jid2 = self.server.submit(j)
  514. self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
  515. # submit more jobs. They should be running
  516. j = Job(TEST_USER)
  517. jid3 = self.server.submit(j)
  518. jid4 = self.server.submit(j)
  519. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid3)
  520. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid4)
  521. # verify that vn[1] is still in maintenance and
  522. # job3 and job4 not running on vn[1]
  523. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[1]')
  524. try:
  525. self.server.expect(JOB, {'exec_vnode': (MATCH_RE, 'vn[1]')},
  526. id=jid3, max_attempts=20)
  527. self.server.expect(JOB, {'exec_vnode': (MATCH_RE, 'vn[1]')},
  528. id=jid4, max_attempts=20)
  529. except Exception as e:
  530. self.assertFalse(e.rv)
  531. print ("jid3 and jid4 not running on vn[1] as expected")
  532. def test_list_jobs_1(self):
  533. """
  534. Test to list and set maintenance_jobs as various users
  535. """
  536. # This test is run with CLI mode only
  537. _m = self.server.get_op_mode()
  538. if _m != PTL_CLI:
  539. self.skipTest("Not all commands can be run with API mode")
  540. # submit a few jobs
  541. j = Job(TEST_USER)
  542. jid1 = self.server.submit(j)
  543. jid2 = self.server.submit(j)
  544. jid3 = self.server.submit(j)
  545. # verify that all are running
  546. self.server.expect(JOB, {'job_state=R': 3, 'substate=42': 3})
  547. # admin-suspend 2 of them
  548. self.server.sigjob(jid2, 'admin-suspend', runas=ROOT_USER)
  549. self.server.sigjob(jid3, 'admin-suspend', runas=ROOT_USER)
  550. # node state is in maintenance
  551. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  552. # list maintenance_jobs as root
  553. self.server.expect(NODE, {'maintenance_jobs': jid2 + "," + jid3},
  554. runas=ROOT_USER)
  555. # list maintenance jobs as user
  556. self.server.expect(NODE, {'maintenance_jobs': jid2 + "," + jid3},
  557. runas=TEST_USER)
  558. # set an operator
  559. self.server.manager(MGR_CMD_SET, SERVER, {'operators': 'pbsoper@*'})
  560. # List all jobs in maintenance mode as operator
  561. self.server.expect(
  562. NODE, {'maintenance_jobs': jid2 + "," + jid3}, runas='pbsoper')
  563. # set maintenance_jobs as root
  564. try:
  565. self.server.manager(MGR_CMD_SET, NODE,
  566. {'maintenance_jobs': jid1}, id='vn[0]',
  567. runas=ROOT_USER)
  568. except PbsManagerError as e:
  569. self.assertFalse(e.rv)
  570. msg = "Cannot set attribute, read only" +\
  571. " or insufficient permission maintenance_jobs"
  572. self.assertTrue(msg in e.msg[0])
  573. # Set maintenance_jobs as operator
  574. try:
  575. self.server.manager(MGR_CMD_SET, NODE,
  576. {'maintenance_jobs': jid1}, id='vn[0]',
  577. runas='pbsoper')
  578. except PbsManagerError as e:
  579. self.assertFalse(e.rv)
  580. msg = "Cannot set attribute, read only" +\
  581. " or insufficient permission maintenance_jobs"
  582. self.assertTrue(msg in e.msg[0])
  583. # Set maintenance_jobs as user
  584. try:
  585. self.server.manager(MGR_CMD_SET, NODE,
  586. {'maintenance_jobs': jid1}, id='vn[0]',
  587. runas=TEST_USER)
  588. except PbsManagerError as e:
  589. self.assertFalse(e.rv)
  590. self.assertTrue("Unauthorized Request" in e.msg[0])
  591. def test_list_jobs_2(self):
  592. """
  593. Test to list maintenance_jobs when no job is admin-suspended
  594. """
  595. # Submit a few jobs
  596. j = Job(TEST_USER)
  597. jid1 = self.server.submit(j)
  598. jid2 = self.server.submit(j)
  599. jid3 = self.server.submit(j)
  600. # verify that all are running
  601. self.server.expect(JOB, {'job_state=R': 3, 'substate=42': 3})
  602. # list maintenance_jobs. It should be empty
  603. self.server.expect(NODE, 'maintenance_jobs', op=UNSET, id='vn[0]')
  604. # Regular suspend a job
  605. self.server.sigjob(jid2, 'suspend', runas=ROOT_USER)
  606. # List maintenance_jobs again
  607. self.server.expect(NODE, 'maintenance_jobs', op=UNSET, id='vn[0]')
  608. def test_preempt_order(self):
  609. """
  610. Test that scheduler preempt_order has no impact
  611. on admin-suspend
  612. """
  613. # create a high priority queue
  614. a = {'queue_type': 'e', 'enabled': 't', 'started': 't',
  615. 'priority': 150}
  616. self.server.manager(MGR_CMD_CREATE, QUEUE, a, id="highp")
  617. # set preempt_order to R
  618. self.scheduler.set_sched_config({'preempt_order': 'R'})
  619. # submit a job
  620. j = Job(TEST_USER)
  621. j.set_attributes({'Resource_List.select': 'vnode=vn[0]'})
  622. jid1 = self.server.submit(j)
  623. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid1)
  624. # submit a high priority job
  625. j = Job(TEST_USER)
  626. j.set_attributes({'queue': 'highp', 'Resource_List.select':
  627. '1:ncpus=4:vnode=vn[0]'})
  628. jid2 = self.server.submit(j)
  629. # job2 is running and job1 is requeued
  630. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid2)
  631. self.server.expect(JOB, {'job_state': 'Q'}, id=jid1)
  632. # admin-suspend job1. It will fail
  633. try:
  634. self.server.sigjob(jid1, 'admin-suspend', logerr=False)
  635. except Exception as e:
  636. self.assertFalse(e.rv)
  637. # admin suspend job2
  638. self.server.sigjob(jid2, 'admin-suspend')
  639. self.server.expect(JOB, {'job_state': 'S'}, id=jid2)
  640. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  641. # admin-resume job2. node state will become job-busy.
  642. self.server.sigjob(jid2, 'admin-resume')
  643. self.server.expect(NODE, {'state': 'job-busy'}, id='vn[0]')
  644. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid2)
  645. self.server.expect(JOB, {'job_state': 'Q'}, id=jid1)
  646. def test_hook(self):
  647. """
  648. List maintenance_jobs via hook
  649. """
  650. # Create and import a hook
  651. hook_name = "test"
  652. hook_body = """
  653. import pbs
  654. vn = pbs.server().vnode('vn[0]')
  655. pbs.logmsg(pbs.LOG_DEBUG,\
  656. "list of maintenance_jobs are %s" % vn.maintenance_jobs)
  657. """
  658. a = {'event': 'exechost_periodic', 'enabled': 'True', 'freq': 5}
  659. self.server.create_import_hook(hook_name, a, hook_body)
  660. # submit few jobs
  661. j = Job(TEST_USER)
  662. jid1 = self.server.submit(j)
  663. jid2 = self.server.submit(j)
  664. self.server.expect(JOB, {'job_state=R': 2})
  665. # wait for the periodic hook
  666. time.sleep(5)
  667. # look for the log message
  668. self.mom.log_match("list of maintenance_jobs are None")
  669. # admin-suspend jobs
  670. self.server.sigjob(jid1, 'admin-suspend')
  671. self.server.sigjob(jid2, 'admin-suspend')
  672. # wait for periodic hook and check mom_log
  673. time.sleep(5)
  674. self.mom.log_match("list of maintenance_jobs are %s" %
  675. ((jid1 + "," + jid2),))
  676. # admin-resume job1
  677. self.server.sigjob(jid1, 'admin-resume')
  678. # wait for periodic hook and check mom_log
  679. time.sleep(5)
  680. self.mom.log_match(
  681. "list of maintenance_jobs are %s" % (jid2,))
  682. def test_offline(self):
  683. """
  684. Test that if a node is put to offline
  685. and removed from maintenance state it
  686. remains offlined
  687. """
  688. # submit a job and admin-suspend it
  689. j1 = Job(TEST_USER)
  690. jid1 = self.server.submit(j1)
  691. j2 = Job(TEST_USER)
  692. jid2 = self.server.submit(j2)
  693. self.server.expect(JOB, {'job_state': "R", 'substate': 42}, id=jid1)
  694. self.server.expect(JOB, {'job_state': "R", 'substate': 42}, id=jid2)
  695. self.server.sigjob(jid1, 'admin-suspend')
  696. self.server.sigjob(jid2, 'admin-suspend')
  697. # node state is in maintenance
  698. self.server.expect(NODE, {'state': 'maintenance'}, id='vn[0]')
  699. # submit another job. It will be queued
  700. j3 = Job(TEST_USER)
  701. jid3 = self.server.submit(j3)
  702. self.server.expect(JOB, {'job_state': 'Q'}, id=jid3)
  703. # mark the node as offline too
  704. self.server.manager(MGR_CMD_SET, NODE, {'state': 'offline'},
  705. id='vn[0]')
  706. # delete job1 as user and resume job2
  707. self.server.deljob(jid1, wait=True, runas=TEST_USER)
  708. self.server.sigjob(jid2, 'admin-resume')
  709. # verify that node state is offline and
  710. # job3 is still queued
  711. self.server.expect(NODE, {'state': 'offline'}, id='vn[0]')
  712. self.server.expect(JOB, {'job_state': 'R', 'substate': 42}, id=jid2)
  713. self.server.expect(JOB, {'job_state': 'Q'}, id=jid3)