pbs_job_array.py 27 KB


  1. # coding: utf-8
  2. # Copyright (C) 1994-2018 Altair Engineering, Inc.
  3. # For more information, contact Altair at www.altair.com.
  4. #
  5. # This file is part of the PBS Professional ("PBS Pro") software.
  6. #
  7. # Open Source License Information:
  8. #
  9. # PBS Pro is free software. You can redistribute it and/or modify it under the
  10. # terms of the GNU Affero General Public License as published by the Free
  11. # Software Foundation, either version 3 of the License, or (at your option) any
  12. # later version.
  13. #
  14. # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
  15. # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  16. # FOR A PARTICULAR PURPOSE.
  17. # See the GNU Affero General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Affero General Public License
  20. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. #
  22. # Commercial License Information:
  23. #
  24. # For a copy of the commercial license terms and conditions,
  25. # go to: (http://www.pbspro.com/UserArea/agreement.html)
  26. # or contact the Altair Legal Department.
  27. #
  28. # Altair’s dual-license business model allows companies, individuals, and
  29. # organizations to create proprietary derivative works of PBS Pro and
  30. # distribute them - whether embedded or bundled with other software -
  31. # under a commercial license agreement.
  32. #
  33. # Use of Altair’s trademarks, including but not limited to "PBS™",
  34. # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
  35. # trademark licensing policies.
  36. from tests.functional import *
  37. class TestJobArray(TestFunctional):
  38. """
  39. Test suite for PBSPro's job array feature
  40. """
  41. def test_arrayjob_Erecord_startval(self):
  42. """
  43. Check that an arrayjob's E record's 'start' value is not set to 0
  44. """
  45. j = Job(TEST_USER, attrs={
  46. ATTR_J: '1-2',
  47. 'Resource_List.select': 'ncpus=1'
  48. })
  49. j.set_sleep_time(1)
  50. j_id = self.server.submit(j)
  51. # Check for the E record for the arrayjob
  52. acct_string = ";E;" + str(j_id)
  53. _, record = self.server.accounting_match(acct_string, max_attempts=10,
  54. interval=1)
  55. # Extract the 'start' value from the E record
  56. values = record.split(";", 3)[3]
  57. start_str = " start="
  58. values_temp = values.split(start_str, 1)[1]
  59. start_val = int(values_temp.split()[0])
  60. # Verify that the value of 'start' isn't 0
  61. self.assertNotEqual(start_val, 0,
  62. "E record value of 'start' for arrayjob is 0")
  63. def kill_and_restart_svr(self):
  64. try:
  65. self.server.stop('-KILL')
  66. except PbsServiceError as e:
  67. # The server failed to stop
  68. raise self.failureException("Server failed to stop:" + e.msg)
  69. try:
  70. self.server.start()
  71. except PbsServiceError as e:
  72. # The server failed to start
  73. raise self.failureException("Server failed to start:" + e.msg)
  74. self.server.isUp()
  75. rv = self.is_server_licensed(self.server)
  76. _msg = 'No license found on server %s' % (self.server.shortname)
  77. self.assertTrue(rv, _msg)
  78. def test_running_subjob_survive_restart(self):
  79. """
  80. Test to check if a running subjob of an array job survive a
  81. pbs_server restart
  82. """
  83. a = {'resources_available.ncpus': 1}
  84. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  85. j = Job(TEST_USER, attrs={
  86. ATTR_J: '1-3', 'Resource_List.select': 'ncpus=1'})
  87. j.set_sleep_time(20)
  88. j_id = self.server.submit(j)
  89. subjid_2 = j.create_subjob_id(j_id, 2)
  90. # 1. check job array has begun
  91. self.server.expect(JOB, {'job_state': 'B'}, j_id)
  92. # 2. wait till subjob 2 starts running
  93. self.server.expect(JOB, {'job_state': 'R'}, subjid_2, offset=20)
  94. # 3. Kill and restart the server
  95. self.kill_and_restart_svr()
  96. # 4. array job should be B
  97. self.server.expect(JOB, {'job_state': 'B'}, j_id, max_attempts=1)
  98. # 5. subjob 1 should be X
  99. self.server.expect(JOB, {'job_state': 'X'},
  100. j.create_subjob_id(j_id, 1), max_attempts=1)
  101. # 6. subjob 2 should be R
  102. self.server.expect(JOB, {'job_state': 'R'}, subjid_2, max_attempts=1)
  103. # 7. subjob 3 should be Q
  104. self.server.expect(JOB, {'job_state': 'Q'},
  105. j.create_subjob_id(j_id, 3), max_attempts=1)
  106. def test_running_subjob_survive_restart_with_history(self):
  107. """
  108. Test to check if a running subjob of an array job survive a
  109. pbs_server restart when history is enabled
  110. """
  111. attr = {'job_history_enable': 'true'}
  112. self.server.manager(MGR_CMD_SET, SERVER, attr)
  113. self.test_running_subjob_survive_restart()
  114. def test_suspended_subjob_survive_restart(self):
  115. """
  116. Test to check if a suspended subjob of an array job survive a
  117. pbs_server restart
  118. """
  119. a = {'resources_available.ncpus': 1}
  120. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  121. j = Job(TEST_USER, attrs={
  122. ATTR_J: '1-3', 'Resource_List.select': 'ncpus=1'})
  123. j.set_sleep_time(10)
  124. j_id = self.server.submit(j)
  125. subjid_2 = j.create_subjob_id(j_id, 2)
  126. # 1. check job array has begun
  127. self.server.expect(JOB, {'job_state': 'B'}, j_id)
  128. # 2. wait till subjob_2 starts running
  129. self.server.expect(JOB, {'job_state': 'R'}, subjid_2)
  130. try:
  131. self.server.sigjob(subjid_2, 'suspend')
  132. except PbsSignalError as e:
  133. raise self.failureException("Failed to suspend subjob:" + e.msg)
  134. self.server.expect(JOB, {'job_state': 'S'}, subjid_2, max_attempts=1)
  135. # 3. Kill and restart the server
  136. self.kill_and_restart_svr()
  137. # 4. array job should be B
  138. self.server.expect(JOB, {'job_state': 'B'}, j_id, max_attempts=1)
  139. # 5. subjob_2 should be S
  140. self.server.expect(JOB, {'job_state': 'S'}, subjid_2, max_attempts=1)
  141. try:
  142. self.server.sigjob(subjid_2, 'resume')
  143. except PbsSignalError as e:
  144. raise self.failureException("Failed to resume subjob:" + e.msg)
  145. def test_suspended_subjob_survive_restart_with_history(self):
  146. """
  147. Test to check if a suspended subjob of an array job survive a
  148. pbs_server restart when history is enabled
  149. """
  150. attr = {'job_history_enable': 'true'}
  151. self.server.manager(MGR_CMD_SET, SERVER, attr)
  152. self.test_suspended_subjob_survive_restart()
  153. def test_deleted_q_subjob_survive_restart(self):
  154. """
  155. Test to check if a deleted queued subjob of an array job survive a
  156. pbs_server restart when history is disabled
  157. """
  158. a = {'resources_available.ncpus': 1}
  159. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  160. j = Job(TEST_USER, attrs={
  161. ATTR_J: '1-3', 'Resource_List.select': 'ncpus=1'})
  162. j.set_sleep_time(10)
  163. j_id = self.server.submit(j)
  164. subjid_3 = j.create_subjob_id(j_id, 3)
  165. self.server.expect(JOB, {'job_state': 'B'}, j_id)
  166. self.server.deljob(subjid_3)
  167. self.server.expect(JOB, {'job_state': 'X'}, subjid_3)
  168. self.kill_and_restart_svr()
  169. self.server.expect(JOB, {'job_state': 'B'}, j_id, max_attempts=1)
  170. self.server.expect(JOB, {'job_state': 'X'}, subjid_3, max_attempts=1)
  171. def test_deleted_q_subjob_survive_restart_w_history(self):
  172. """
  173. Test to check if a deleted queued subjob of an array job survive a
  174. pbs_server restart when history is enabled
  175. """
  176. attr = {'job_history_enable': 'true'}
  177. self.server.manager(MGR_CMD_SET, SERVER, attr)
  178. self.test_deleted_q_subjob_survive_restart()
  179. def test_deleted_r_subjob_survive_restart(self):
  180. """
  181. Test to check if a deleted running subjob of an array job survive a
  182. pbs_server restart when history is disabled
  183. """
  184. a = {'resources_available.ncpus': 1}
  185. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  186. j = Job(TEST_USER, attrs={
  187. ATTR_J: '1-3', 'Resource_List.select': 'ncpus=1'})
  188. j.set_sleep_time(10)
  189. j_id = self.server.submit(j)
  190. subjid_1 = j.create_subjob_id(j_id, 1)
  191. self.server.expect(JOB, {'job_state': 'B'}, j_id)
  192. self.server.expect(JOB, {'job_state': 'R'}, subjid_1)
  193. self.server.deljob(subjid_1)
  194. self.server.expect(JOB, {'job_state': 'X'}, subjid_1)
  195. self.kill_and_restart_svr()
  196. self.server.expect(JOB, {'job_state': 'B'}, j_id, max_attempts=1)
  197. self.server.expect(JOB, {'job_state': 'X'}, subjid_1, max_attempts=1)
  198. def test_deleted_r_subjob_survive_restart_w_history(self):
  199. """
  200. Test to check if a deleted running subjob of an array job survive a
  201. pbs_server restart when history is enabled
  202. """
  203. attr = {'job_history_enable': 'true'}
  204. self.server.manager(MGR_CMD_SET, SERVER, attr)
  205. self.test_deleted_q_subjob_survive_restart()
  206. def test_qdel_expired_subjob(self):
  207. """
  208. Test to check if qdel of a subjob is disallowed
  209. """
  210. attr = {'job_history_enable': 'true'}
  211. self.server.manager(MGR_CMD_SET, SERVER, attr)
  212. a = {'resources_available.ncpus': 1}
  213. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  214. j = Job(TEST_USER, attrs={
  215. ATTR_J: '1-3', 'Resource_List.select': 'ncpus=1'})
  216. j.set_sleep_time(5)
  217. j_id = self.server.submit(j)
  218. subjid_1 = j.create_subjob_id(j_id, 1)
  219. # 1. check job array has begun
  220. self.server.expect(JOB, {'job_state': 'B'}, j_id)
  221. # 2. wait till subjob 1 becomes expired
  222. self.server.expect(JOB, {'job_state': 'X'}, subjid_1)
  223. try:
  224. self.server.deljob(subjid_1)
  225. except PbsDeljobError as e:
  226. err_msg = "Request invalid for finished array subjob"
  227. self.assertTrue(err_msg in e.msg[0],
  228. "Error message is not expected")
  229. else:
  230. raise self.failureException("subjob in X state can be deleted")
  231. try:
  232. self.server.deljob(subjid_1, extend="deletehist")
  233. except PbsDeljobError as e:
  234. err_msg = "Request invalid for finished array subjob"
  235. self.assertTrue(err_msg in e.msg[0],
  236. "Error message is not expected")
  237. else:
  238. raise self.failureException("subjob in X state can be deleted")
  239. def test_subjob_comments(self):
  240. """
  241. Test subjob comments for finished and terminated subjobs
  242. """
  243. a = {'resources_available.ncpus': 1}
  244. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  245. j = Job(TEST_USER, attrs={
  246. ATTR_J: '1-30', 'Resource_List.select': 'ncpus=1'})
  247. j.set_sleep_time(8)
  248. j_id = self.server.submit(j)
  249. subjid_1 = j.create_subjob_id(j_id, 1)
  250. subjid_2 = j.create_subjob_id(j_id, 2)
  251. self.server.expect(JOB, {'comment': 'Subjob finished'}, subjid_1,
  252. offset=8)
  253. self.server.delete(subjid_2, extend='force')
  254. self.server.expect(JOB, {'comment': 'Subjob terminated'}, subjid_2)
  255. self.kill_and_restart_svr()
  256. self.server.expect(
  257. JOB, {'comment': 'Subjob finished'}, subjid_1, max_attempts=1)
  258. def test_subjob_comments_with_history(self):
  259. """
  260. Test subjob comments for finished, failed and terminated subjobs
  261. when history is enabled
  262. """
  263. a = {'resources_available.ncpus': 1}
  264. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  265. a = {'job_history_enable': 'True'}
  266. self.server.manager(MGR_CMD_SET, SERVER, a)
  267. j = Job(TEST_USER, attrs={
  268. ATTR_J: '1-2', 'Resource_List.select': 'ncpus=1'})
  269. j.set_sleep_time(5)
  270. j_id = self.server.submit(j)
  271. subjid_1 = j.create_subjob_id(j_id, 1)
  272. subjid_2 = j.create_subjob_id(j_id, 2)
  273. self.server.delete(subjid_2, extend='force')
  274. self.server.expect(
  275. JOB, {'comment': (MATCH_RE, 'terminated')}, subjid_2, extend='x')
  276. self.server.expect(JOB, {'comment': (
  277. MATCH_RE, 'Job run at.*and finished')}, subjid_1, extend='x')
  278. self.kill_and_restart_svr()
  279. self.server.expect(JOB, {'comment': (
  280. MATCH_RE, 'Job run at.*and finished')}, subjid_1, extend='x',
  281. max_attempts=1)
  282. script_body = "exit 1"
  283. j = Job(TEST_USER, attrs={
  284. ATTR_J: '1-2', 'Resource_List.select': 'ncpus=1'})
  285. j.create_script(body=script_body)
  286. j_id = self.server.submit(j)
  287. subjid_1 = j.create_subjob_id(j_id, 1)
  288. subjid_2 = j.create_subjob_id(j_id, 2)
  289. self.server.expect(
  290. JOB, {'comment': (MATCH_RE, 'Job run at.*and failed')}, subjid_1,
  291. extend='x')
  292. self.server.expect(
  293. JOB, {'comment': (MATCH_RE, 'Job run at.*and failed')}, subjid_2,
  294. extend='x')
  295. self.kill_and_restart_svr()
  296. self.server.expect(
  297. JOB, {'comment': (MATCH_RE, 'Job run at.*and failed')}, subjid_1,
  298. extend='x', max_attempts=1)
  299. self.server.expect(
  300. JOB, {'comment': (MATCH_RE, 'Job run at.*and failed')}, subjid_2,
  301. extend='x')
  302. def test_multiple_server_restarts(self):
  303. """
  304. Test subjobs wont rerun after multiple server restarts
  305. """
  306. a = {'resources_available.ncpus': 1}
  307. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  308. a = {'job_history_enable': 'True'}
  309. self.server.manager(MGR_CMD_SET, SERVER, a)
  310. j = Job(TEST_USER, attrs={
  311. ATTR_J: '1-2', 'Resource_List.select': 'ncpus=1'})
  312. j_id = self.server.submit(j)
  313. subjid_1 = j.create_subjob_id(j_id, 1)
  314. a = {'job_state': 'R', 'run_count': 1}
  315. self.server.expect(JOB, a, subjid_1, attrop=PTL_AND)
  316. for _ in range(5):
  317. self.kill_and_restart_svr()
  318. self.server.expect(
  319. JOB, a, subjid_1, attrop=PTL_AND, max_attempts=1)
  320. def test_job_array_history_duration(self):
  321. """
  322. Test that job array and subjobs are purged after history duration
  323. """
  324. a = {'resources_available.ncpus': 1}
  325. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  326. a = {'job_history_enable': 'True'}
  327. self.server.manager(MGR_CMD_SET, SERVER, a)
  328. a = {'job_history_duration': 30}
  329. self.server.manager(MGR_CMD_SET, SERVER, a)
  330. j = Job(TEST_USER, attrs={
  331. ATTR_J: '1-2', 'Resource_List.select': 'ncpus=1'})
  332. j.set_sleep_time(5)
  333. j_id = self.server.submit(j)
  334. subjid_1 = j.create_subjob_id(j_id, 1)
  335. subjid_2 = j.create_subjob_id(j_id, 2)
  336. a = {'job_state': 'R', 'run_count': 1}
  337. self.server.expect(JOB, a, subjid_1, attrop=PTL_AND)
  338. self.server.delete(subjid_1, extend='force')
  339. b = {'job_state': 'X'}
  340. self.server.expect(JOB, b, subjid_1)
  341. self.server.expect(JOB, a, subjid_2, attrop=PTL_AND)
  342. msg = "Waiting for 150 secs as server will purge db once"
  343. msg += " in 2 mins plus 30 sec of history duration"
  344. self.logger.info(msg)
  345. self.server.expect(JOB, 'job_state', op=UNSET,
  346. id=subjid_1, offset=150, extend='x')
  347. self.server.expect(JOB, 'job_state', op=UNSET,
  348. id=subjid_2, extend='x')
  349. def test_queue_deletion_after_terminated_subjob(self):
  350. """
  351. Test that queue can be deleted after the job array is
  352. terminated and server is restarted.
  353. """
  354. a = {'resources_available.ncpus': 1}
  355. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  356. j = Job(TEST_USER, attrs={
  357. ATTR_J: '1-2', 'Resource_List.select': 'ncpus=1'})
  358. j_id = self.server.submit(j)
  359. subjid_1 = j.create_subjob_id(j_id, 1)
  360. a = {'job_state': 'R', 'run_count': 1}
  361. self.server.expect(JOB, a, subjid_1, attrop=PTL_AND)
  362. self.server.delete(subjid_1, extend='force')
  363. self.kill_and_restart_svr()
  364. self.server.delete(j_id, wait=True)
  365. self.server.manager(MGR_CMD_DELETE, QUEUE, id='workq')
  366. def test_held_job_array_survive_server_restart(self):
  367. """
  368. Test held job array can be released after server restart
  369. """
  370. a = {'resources_available.ncpus': 1}
  371. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  372. j = Job(TEST_USER, attrs={
  373. ATTR_J: '1-2', 'Resource_List.select': 'ncpus=1'})
  374. j.set_sleep_time(60)
  375. j_id = self.server.submit(j)
  376. j_id2 = self.server.submit(j)
  377. subjid_1 = j.create_subjob_id(j_id, 1)
  378. subjid_3 = j.create_subjob_id(j_id2, 1)
  379. a = {'job_state': 'R', 'run_count': 1}
  380. self.server.expect(JOB, a, subjid_1, attrop=PTL_AND)
  381. self.server.holdjob(j_id2, USER_HOLD)
  382. self.server.expect(JOB, {'job_state': 'H'}, j_id2)
  383. self.kill_and_restart_svr()
  384. self.server.delete(j_id, wait=True)
  385. self.server.expect(JOB, {'job_state': 'H'}, j_id2)
  386. self.server.rlsjob(j_id2, USER_HOLD)
  387. self.server.expect(JOB, {'job_state': 'B'}, j_id2)
  388. self.server.expect(JOB, a, subjid_3, attrop=PTL_AND)
  389. def test_held_job_array_survive_server_restart_w_history(self):
  390. """
  391. Test held job array can be released after server restart
  392. when history is enabled
  393. """
  394. a = {'job_history_enable': 'True'}
  395. self.server.manager(MGR_CMD_SET, SERVER, a)
  396. self.test_held_job_array_survive_server_restart()
  397. def test_subjobs_qrun(self):
  398. """
  399. Test that job array's subjobs can be qrun
  400. """
  401. a = {'resources_available.ncpus': 1}
  402. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  403. a = {'scheduling': 'false'}
  404. self.server.manager(MGR_CMD_SET, SERVER, a)
  405. j = Job(TEST_USER, attrs={
  406. ATTR_J: '1-2', 'Resource_List.select': 'ncpus=1'})
  407. j.set_sleep_time(60)
  408. j_id = self.server.submit(j)
  409. subjid_1 = j.create_subjob_id(j_id, 1)
  410. self.server.runjob(subjid_1)
  411. self.server.expect(JOB, {'job_state': 'B'}, j_id)
  412. self.server.expect(JOB, {'job_state': 'R'}, subjid_1)
  413. def test_dependent_job_array_server_restart(self):
  414. """
  415. Check Job array dependency is not released after server restart
  416. """
  417. a = {'job_history_enable': 'true'}
  418. self.server.manager(MGR_CMD_SET, SERVER, a)
  419. a = {'resources_available.ncpus': 2}
  420. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  421. j = Job(TEST_USER, attrs={
  422. ATTR_J: '1-2', 'Resource_List.select': 'ncpus=1'})
  423. j.set_sleep_time(10)
  424. j_id = self.server.submit(j)
  425. subjid_1 = j.create_subjob_id(j_id, 1)
  426. subjid_2 = j.create_subjob_id(j_id, 2)
  427. self.server.expect(JOB, {'job_state': 'B'}, j_id)
  428. self.server.expect(JOB, {'job_state': 'R'}, subjid_1)
  429. self.server.expect(JOB, {'job_state': 'R'}, subjid_2)
  430. depend_value = 'afterok:' + j_id
  431. j = Job(TEST_USER, attrs={
  432. ATTR_J: '1-2', 'Resource_List.select': 'ncpus=1',
  433. ATTR_depend: depend_value})
  434. j_id2 = self.server.submit(j)
  435. self.server.expect(JOB, {'job_state': 'H'}, j_id2)
  436. self.kill_and_restart_svr()
  437. self.server.expect(JOB, {'job_state': 'F'},
  438. j_id, extend='x', interval=5)
  439. self.server.expect(JOB, {'job_state': 'B'}, j_id2, interval=5)
  440. def test_rerun_subjobs_server_restart(self):
  441. """
  442. Test that subjobs which are requeued remain queued after server restart
  443. """
  444. a = {'job_history_enable': 'true'}
  445. self.server.manager(MGR_CMD_SET, SERVER, a)
  446. a = {'resources_available.ncpus': 1}
  447. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  448. j = Job(TEST_USER, attrs={
  449. ATTR_J: '1-2', 'Resource_List.select': 'ncpus=1'})
  450. j.set_sleep_time(60)
  451. j_id = self.server.submit(j)
  452. subjid_1 = j.create_subjob_id(j_id, 1)
  453. self.server.expect(JOB, {'job_state': 'R'}, subjid_1)
  454. a = {'scheduling': 'false'}
  455. self.server.manager(MGR_CMD_SET, SERVER, a)
  456. self.server.rerunjob(subjid_1)
  457. self.server.expect(JOB, {'job_state': 'Q'}, subjid_1)
  458. self.kill_and_restart_svr()
  459. self.server.expect(JOB, {'job_state': 'Q'}, subjid_1)
  460. a = {'scheduling': 'true'}
  461. self.server.manager(MGR_CMD_SET, SERVER, a)
  462. a = {'job_state': 'R'}
  463. self.server.expect(JOB, a, subjid_1)
  464. def test_rerun_node_fail_requeue(self):
  465. """
  466. Test sub jobs gets requeued after node_fail_requeue time
  467. """
  468. a = {'node_fail_requeue': 10}
  469. self.server.manager(MGR_CMD_SET, SERVER, a)
  470. a = {'resources_available.ncpus': 1}
  471. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  472. j = Job(TEST_USER, attrs={
  473. ATTR_J: '1-2', 'Resource_List.select': 'ncpus=1'})
  474. j.set_sleep_time(60)
  475. j_id = self.server.submit(j)
  476. subjid_1 = j.create_subjob_id(j_id, 1)
  477. self.server.expect(JOB, {'job_state': 'R'}, subjid_1)
  478. self.mom.stop()
  479. self.server.expect(JOB, {'job_state': 'Q'}, subjid_1, offset=5)
  480. def test_qmove_job_array(self):
  481. """
  482. Test job array's can be qmoved to a high priority queue
  483. and qmoved job array preempts running subjob
  484. """
  485. a = {'queue_type': 'execution',
  486. 'started': 'True',
  487. 'enabled': 'True',
  488. 'priority': 150}
  489. self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq1')
  490. a = {'job_history_enable': 'true'}
  491. self.server.manager(MGR_CMD_SET, SERVER, a)
  492. a = {'resources_available.ncpus': 1}
  493. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  494. j = Job(TEST_USER, attrs={
  495. ATTR_J: '1-2', 'Resource_List.select': 'ncpus=1'})
  496. j.set_sleep_time(60)
  497. j_id = self.server.submit(j)
  498. subjid_1 = j.create_subjob_id(j_id, 1)
  499. self.server.expect(JOB, {'job_state': 'R'}, subjid_1)
  500. j_id2 = self.server.submit(j)
  501. subjid_3 = j.create_subjob_id(j_id2, 1)
  502. self.server.movejob(j_id2, 'wq1')
  503. a = {'scheduling': 'true'}
  504. self.server.manager(MGR_CMD_SET, SERVER, a)
  505. self.server.expect(JOB, {'job_state': 'S'}, subjid_1)
  506. self.server.expect(JOB, {'job_state': 'R'}, subjid_3)
  507. def test_delete_history_subjob_server_restart(self):
  508. """
  509. Test that subjobs can be deleted from history
  510. and they remain deleted after server restart
  511. """
  512. a = {'job_history_enable': 'true'}
  513. self.server.manager(MGR_CMD_SET, SERVER, a)
  514. a = {'resources_available.ncpus': 2}
  515. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  516. j = Job(TEST_USER, attrs={
  517. ATTR_J: '1-2', 'Resource_List.select': 'ncpus=1'})
  518. j.set_sleep_time(5)
  519. j_id = self.server.submit(j)
  520. self.server.expect(JOB, {'job_state': 'F'}, j_id, extend='x', offset=5)
  521. self.server.delete(j_id, extend='deletehist')
  522. self.server.expect(JOB, 'job_state', op=UNSET, extend='x', id=j_id)
  523. self.kill_and_restart_svr()
  524. self.server.expect(JOB, 'job_state', op=UNSET, extend='x', id=j_id)
  525. def test_job_id_duplicate_server_restart(self):
  526. """
  527. Test that after server restart there is no duplication
  528. of job identifiers
  529. """
  530. a = {'resources_available.ncpus': 1}
  531. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  532. j = Job(TEST_USER, attrs={
  533. ATTR_J: '1-2', 'Resource_List.select': 'ncpus=1'})
  534. self.server.submit(j)
  535. j = Job(TEST_USER)
  536. self.server.submit(j)
  537. self.kill_and_restart_svr()
  538. try:
  539. j = Job(TEST_USER, attrs={
  540. ATTR_J: '1-2', 'Resource_List.select': 'ncpus=1'})
  541. self.server.submit(j)
  542. except PbsSubmitError as e:
  543. raise self.failureException("Failed to submit job: " + str(e.msg))
  544. def test_expired_subjobs_not_reported(self):
  545. """
  546. Test if a subjob is finished and moves to expired state,
  547. it is not reported to scheduler in the next scheduling
  548. cycle. Scheduler expects only running subjobs to be reported to it.
  549. """
  550. a = {'job_history_enable': 'True'}
  551. self.server.manager(MGR_CMD_SET, SERVER, a)
  552. req_node = ":host=" + self.mom.shortname
  553. res_req = {'Resource_List.select': '1:ncpus=1' + req_node,
  554. 'array_indices_submitted': '1-16',
  555. 'Resource_List.place': 'excl'}
  556. j1 = Job(TEST_USER, attrs=res_req)
  557. j1.set_sleep_time(2)
  558. jid1 = self.server.submit(j1)
  559. j1_sub1 = j1.create_subjob_id(jid1, 1)
  560. self.server.expect(JOB, {'job_state': 'X'}, j1_sub1)
  561. # Trigger a sched cycle
  562. a = {'scheduling': 'True'}
  563. self.server.manager(MGR_CMD_SET, SERVER, a)
  564. msg = j1_sub1 + ";" + "Subjob found in undesirable state"
  565. msg += ", ignoring this job"
  566. self.scheduler.log_match(msg, existence=False, max_attempts=10)
  567. def test_subjob_stdfile_custom_dir(self):
  568. """
  569. Test that subjobs standard error and out files are generated
  570. in the custom directory provided with oe qsub options
  571. """
  572. tmp_dir = self.du.mkdtemp(uid=TEST_USER.uid)
  573. a = {ATTR_e: tmp_dir, ATTR_o: tmp_dir, ATTR_J: '1-4'}
  574. j = Job(TEST_USER, attrs=a)
  575. j.set_sleep_time(2)
  576. jid = self.server.submit(j)
  577. self.server.expect(JOB, {ATTR_state: 'B'}, id=jid)
  578. self.server.expect(JOB, ATTR_state, op=UNSET, id=jid)
  579. file_list = [name for name in os.listdir(
  580. tmp_dir) if os.path.isfile(os.path.join(tmp_dir, name))]
  581. self.assertEqual(8, len(file_list), 'expected 8 std files')
  582. for ext in ['.OU', '.ER']:
  583. for sub_ind in range(1, 5):
  584. f_name = j.create_subjob_id(jid, sub_ind) + ext
  585. if f_name not in file_list:
  586. raise self.failureException("std file " + f_name
  587. + " not found")
  588. @skipOnCpuSet
  589. @skipOnCray
  590. def test_subjob_wrong_state(self):
  591. """
  592. Test that after submitting a job and restarting the server,
  593. the subjobs are not in the wrong substate and can be scheduled.
  594. """
  595. a = {'resources_available.ncpus': 200}
  596. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  597. j = Job(attrs={ATTR_J: '1-200'})
  598. self.server.submit(j)
  599. # while the server is sending the jobs to the MoM, restart the server
  600. self.server.restart()
  601. # make sure the mom is free so the scheduler can run jobs on it
  602. self.server.expect(NODE, {'state': 'free'}, id=self.mom.shortname)
  603. self.logger.info('Sleeping to ensure licenses are received')
  604. time.sleep(5)
  605. self.server.manager(MGR_CMD_SET, MGR_OBJ_SERVER,
  606. {'scheduling': 'True'})
  607. # ensure the sched cycle is finished
  608. self.server.manager(MGR_CMD_SET, MGR_OBJ_SERVER,
  609. {'scheduling': 'False'}, expect=True)
  610. # ensure all the subjobs are running
  611. self.server.expect(JOB, {'job_state=R': 200}, extend='t')