# coding: utf-8

# Copyright (C) 1994-2018 Altair Engineering, Inc.
# For more information, contact Altair at www.altair.com.
#
# This file is part of the PBS Professional ("PBS Pro") software.
#
# Open Source License Information:
#
# PBS Pro is free software. You can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.
# See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Commercial License Information:
#
# For a copy of the commercial license terms and conditions,
# go to: (http://www.pbspro.com/UserArea/agreement.html)
# or contact the Altair Legal Department.
#
# Altair’s dual-license business model allows companies, individuals, and
# organizations to create proprietary derivative works of PBS Pro and
# distribute them - whether embedded or bundled with other software -
# under a commercial license agreement.
#
# Use of Altair’s trademarks, including but not limited to "PBS™",
# "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
# trademark licensing policies.

import time

from tests.functional import *
from ptl.utils.pbs_crayutils import CrayUtils


@tags('cray')
class TestSuspendResumeOnCray(TestFunctional):
    """
    Test special cases where suspend/resume functionality differs on Cray
    as compared to other platforms.
    This test suite expects the platform to be 'cray' and assumes that
    the suspend/resume feature is enabled on it.
    """

    cu = CrayUtils()

    def setUp(self):
        if not self.du.get_platform().startswith('cray'):
            self.skipTest("Test suite only meant to run on a Cray")
        TestFunctional.setUp(self)

    @tags('cray', 'smoke')
    def test_default_restrict_res_to_release_on_suspend_setting(self):
        """
        Check that on Cray restrict_res_to_release_on_suspend is always set
        to 'ncpus' by default
        """
        # Check the default value of the restrict_res_to_release_on_suspend
        # server attribute
        a = {ATTR_restrict_res_to_release_on_suspend: 'ncpus'}
        self.server.expect(SERVER, a)

    def test_exclusive_job_not_suspended(self):
        """
        If a running job is a job with exclusive placement then this job
        cannot be suspended.
        This test checks for a log message, which is an unstable interface
        and may need to change when the interface changes.
        """
        msg_expected = "BASIL;ERROR: ALPS error: apsched: \
at least resid .* is exclusive"
        # Submit a job requesting exclusive placement
        j = Job(TEST_USER, {ATTR_l + '.select': '1:ncpus=1',
                            ATTR_l + '.place': 'excl'})
        check_after = int(time.time())
        jid = self.server.submit(j)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)

        # suspend job
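        # Suspending the job requires PBS to switch its ALPS reservation;
        # apsched rejects the switch for a reservation made with exclusive
        # placement, so the request below is expected to fail with
        # "Switching ALPS reservation failed", and the matching BASIL error
        # should appear in the MoM log (checked further down).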
        try:
            self.server.sigjob(jobid=jid, signal="suspend")
        except PbsSignalError as e:
            self.assertTrue("Switching ALPS reservation failed" in e.msg[0])

        self.server.expect(JOB, 'exec_host', id=jid, op=SET)
        job_stat = self.server.status(JOB, id=jid)
        ehost = job_stat[0]['exec_host'].partition('/')[0]
        run_mom = self.moms[ehost]
        s = run_mom.log_match(msg_expected, starttime=check_after,
                              regexp=True, max_attempts=10)
        self.assertTrue(s)

    @tags('cray')
    def test_basic_admin_suspend_restart(self):
        """
        Test basic admin-suspend functionality for jobs and array jobs with
        a server restart on Cray. The restart tests whether the node recovers
        properly in the maintenance state. After turning off scheduling and
        restarting the server, a suspended subjob is always requeued and the
        node shows up as free.
        """
        j1 = Job(TEST_USER)
        jid1 = self.server.submit(j1)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)
        qstat = self.server.status(JOB, 'exec_vnode', id=jid1)
        vname = qstat[0]['exec_vnode'].partition(':')[0].strip('(')

        # admin-suspend regular job
        self.server.sigjob(jid1, 'admin-suspend', runas=ROOT_USER)
        self.server.expect(JOB, {ATTR_state: 'S'}, id=jid1)
        self.server.expect(NODE, {'state': 'maintenance'}, id=vname)
        self.server.expect(NODE, {'maintenance_jobs': jid1})
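
        # Restart the server; after recovery the vnode should still be
        # reported in the maintenance state with the job listed in its
        # maintenance_jobs attribute.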
        self.server.restart()
        self.server.expect(NODE, {'state': 'maintenance'}, id=vname)
        self.server.expect(NODE, {'maintenance_jobs': jid1})

        # Adding sleep to avoid failure at resume since PBS licenses
        # might not be available and as a result resume fails
        time.sleep(2)

        # admin-resume regular job. Make sure the node returns to state
        # job-exclusive.
        self.server.sigjob(jid1, 'admin-resume', runas=ROOT_USER)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)
        self.server.expect(NODE, {'state': 'job-exclusive'}, id=vname)

        self.server.cleanup_jobs()

        # admin-suspend job array
        jA = Job(TEST_USER, {ATTR_l + '.select': '1:ncpus=1', ATTR_J: '1-2'})
        jidA = self.server.submit(jA)
        self.server.expect(JOB, {ATTR_state: 'B'}, id=jidA)
        subjobs = self.server.status(JOB, id=jidA, extend='t')
        # subjobs[0] is the array itself. Need the subjobs
        jid1 = subjobs[1]['id']
        jid2 = subjobs[2]['id']
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid2)
        qstat = self.server.status(JOB, 'exec_vnode', id=jid1)
        vname1 = qstat[0]['exec_vnode'].partition(':')[0].strip('(')
        qstat = self.server.status(JOB, 'exec_vnode', id=jid2)
        vname2 = qstat[0]['exec_vnode'].partition(':')[0].strip('(')

        # admin-suspend subjob 1
        self.server.sigjob(jid1, 'admin-suspend', runas=ROOT_USER)
        self.server.expect(JOB, {ATTR_state: 'S'}, id=jid1)
        self.server.expect(NODE, {'state': 'maintenance'}, id=vname1)
        self.server.expect(NODE, {'maintenance_jobs': jid1})

        # admin-resume subjob 1. Make sure the node returns to state
        # job-exclusive.
        self.server.sigjob(jid1, 'admin-resume', runas=ROOT_USER)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)
        self.server.expect(NODE, {'state': 'job-exclusive'}, id=vname1)

        # admin-suspend subjob 2
        self.server.sigjob(jid2, 'admin-suspend', runas=ROOT_USER)
        self.server.expect(JOB, {ATTR_state: 'S'}, id=jid2)
        self.server.expect(NODE, {'state': 'maintenance'}, id=vname2)
        self.server.expect(NODE, {'maintenance_jobs': jid2})
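
        # With scheduling turned off, a server restart requeues the
        # admin-suspended subjob, so the vnodes should come back in the
        # free state rather than maintenance.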
        # Turn off scheduling and restart server
        self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})
        self.server.restart()

        # Check that nodes are now free
        self.server.expect(NODE, {'state': 'free'}, id=vname1)
        self.server.expect(NODE, {'state': 'free'}, id=vname2)

    def test_admin_suspend_wrong_state(self):
        """
        Check that a resume signal of the wrong kind is correctly rejected.
        """
        j1 = Job(TEST_USER)
        jid1 = self.server.submit(j1)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)
        self.server.sigjob(jid1, "suspend", runas=ROOT_USER)
        self.server.expect(JOB, {ATTR_state: 'S'}, id=jid1)
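
        # The job was suspended with the plain 'suspend' signal, so
        # 'admin-resume' is the wrong resume signal here and should be
        # rejected, leaving the job suspended.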
        try:
            self.server.sigjob(jid1, "admin-resume", runas=ROOT_USER)
        except PbsSignalError as e:
            self.assertTrue(
                'Job can not be resumed with the requested resume signal'
                in e.msg[0])
        self.server.expect(JOB, {ATTR_state: 'S'}, id=jid1)

        j2 = Job(TEST_USER)
        jid2 = self.server.submit(j2)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid2)
        self.server.sigjob(jid2, "admin-suspend", runas=ROOT_USER)
        self.server.expect(JOB, {ATTR_state: 'S', ATTR_substate: 43}, id=jid2)
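
        # This second job was admin-suspended (the test pins state 'S' with
        # substate 43 above), so a plain 'resume' is the wrong resume signal
        # and should be rejected without changing state or substate.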
        try:
            self.server.sigjob(jid2, "resume", runas=ROOT_USER)
        except PbsSignalError as e:
            self.assertTrue(
                'Job can not be resumed with the requested resume signal'
                in e.msg[0])
        # The job should be in the same state as it was prior to the signal
        self.server.expect(JOB, {ATTR_state: 'S', ATTR_substate: 43}, id=jid2)

    def submit_resv(self, resv_start, chunks, resv_dur):
        """
        Helper to submit a PBS reservation with the given start time, number
        of chunks and duration.
        """
        a = {'Resource_List.select': '%d:ncpus=1:vntype=cray_compute' % chunks,
             'Resource_List.place': 'scatter',
             'reserve_start': int(resv_start),
             'reserve_duration': int(resv_dur)
             }
        r = Reservation(TEST_USER, attrs=a)
        rid = self.server.submit(r)
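        # Wait for the reservation to be confirmed; if the expect times out,
        # return the expect result carried by the raised PtlExpectError so
        # callers can simply assert on the returned value.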
        try:
            a = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
            d = self.server.expect(RESV, a, id=rid)
        except PtlExpectError as e:
            d = e.rv
        return d

    @timeout(300)
    def test_preempt_STF(self):
        """
        Test shrink-to-fit by creating a reservation for all compute nodes
        starting in 100 sec. with a duration of two hours. A preempted STF
        job with a min_walltime of 1 min. and a max_walltime of 2 hours will
        stay suspended after the higher priority job goes away if its
        min_walltime can't be satisfied.
        """
        qname = 'highp'
        a = {'queue_type': 'execution'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, qname)
        a = {'enabled': 'True', 'started': 'True', 'priority': '150'}
        self.server.manager(MGR_CMD_SET, QUEUE, a, qname)

        # Reserve all the compute nodes
        nv = self.cu.num_compute_vnodes(self.server)
        self.assertNotEqual(nv, 0, "There are no cray_compute vnodes present.")
        now = time.time()
        resv_start = now + 100
        resv_dur = 7200
        d = self.submit_resv(resv_start, nv, resv_dur)
        self.assertTrue(d)

        j = Job(TEST_USER, {ATTR_l + '.select': '%d:ncpus=1' % nv,
                            ATTR_l + '.place': 'scatter',
                            ATTR_l + '.min_walltime': '00:01:00',
                            ATTR_l + '.max_walltime': '02:00:00'})
        jid = self.server.submit(j)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)
        self.server.expect(
            JOB, {ATTR_l + '.walltime': (LE, '00:01:40')}, id=jid)
        self.server.expect(
            JOB, {ATTR_l + '.walltime': (GE, '00:01:00')}, id=jid)
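
        # The STF job can only fit in the window before the reservation
        # starts (about 100 seconds away), so the scheduler should have
        # shrunk its walltime to between min_walltime and roughly that
        # window, as checked by the two expects above.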

        # The sleep below will leave less than a 1-minute window for jid
        # after j2id is deleted. The min_walltime of jid can't be
        # satisfied and jid will stay in the S state.
        time.sleep(35)
        j2 = Job(TEST_USER, {ATTR_l + '.select': '%d:ncpus=1' % nv,
                             ATTR_l + '.walltime': '00:01:00',
                             ATTR_l + '.place': 'scatter',
                             ATTR_q: 'highp'})
        j2id = self.server.submit(j2)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=j2id)
        self.server.expect(JOB, {ATTR_state: 'S'}, id=jid)

        # The sleep below will leave less than a 1-minute window for jid
        time.sleep(50)
        self.server.delete(j2id)
        a = {'scheduling': 'True'}
        self.server.manager(MGR_CMD_SET, SERVER, a)
        self.server.expect(SERVER, {'server_state': 'Active'})
        self.server.expect(JOB, {ATTR_state: 'S'}, id=jid)

    def test_multi_express(self):
        """
        Test multiple express queues of different priorities.
        See that jobs from the higher express queues preempt jobs
        from lower express queues. Also see that when express jobs finish
        (or are deleted), suspended jobs restart.
        Make sure loadLimit is set to 4 on the server node:
        # apmgr config loadLimit 4
        """
        _t = ('\"express_queue, normal_jobs, server_softlimits,' +
              ' queue_softlimits\"')
        a = {'preempt_prio': _t}
        self.scheduler.set_sched_config(a)

        a = {'queue_type': 'e',
             'started': 'True',
             'enabled': 'True',
             'Priority': 150}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, "expressq")
        a['Priority'] = 160
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, "expressq2")
        a['Priority'] = 170
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, "expressq3")
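
        # Three express queues are created with ascending priorities
        # (150, 160, 170); a job that fills every compute vnode and is
        # submitted to a higher-priority express queue should suspend the
        # job running from the lower-priority queue.
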
        # Count the compute nodes
        nv = self.cu.num_compute_vnodes(self.server)
        self.assertNotEqual(nv, 0, "There are no cray_compute vnodes present.")

        j1 = Job(TEST_USER, {ATTR_l + '.select': '%d:ncpus=1' % nv,
                             ATTR_l + '.place': 'scatter',
                             ATTR_l + '.walltime': 3600})
        j1id = self.server.submit(j1)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=j1id)

        j2 = Job(TEST_USER, {ATTR_l + '.select': '%d:ncpus=1' % nv,
                             ATTR_l + '.place': 'scatter',
                             ATTR_l + '.walltime': 3600,
                             ATTR_q: 'expressq'})
        j2id = self.server.submit(j2)
        self.server.expect(JOB, {ATTR_state: 'S'}, id=j1id)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=j2id)

        j3 = Job(TEST_USER, {ATTR_l + '.select': '%d:ncpus=1' % nv,
                             ATTR_l + '.place': 'scatter',
                             ATTR_l + '.walltime': 3600,
                             ATTR_q: 'expressq2'})
        j3id = self.server.submit(j3)
        self.server.expect(JOB, {ATTR_state: 'S'}, id=j2id)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=j3id)

        j4 = Job(TEST_USER, {ATTR_l + '.select': '%d:ncpus=1' % nv,
                             ATTR_l + '.place': 'scatter',
                             ATTR_l + '.walltime': 3600,
                             ATTR_q: 'expressq3'})
        j4id = self.server.submit(j4)
        self.server.expect(JOB, {ATTR_state: 'S'}, id=j3id)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=j4id)

        # Deleting the top express job should let the next-highest
        # suspended express job resume.
        self.server.delete(j4id)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=j3id)

    def test_preempted_topjob_calendared(self):
        """
        Test that even if topjob_ineligible is set for
        a preempted job and sched_preempt_enforce_resumption
        is set true, the preempted job will be calendared.
        """
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'sched_preempt_enforce_resumption': 'true'})
        self.server.manager(MGR_CMD_SET, SERVER, {'backfill_depth': '2'})
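
        # With sched_preempt_enforce_resumption set and a backfill_depth of
        # 2, the scheduler should still add the preempted job to the
        # calendar (estimated.start_time gets set) even though the job is
        # marked topjob_ineligible below.
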
        # Count the compute nodes
        nv = self.cu.num_compute_vnodes(self.server)
        self.assertNotEqual(nv, 0, "There are no cray_compute vnodes present.")

        # Submit a job
        j = Job(TEST_USER, {ATTR_l + '.select': '%d:ncpus=1' % nv,
                            ATTR_l + '.place': 'scatter',
                            ATTR_l + '.walltime': '120'})
        jid1 = self.server.submit(j)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)

        # Alter topjob_ineligible for the running job
        self.server.alterjob(jid1, {ATTR_W: "topjob_ineligible = true"},
                             runas=ROOT_USER, logerr=True)

        # Create a high priority queue
        a = {'queue_type': 'e', 'started': 't',
             'enabled': 'True', 'priority': '150'}
        self.server.manager(MGR_CMD_CREATE, QUEUE, a, id="highp")

        # Submit a job to the high priority queue
        j = Job(TEST_USER, {ATTR_queue: 'highp', ATTR_l + '.walltime': '60'})
        jid2 = self.server.submit(j)
        self.server.expect(JOB, {ATTR_state: 'R'}, id=jid2)

        # Verify that job1 is calendared
        self.server.expect(JOB, 'estimated.start_time',
                           op=SET, id=jid1)
        qstat = self.server.status(JOB, 'estimated.start_time',
                                   id=jid1)
        est_time = qstat[0]['estimated.start_time']
        self.assertNotEqual(est_time, None)
        self.scheduler.log_match(jid1 + ";Job is a top job",
                                 starttime=self.server.ctime,
                                 max_attempts=10)