pbs_soft_walltime.py 39 KB


  1. # coding: utf-8
  2. # Copyright (C) 1994-2018 Altair Engineering, Inc.
  3. # For more information, contact Altair at www.altair.com.
  4. #
  5. # This file is part of the PBS Professional ("PBS Pro") software.
  6. #
  7. # Open Source License Information:
  8. #
  9. # PBS Pro is free software. You can redistribute it and/or modify it under the
  10. # terms of the GNU Affero General Public License as published by the Free
  11. # Software Foundation, either version 3 of the License, or (at your option) any
  12. # later version.
  13. #
  14. # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
  15. # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  16. # FOR A PARTICULAR PURPOSE.
  17. # See the GNU Affero General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Affero General Public License
  20. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. #
  22. # Commercial License Information:
  23. #
  24. # For a copy of the commercial license terms and conditions,
  25. # go to: (http://www.pbspro.com/UserArea/agreement.html)
  26. # or contact the Altair Legal Department.
  27. #
  28. # Altair’s dual-license business model allows companies, individuals, and
  29. # organizations to create proprietary derivative works of PBS Pro and
  30. # distribute them - whether embedded or bundled with other software -
  31. # under a commercial license agreement.
  32. #
  33. # Use of Altair’s trademarks, including but not limited to "PBS™",
  34. # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
  35. # trademark licensing policies.
  36. from tests.functional import *
  37. def cvt_duration(duration):
  38. """
  39. convert string form of a duration (HH:MM:SS) into seconds
  40. """
  41. h = 0
  42. m = 0
  43. sp = duration.split(':')
  44. if len(sp) == 3:
  45. h = int(sp[0])
  46. m = int(sp[1])
  47. s = int(sp[2])
  48. elif len(sp) == 2:
  49. m = int(sp[0])
  50. s = int(sp[1])
  51. else:
  52. s = int(sp[0])
  53. return h * 3600 + m * 60 + s
  54. class TestSoftWalltime(TestFunctional):
  55. """
  56. Test that the soft_walltime resource is being used properly and
  57. being extended properly when exceeded
  58. """
  59. def setUp(self):
  60. TestFunctional.setUp(self)
  61. self.server.manager(
  62. MGR_CMD_UNSET, SERVER, 'Resources_default.soft_walltime')
  63. # Delete operators if added
  64. self.server.manager(MGR_CMD_UNSET, SERVER, 'operators', expect=True)
  65. def stat_job(self, job):
  66. """
  67. stat a job for its estimated.start_time and soft_walltime or walltime
  68. :param job: Job to stat
  69. :type job: string
  70. """
  71. a = ['estimated.start_time', 'Resource_List.soft_walltime',
  72. 'Resource_List.walltime']
  73. # If we're in CLI mode, qstat returns times in a human readable format
  74. # We need to turn it back into an epoch. API mode will be the epoch.
  75. Jstat = self.server.status(JOB, id=job, attrib=a)
  76. wt = 0
  77. if self.server.get_op_mode() == PTL_CLI:
  78. strp = time.strptime(Jstat[0]['estimated.start_time'], '%c')
  79. est = int(time.mktime(strp))
  80. if 'Resource_List.soft_walltime' in Jstat[0]:
  81. wt = cvt_duration(Jstat[0]['Resource_List.soft_walltime'])
  82. elif 'Resource_List.walltime' in Jstat[0]:
  83. wt = cvt_duration(Jstat[0]['Resource_List.walltime'])
  84. else:
  85. est = int(Jstat[0]['estimated.start_time'])
  86. if 'Resource_List.soft_walltime' in Jstat[0]:
  87. wt = int(Jstat[0]['Resource_List.soft_walltime'])
  88. elif 'Resource_List.walltime' in RN_stat[0]:
  89. wt = int(Jstat[0]['Resource_List.walltime'])
  90. return (est, wt)
  91. def compare_estimates(self, baseline_job, jobs):
  92. """
  93. Check if estimated start times are correct
  94. :param baseline_job: initial top job to base times off of
  95. :type baseline_job: string (job id)
  96. :param jobs: calendared jobs
  97. :type jobs: list of strings (job ids)
  98. """
  99. est, wt = self.stat_job(baseline_job)
  100. for j in jobs:
  101. est2, wt2 = self.stat_job(j)
  102. self.assertEquals(est + wt, est2)
  103. est = est2
  104. wt = wt2
  105. def setup_holidays(self, prime_offset, nonprime_offset):
  106. """
  107. Set up the holidays file for test execution. This function will
  108. first remove all entries in the holidays file and then add a year,
  109. prime, and nonprime for all days. The prime and nonprime entries
  110. will be offsets from the current time.
  111. This all is necessary because there are some holidays set by default.
  112. The test should be able to be run on any day of the year. If it is
  113. run on one of these holidays, it will be nonprime time only.
  114. """
  115. # Delete all entries in the holidays file
  116. self.scheduler.holidays_delete_entry('a')
  117. lt = time.localtime(time.time())
  118. self.scheduler.holidays_set_year(str(lt[0]))
  119. now = int(time.time())
  120. prime = time.strftime('%H%M', time.localtime(now + prime_offset))
  121. nonprime = time.strftime('%H%M', time.localtime(now + nonprime_offset))
  122. # set prime-time and nonprime-time for all days
  123. self.scheduler.holidays_set_day('weekday', prime, nonprime)
  124. self.scheduler.holidays_set_day('saturday', prime, nonprime)
  125. self.scheduler.holidays_set_day('sunday', prime, nonprime)
  126. def test_soft_walltime_perms(self):
  127. """
  128. Test to see if soft_walltime can't be submitted with a job or
  129. altered by a normal user or operator
  130. """
  131. J = Job(TEST_USER, attrs={'Resource_List.soft_walltime': 10})
  132. msg = 'Cannot set attribute, read only or insufficient permission'
  133. jid = None
  134. try:
  135. jid = self.server.submit(J)
  136. except PbsSubmitError as e:
  137. self.assertTrue(msg in e.msg[0])
  138. self.assertEquals(jid, None)
  139. J = Job(TEST_USER)
  140. jid = self.server.submit(J)
  141. try:
  142. self.server.alterjob(jid, {'Resource_List.soft_walltime': 10},
  143. runas=TEST_USER)
  144. except PbsAlterError as e:
  145. self.assertTrue(msg in e.msg[0])
  146. self.server.expect(JOB, 'Resource_List.soft_walltime',
  147. op=UNSET, id=jid)
  148. operator = str(OPER_USER) + '@*'
  149. self.server.manager(MGR_CMD_SET, SERVER,
  150. {'operators': (INCR, operator)},
  151. sudo=True)
  152. try:
  153. self.server.alterjob(jid, {'Resource_List.soft_walltime': 10},
  154. runas=OPER_USER)
  155. except PbsAlterError as e:
  156. self.assertTrue(msg in e.msg[0])
  157. self.server.expect(JOB, 'Resource_List.soft_walltime',
  158. op=UNSET, id=jid)
  159. def test_soft_walltime_STF(self):
  160. """
  161. Test that STF jobs can't have soft_walltime
  162. """
  163. msg = 'soft_walltime is not supported with Shrink to Fit jobs'
  164. J = Job(attrs={'Resource_List.min_walltime': 120, ATTR_h: None})
  165. jid = self.server.submit(J)
  166. try:
  167. self.server.alterjob(jid, {'Resource_List.soft_walltime': 10})
  168. except PbsAlterError as e:
  169. self.assertTrue(msg in e.msg[0])
  170. self.server.expect(JOB, 'Resource_List.soft_walltime',
  171. op=UNSET, id=jid)
  172. J = Job(TEST_USER, attrs={ATTR_h: None})
  173. jid = self.server.submit(J)
  174. self.server.alterjob(jid, {'Resource_List.soft_walltime': 10})
  175. try:
  176. self.server.alterjob(jid, {'Resource_List.min_walltime': 120})
  177. except PbsAlterError as e:
  178. self.assertTrue(msg in e.msg[0])
  179. self.server.expect(JOB, 'Resource_List.min_walltime',
  180. op=UNSET, id=jid)
  181. J = Job(TEST_USER, attrs={ATTR_h: None})
  182. jid = self.server.submit(J)
  183. a = {'Resource_List.soft_walltime': 10,
  184. 'Resource_List.min_walltime': 120}
  185. try:
  186. self.server.alterjob(jid, a)
  187. except PbsAlterError as e:
  188. self.assertTrue(msg in e.msg[0])
  189. al = ['Resource_List.min_walltime', 'Resource_List.soft_walltime']
  190. self.server.expect(JOB, al, op=UNSET, id=jid)
  191. def test_soft_greater_hard(self):
  192. """
  193. Test that a job's soft_walltime can't be greater than its hard walltime
  194. """
  195. msg = 'Illegal attribute or resource value'
  196. J = Job(TEST_USER, attrs={'Resource_List.walltime': 120, ATTR_h: None})
  197. jid = self.server.submit(J)
  198. try:
  199. self.server.alterjob(jid, {'Resource_List.soft_walltime': 240})
  200. except PbsAlterError as e:
  201. self.assertTrue(msg in e.msg[0])
  202. self.server.expect(JOB, 'Resource_List.soft_walltime',
  203. op=UNSET, id=jid)
  204. J = Job(TEST_USER, {ATTR_h: None})
  205. jid = self.server.submit(J)
  206. self.server.alterjob(jid, {'Resource_List.soft_walltime': 240})
  207. try:
  208. self.server.alterjob(jid, {'Resource_List.walltime': 120})
  209. except PbsAlterError:
  210. self.assertTrue(msg in e.msg[0])
  211. self.server.expect(JOB, 'Resource_List.walltime', op=UNSET, id=jid)
  212. J = Job(TEST_USER, {ATTR_h: None})
  213. jid = self.server.submit(J)
  214. try:
  215. self.server.alterjob(jid, {'Resource_List.walltime': 120,
  216. 'Resource_List.soft_walltime': 240})
  217. except PbsAlterError:
  218. self.assertTrue(msg in e.msg[0])
  219. al = ['Resource_List.walltime', 'Resource_List.soft_walltime']
  220. self.server.expect(JOB, al, op=UNSET, id=jid)
  221. def test_direct_set_soft_walltime(self):
  222. """
  223. Test setting soft_walltime directly
  224. """
  225. hook_body = \
  226. """import pbs
  227. e = pbs.event()
  228. j = e.job
  229. j.Resource_List["soft_walltime"] = \
  230. pbs.duration(j.Resource_List["set_soft_walltime"])
  231. e.accept()
  232. """
  233. self.server.manager(MGR_CMD_CREATE, RSC, {'type': 'long'},
  234. id='set_soft_walltime')
  235. a = {'event': 'queuejob', 'enabled': 'True'}
  236. self.server.create_import_hook("que", a, hook_body)
  237. J = Job(TEST_USER, attrs={'Resource_List.set_soft_walltime': 5})
  238. jid = self.server.submit(J)
  239. self.server.expect(JOB, {'Resource_List.soft_walltime': 5}, id=jid)
  240. def test_soft_walltime_extend(self):
  241. """
  242. Test to see that soft_walltime is extended properly
  243. """
  244. J = Job(TEST_USER)
  245. jid = self.server.submit(J)
  246. self.server.alterjob(jid, {'Resource_List.soft_walltime': 6})
  247. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  248. time.sleep(7)
  249. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
  250. self.server.expect(JOB, {'estimated.soft_walltime': 6}, op=GT, id=jid)
  251. # Get the current soft_walltime
  252. jstat = self.server.status(JOB, id=jid,
  253. attrib=['estimated.soft_walltime'])
  254. est_soft_walltime = jstat[0]['estimated.soft_walltime']
  255. time.sleep(7)
  256. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
  257. # Check if soft_walltime extended
  258. self.server.expect(JOB, {'estimated.soft_walltime':
  259. est_soft_walltime}, op=GT, id=jid)
  260. def test_soft_walltime_extend_hook(self):
  261. """
  262. Test to see that soft_walltime is extended properly when submitted
  263. through a queue job hook
  264. """
  265. hook_body = \
  266. """import pbs
  267. e = pbs.event()
  268. e.job.Resource_List["soft_walltime"] = pbs.duration(5)
  269. e.accept()
  270. """
  271. a = {'event': 'queuejob', 'enabled': 'True'}
  272. self.server.create_import_hook("que", a, hook_body)
  273. J = Job(TEST_USER)
  274. jid = self.server.submit(J)
  275. self.server.expect(JOB, {'Resource_List.soft_walltime': 5}, id=jid)
  276. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  277. time.sleep(6)
  278. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
  279. self.server.expect(JOB, {'estimated.soft_walltime': 5}, op=GT, id=jid)
  280. # Get the current soft_walltime
  281. jstat = self.server.status(JOB, id=jid,
  282. attrib=['estimated.soft_walltime'])
  283. est_soft_walltime = jstat[0]['estimated.soft_walltime']
  284. time.sleep(6)
  285. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
  286. self.server.expect(JOB, {'estimated.soft_walltime':
  287. est_soft_walltime}, op=GT, id=jid)
  288. def test_soft_then_hard(self):
  289. """
  290. Test to see if a job has both a soft and a hard walltime, that
  291. the job's soft_walltime is not extended past its hard walltime.
  292. It should first extend once and then extend to its hard walltime
  293. """
  294. self.server.manager(MGR_CMD_SET, SERVER,
  295. {'job_history_enable': 'True'})
  296. J = Job(TEST_USER,
  297. attrs={'Resource_List.ncpus': 1, 'Resource_List.walltime': 16})
  298. jid = self.server.submit(J)
  299. self.server.alterjob(jid, {'Resource_List.soft_walltime': 6})
  300. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  301. time.sleep(7)
  302. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
  303. self.server.expect(JOB, {'estimated.soft_walltime': 6}, op=GT, id=jid)
  304. time.sleep(7)
  305. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
  306. self.server.expect(JOB, {'estimated.soft_walltime': 16},
  307. offset=4, extend='x', id=jid)
  308. self.server.expect(JOB, 'queue', op=UNSET, id=jid)
  309. def test_soft_before_dedicated(self):
  310. """
  311. Make sure that if a job's soft_walltime won't complete before
  312. dedicated time, the job does not start
  313. """
  314. now = int(time.time())
  315. self.scheduler.add_dedicated_time(start=now + 60, end=now + 2500)
  316. J = Job(TEST_USER)
  317. jid = self.server.submit(J)
  318. self.server.alterjob(jid, {'Resource_List.soft_walltime': 90})
  319. comment = 'Not Running: Job would cross dedicated time boundary'
  320. self.server.expect(JOB, {'comment': comment})
  321. def test_soft_extend_dedicated(self):
  322. """
  323. Have a job with a soft_walltime extend into dedicated time and see
  324. the job continue running like normal
  325. """
  326. # Dedicated time is in the granularity of minutes. This can't be set
  327. # any shorter without making it dedicated time right now.
  328. now = int(time.time())
  329. self.scheduler.add_dedicated_time(start=now + 70, end=now + 180)
  330. J = Job(TEST_USER, {'Resource_List.walltime': 180})
  331. jid = self.server.submit(J)
  332. self.server.alterjob(jid, {'Resource_List.soft_walltime': 5})
  333. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  334. self.logger.info("Waiting until dedicated time starts")
  335. time.sleep(61)
  336. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
  337. self.server.expect(JOB, {'estimated.soft_walltime': 65},
  338. op=GE, id=jid)
  339. def test_soft_before_prime(self):
  340. """
  341. Make sure that if a job's soft_walltime won't complete before
  342. prime boundry, the job does not start
  343. """
  344. self.scheduler.set_sched_config({'backfill_prime': 'True'})
  345. self.setup_holidays(3600, 7200)
  346. J = Job(TEST_USER)
  347. jid = self.server.submit(J)
  348. self.server.alterjob(jid, {'Resource_List.soft_walltime': 5400})
  349. comment = 'Not Running: Job will cross into primetime'
  350. self.server.expect(JOB, {'comment': comment}, id=jid)
  351. def test_soft_backfill_prime(self):
  352. """
  353. Test if soft_walltime is used to see if a job can run before
  354. the next prime boundry
  355. """
  356. self.scheduler.set_sched_config({'backfill_prime': 'True'})
  357. self.setup_holidays(60, 3600)
  358. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})
  359. J = Job(TEST_USER, {'Resource_List.walltime': 300})
  360. jid = self.server.submit(J)
  361. self.server.alterjob(jid, {'Resource_List.soft_walltime': 5})
  362. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
  363. self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)
  364. self.logger.info("Waiting until prime time starts.")
  365. time.sleep(61)
  366. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
  367. self.server.expect(JOB, {'estimated.soft_walltime': 65}, op=GE,
  368. id=jid)
  369. def test_resv_conf_soft(self):
  370. """
  371. Test that there is no change in the reservation behavior with
  372. soft_walltime set on jobs with no hard walltime set
  373. """
  374. a = {'resources_available.ncpus': 4}
  375. self.server.manager(MGR_CMD_SET, NODE, a, id=self.mom.shortname)
  376. now = int(time.time())
  377. J = Job(TEST_USER, attrs={'Resource_List.ncpus': 4})
  378. jid = self.server.submit(J)
  379. self.server.alterjob(jid, {'Resource_List.soft_walltime': 5})
  380. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  381. a = {'Resource_List.ncpus': 1, 'reserve_start': now + 10,
  382. 'reserve_end': now + 130}
  383. R = Reservation(TEST_USER, attrs=a)
  384. rid = self.server.submit(R)
  385. self.server.log_match(rid + ';reservation deleted', max_attempts=5)
  386. def test_resv_conf_soft_with_hard(self):
  387. """
  388. Test that there is no change in the reservation behavior with
  389. soft_walltime set on jobs with a hard walltime set. The soft_walltime
  390. should be ignored and only the hard walltime should be used.
  391. """
  392. a = {'resources_available.ncpus': 4}
  393. self.server.manager(MGR_CMD_SET, NODE, a, id=self.mom.shortname)
  394. now = int(time.time())
  395. J = Job(TEST_USER, attrs={'Resource_List.ncpus': 4,
  396. 'Resource_List.walltime': 120})
  397. jid = self.server.submit(J)
  398. self.server.alterjob(jid, {'Resource_List.soft_walltime': 5})
  399. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  400. a = {'Resource_List.ncpus': 1, 'reserve_start': now + 60,
  401. 'reserve_end': now + 250}
  402. R = Reservation(TEST_USER, attrs=a)
  403. rid = self.server.submit(R)
  404. self.server.log_match(rid + ';reservation deleted', max_attempts=5)
  405. def test_resv_job_soft(self):
  406. """
  407. Test to see that a job with a soft walltime which would "end" before
  408. a reservation starts does not start. It would interfere with the
  409. reservation.
  410. """
  411. a = {'resources_available.ncpus': 4}
  412. self.server.manager(MGR_CMD_SET, NODE, a, id=self.mom.shortname)
  413. now = int(time.time())
  414. a = {'Resource_List.ncpus': 4, 'reserve_start': now + 120,
  415. 'reserve_end': now + 240}
  416. R = Reservation(TEST_USER, attrs=a)
  417. rid = self.server.submit(R)
  418. self.server.expect(RESV,
  419. {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')},
  420. id=rid)
  421. a = {'Resource_List.ncpus': 4, ATTR_h: None}
  422. J = Job(TEST_USER, attrs=a)
  423. jid = self.server.submit(J)
  424. self.server.alterjob(jid, {'Resource_List.soft_walltime': 60})
  425. self.server.rlsjob(jid, 'u')
  426. a = {ATTR_state: 'Q', ATTR_comment:
  427. 'Not Running: Job would conflict with reservation or top job'}
  428. self.server.expect(JOB, a, id=jid, attrop=PTL_AND)
  429. def test_resv_job_soft_hard(self):
  430. """
  431. Test to see that a job with a soft walltime and a hard walltime does
  432. not interfere with a confirmed reservation. The soft walltime would
  433. have the job "end" before the reservation starts, but the hard
  434. walltime would not.
  435. """
  436. a = {'resources_available.ncpus': 4}
  437. self.server.manager(MGR_CMD_SET, NODE, a, id=self.mom.shortname)
  438. now = int(time.time())
  439. a = {'Resource_List.ncpus': 4, 'reserve_start': now + 120,
  440. 'reserve_end': now + 240}
  441. R = Reservation(TEST_USER, attrs=a)
  442. rid = self.server.submit(R)
  443. self.server.expect(RESV,
  444. {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')},
  445. id=rid)
  446. a = {'Resource_List.ncpus': 4,
  447. 'Resource_List.walltime': 150, ATTR_h: None}
  448. J = Job(TEST_USER, attrs=a)
  449. jid = self.server.submit(J)
  450. self.server.alterjob(jid, {'Resource_List.soft_walltime': 60})
  451. self.server.rlsjob(jid, 'u')
  452. a = {ATTR_state: 'Q', ATTR_comment:
  453. 'Not Running: Job would conflict with reservation or top job'}
  454. self.server.expect(JOB, a, id=jid, attrop=PTL_AND)
  455. def test_topjob(self):
  456. """
  457. Test that soft_walltime is used for calendaring of topjobs
  458. Submit 3 jobs:
  459. Job1 has a soft_walltime=150 and runs now
  460. Job2 has a soft_walltime=150 and gets added to the calendar at now+150
  461. Job3 has a soft_walltime=150 and gets added to the calendar at now+300
  462. Job4 has a soft_walltime=150 and gets added to the calendar at now+450
  463. """
  464. self.scheduler.set_sched_config({'strict_ordering': 'True'})
  465. a = {'resources_available.ncpus': 1}
  466. self.server.manager(MGR_CMD_SET, NODE, a, id=self.mom.shortname)
  467. self.server.manager(MGR_CMD_SET, SERVER, {ATTR_backfill_depth: 3})
  468. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})
  469. J = Job(TEST_USER, {'Resource_List.walltime': 300})
  470. jid1 = self.server.submit(J)
  471. self.server.alterjob(jid1, {'Resource_List.soft_walltime': 150})
  472. J = Job(TEST_USER, {'Resource_List.walltime': 300})
  473. jid2 = self.server.submit(J)
  474. self.server.alterjob(jid2, {'Resource_List.soft_walltime': 150})
  475. J = Job(TEST_USER, {'Resource_List.walltime': 300})
  476. jid3 = self.server.submit(J)
  477. self.server.alterjob(jid3, {'Resource_List.soft_walltime': 150})
  478. J = Job(TEST_USER, {'Resource_List.walltime': 300})
  479. jid4 = self.server.submit(J)
  480. self.server.alterjob(jid4, {'Resource_List.soft_walltime': 150})
  481. now = int(time.time())
  482. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
  483. self.scheduler.log_match('Leaving Scheduling Cycle', starttime=now,
  484. max_attempts=20)
  485. self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)
  486. self.compare_estimates(jid2, [jid3, jid4])
  487. def test_topjob2(self):
  488. """
  489. Test a mixture of soft_walltime and walltime used in the calendar
  490. Submit 3 jobs:
  491. Job1 has a soft_walltime=150 runs now
  492. Job2 has a soft_walltime=150 and gets added to the calendar at now+150
  493. Job3 has a soft_walltime=150 and gets added to the calendar at now+300
  494. Job4 has a walltime=300 and gets added to the calendar at now+450
  495. Job5 gets added to the calendar at now+750
  496. """
  497. self.scheduler.set_sched_config({'strict_ordering': 'True'})
  498. a = {'resources_available.ncpus': 1}
  499. self.server.manager(MGR_CMD_SET, NODE, a, id=self.mom.shortname)
  500. self.server.manager(MGR_CMD_SET, SERVER, {ATTR_backfill_depth: 4})
  501. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})
  502. J = Job(TEST_USER, {'Resource_List.walltime': 300})
  503. jid1 = self.server.submit(J)
  504. self.server.alterjob(jid1, {'Resource_List.soft_walltime': 150})
  505. J = Job(TEST_USER, {'Resource_List.walltime': 300})
  506. jid2 = self.server.submit(J)
  507. self.server.alterjob(jid2, {'Resource_List.soft_walltime': 150})
  508. J = Job(TEST_USER, {'Resource_List.walltime': 300})
  509. jid3 = self.server.submit(J)
  510. self.server.alterjob(jid3, {'Resource_List.soft_walltime': 150})
  511. J = Job(TEST_USER, {'Resource_List.walltime': 300})
  512. jid4 = self.server.submit(J)
  513. J = Job(TEST_USER, {'Resource_List.walltime': 300})
  514. jid5 = self.server.submit(J)
  515. now = int(time.time())
  516. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
  517. self.scheduler.log_match('Leaving Scheduling Cycle', starttime=now)
  518. self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)
  519. self.compare_estimates(jid2, [jid3, jid4, jid5])
  520. def test_filler_job(self):
  521. """
  522. Test to see if filler jobs will run based on their soft_walltime
  523. Submit 3 jobs:
  524. Job1 requests 1cpu and runs now
  525. Job2 requests 2cpus gets added to the calendar at now+300
  526. Job3 requests 1cpu and has a soft_walltime=150 and walltime=450
  527. Job3 should run because its soft_walltime will finish before now+300
  528. """
  529. self.scheduler.set_sched_config({'strict_ordering': 'True'})
  530. a = {'resources_available.ncpus': 2}
  531. self.server.manager(MGR_CMD_SET, NODE, a, id=self.mom.shortname)
  532. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})
  533. J1 = Job(TEST_USER, {'Resource_List.walltime': 300,
  534. 'Resource_List.ncpus': 1})
  535. jid1 = self.server.submit(J1)
  536. J2 = Job(TEST_USER, {'Resource_List.walltime': 300,
  537. 'Resource_List.ncpus': 2})
  538. jid2 = self.server.submit(J2)
  539. J3 = Job(TEST_USER, {'Resource_List.walltime': 450,
  540. 'Resource_List.ncpus': 1})
  541. jid3 = self.server.submit(J3)
  542. self.server.alterjob(jid3, {'Resource_List.soft_walltime': 150})
  543. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
  544. self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)
  545. self.server.expect(JOB, {ATTR_state: 'Q'}, id=jid2)
  546. self.server.expect(JOB, {ATTR_state: 'R'}, id=jid3)
  547. def test_preempt_order(self):
  548. """
  549. Test if soft_walltime is used for preempt_order. It should be used
  550. to calculate percent done and also if the soft_walltime is exceeded,
  551. the percent done should remain at 100%
  552. """
  553. self.scheduler.set_sched_config({'preempt_order': '"R 10 S"'})
  554. a = {'resources_available.ncpus': 2}
  555. self.server.manager(MGR_CMD_SET, NODE, a, id=self.mom.shortname)
  556. a = {'queue_type': 'Execution', 'enabled': 'True',
  557. 'started': 'True', 'Priority': 150}
  558. self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='expressq')
  559. a = {'Resource_List.walltime': 600}
  560. J1 = Job(TEST_USER, attrs=a)
  561. jid1 = self.server.submit(J1)
  562. a = {'Resource_List.soft_walltime': 45}
  563. self.server.alterjob(jid1, a)
  564. self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)
  565. # test preempt_order with percentage < 90. jid1 should be requeued.
  566. express_a = {'Resource_List.ncpus': 2, ATTR_queue: 'expressq'}
  567. J2 = Job(TEST_USER, attrs=express_a)
  568. jid2 = self.server.submit(J2)
  569. self.server.expect(JOB, {ATTR_state: 'R'}, id=jid2)
  570. self.server.expect(JOB, {ATTR_state: 'Q'}, id=jid1)
  571. self.server.deljob(jid2, wait=True)
  572. self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)
  573. # preempt_order percentage done is based on resources_used.walltime
  574. # this is only periodically updated. Sleep until half way through
  575. # the extended soft_walltime to make sure we're over 100%
  576. self.logger.info("Sleeping 60 seconds to accumulate "
  577. "resources_used.walltime")
  578. time.sleep(60)
  579. J3 = Job(TEST_USER, attrs=express_a)
  580. jid3 = self.server.submit(J3)
  581. self.server.expect(JOB, {ATTR_state: 'R'}, id=jid3)
  582. self.server.expect(JOB, {ATTR_state: 'S'}, id=jid1)
  583. def test_soft_values_default(self):
  584. """
  585. Test to verify that soft_walltime will only take integer/long type
  586. value
  587. """
  588. msg = 'Illegal attribute or resource value'
  589. try:
  590. self.server.manager(
  591. MGR_CMD_SET, SERVER, {'resources_default.soft_walltime': '0'})
  592. except PbsManagerError as e:
  593. self.assertTrue(msg in e.msg[0])
  594. try:
  595. self.server.manager(
  596. MGR_CMD_SET, SERVER,
  597. {'resources_default.soft_walltime': '00:00:00'})
  598. except PbsManagerError as e:
  599. self.assertTrue(msg in e.msg[0])
  600. try:
  601. self.server.manager(
  602. MGR_CMD_SET, SERVER,
  603. {'resources_default.soft_walltime': 'abc'})
  604. except PbsManagerError as e:
  605. self.assertTrue(msg in e.msg[0])
  606. try:
  607. self.server.manager(
  608. MGR_CMD_SET, SERVER,
  609. {'resources_default.soft_walltime': '01:20:aa'})
  610. except PbsManagerError as e:
  611. self.assertTrue(msg in e.msg[0])
  612. try:
  613. self.server.manager(MGR_CMD_SET, SERVER, {
  614. 'resources_default.soft_walltime':
  615. '1000000000000000000000000'})
  616. except PbsManagerError as e:
  617. self.assertTrue(msg in e.msg[0])
  618. try:
  619. self.server.manager(
  620. MGR_CMD_SET, SERVER,
  621. {'resources_default.soft_walltime': '-1'})
  622. except PbsManagerError as e:
  623. self.assertTrue(msg in e.msg[0])
  624. try:
  625. self.server.manager(
  626. MGR_CMD_SET, SERVER,
  627. {'resources_default.soft_walltime': '00.10'})
  628. except PbsManagerError as e:
  629. self.assertTrue(msg in e.msg[0])
  630. self.server.manager(
  631. MGR_CMD_SET, SERVER,
  632. {'resources_default.soft_walltime': '00:01:00'})
  633. def test_soft_runjob_hook(self):
  634. """
  635. Test that soft walltime is set by runjob hook
  636. """
  637. hook_body = \
  638. """import pbs
  639. e = pbs.event()
  640. e.job.Resource_List["soft_walltime"] = pbs.duration(5)
  641. e.accept()
  642. """
  643. a = {'event': 'runjob', 'enabled': 'True'}
  644. self.server.create_import_hook("que", a, hook_body)
  645. J = Job(TEST_USER)
  646. jid = self.server.submit(J)
  647. self.server.expect(JOB, {'Resource_List.soft_walltime': 5}, id=jid)
  648. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  649. def test_soft_modifyjob_hook(self):
  650. """
  651. Test that soft walltime is set by modifyjob hook
  652. """
  653. hook_body = \
  654. """import pbs
  655. e = pbs.event()
  656. e.job.Resource_List["soft_walltime"] = pbs.duration(15)
  657. e.accept()
  658. """
  659. a = {'event': 'modifyjob', 'enabled': 'True'}
  660. self.server.create_import_hook("que", a, hook_body)
  661. J = Job(TEST_USER)
  662. jid = self.server.submit(J)
  663. self.server.expect(
  664. JOB, 'Resource_List.soft_walltime', op=UNSET, id=jid)
  665. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  666. self.server.alterjob(jid, {'Resource_List.soft_walltime': 5})
  667. self.server.expect(JOB, {'Resource_List.soft_walltime': 15}, id=jid)
  668. def test_walltime_default(self):
  669. """
  670. Test soft walltime behavior with hard walltime is same
  671. even if set under resource_default
  672. """
  673. self.server.manager(MGR_CMD_SET, SERVER,
  674. {'resources_default.soft_walltime': '15'})
  675. J = Job(TEST_USER, attrs={'Resource_List.walltime': 15})
  676. jid = self.server.submit(J)
  677. self.server.expect(JOB, {'Resource_List.soft_walltime': 15}, id=jid)
  678. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  679. self.server.deljob(jid, wait=True)
  680. J = Job(TEST_USER, attrs={'Resource_List.walltime': 16})
  681. jid1 = self.server.submit(J)
  682. self.server.expect(JOB, {'Resource_List.soft_walltime': 15}, id=jid1)
  683. self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
  684. self.server.deljob(jid1, wait=True)
  685. # following piece is commented due to PP-1058
  686. # try:
  687. # J = Job(TEST_USER, attrs={'Resource_List.walltime': 10})
  688. # jid1 = self.server.submit(J)
  689. # except PtlSubmitError as e:
  690. # self.assertTrue("illegal attribute or resource value" in e.msg[0])
  691. # self.assertEqual(jid1, None)
  def test_soft_held(self):
      """
      Check that estimated.soft_walltime stops growing while a job is
      held and grows again after the job is released and running.
      """
      soft_limit = 7
      est_attr = 'estimated.soft_walltime'

      job = Job(TEST_USER, attrs={'Resource_List.walltime': '100'})
      jid = self.server.submit(job)
      self.server.alterjob(jid, {'Resource_List.soft_walltime': soft_limit})
      self.server.expect(JOB, {'job_state': 'R'}, id=jid)

      # Let the job run longer than its soft limit, set scheduling to
      # True again, then expect an estimate greater than the limit.
      self.logger.info("Sleep to let soft_walltime get extended")
      time.sleep(10)
      self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
      self.server.expect(JOB, {est_attr: soft_limit}, op=GT, id=jid)

      # Remember the estimate as reported just before the hold
      held_estimate = self.server.status(
          JOB, id=jid, attrib=[est_attr])[0][est_attr]

      # User hold followed by a rerun; the job is then expected in H
      self.server.holdjob(jid, 'u')
      self.server.rerunjob(jid)
      self.server.expect(JOB, {'job_state': 'H'}, id=jid)

      hold_msg = ("Sleep to verify that soft_walltime: %s"
                  " doesn't change while job is held")
      self.logger.info(hold_msg % held_estimate)
      time.sleep(10)
      self.server.expect(JOB, {est_attr: held_estimate}, id=jid)

      # After the release the job must be running again and still
      # report the estimate saved before the hold.
      self.server.rlsjob(jid, 'u')
      self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
      running_same_estimate = {'job_state': 'R', est_attr: held_estimate}
      self.server.expect(JOB, running_same_estimate, attrop=PTL_AND, id=jid)

      # Give the job enough run time for the estimate to be extended
      # once more, then expect it to exceed the saved value.
      self.logger.info(
          "Sleep enough to let soft_walltime get extended again"
          " since the walltime was reset to 0")
      time.sleep(17)
      self.server.expect(JOB, {est_attr: held_estimate}, op=GT, id=jid)
  def test_soft_less_cput(self):
      """
      Check that a large soft_walltime does not stop the cput limit
      from being enforced on a job.
      """
      # Shell loop that keeps running sleep and dd without ever exiting
      script = (
          "\n"
          "i=0\n"
          "while [ 1 ]\n"
          "do\n"
          "sleep 0.125;\n"
          "dd if=/dev/zero of=/dev/null;\n"
          "done\n"
      )
      cput_job = Job(TEST_USER, {'Resource_List.cput': 5})
      cput_job.create_script(body=script)
      jid = self.server.submit(cput_job)

      # soft_walltime (300) is far larger than the cput limit (5)
      self.server.alterjob(jid, {'Resource_List.soft_walltime': 300})
      self.server.expect(JOB, {'job_state': 'R'}, id=jid)

      # After 10s the job is expected to be gone: its 'queue'
      # attribute must be unset.
      time.sleep(10)
      self.server.expect(JOB, 'queue', op=UNSET, id=jid)
  def test_soft_walltime_resv(self):
      """
      Run a job inside a reservation with a soft_walltime that is longer
      than the reservation, and check the job is gone afterwards.
      """
      start = int(time.time())
      # One-cpu reservation from 5s to 10s after 'start'
      resv_attrs = {'Resource_List.ncpus': 1, 'reserve_start': start + 5,
                    'reserve_end': start + 10}
      rid = self.server.submit(Reservation(TEST_USER, attrs=resv_attrs))
      confirmed = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
      self.server.expect(RESV, confirmed, id=rid)

      # The job's queue is the part of the reservation id before the
      # first '.'
      resv_queue = rid.partition('.')[0]
      jid = self.server.submit(Job(TEST_USER, attrs={ATTR_queue: resv_queue}))

      # soft_walltime of 300s is well past the end of the reservation
      soft_attr = {'Resource_List.soft_walltime': 300}
      self.server.alterjob(jid, soft_attr)
      self.server.expect(JOB, {'Resource_List.soft_walltime': 300}, id=jid)

      # Once the reservation is over the job's 'queue' attribute is
      # expected to be unset, i.e. the job no longer exists.
      self.server.expect(JOB, 'queue', op=UNSET, id=jid,
                         offset=10, max_attempts=10)
  def test_restart_server(self):
      """
      Check that restarting the server keeps a job's soft_walltime and
      its extended estimate, and that the estimate is still reported for
      the finished job in job history.
      """
      soft_limit = 8
      res_attr = 'Resource_List.soft_walltime'
      est_attr = 'estimated.soft_walltime'

      self.server.manager(MGR_CMD_SET, SERVER,
                          {'job_history_enable': 'True'})

      # queuejob hook that sets soft_walltime to 8 on the submitted job
      hook_body = (
          'import pbs\n'
          'e = pbs.event()\n'
          'e.job.Resource_List["soft_walltime"] = pbs.duration(8)\n'
          'e.accept()\n'
      )
      hook_attrs = {'event': 'queuejob', 'enabled': 'True'}
      self.server.create_import_hook("que", hook_attrs, hook_body)

      jid = self.server.submit(Job(TEST_USER))
      self.server.expect(JOB, {res_attr: soft_limit}, id=jid)
      self.server.expect(JOB, {'job_state': 'R'}, id=jid)

      self.logger.info("Wait till the soft_walltime is extended once")
      time.sleep(9)
      self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
      self.server.expect(JOB, {est_attr: soft_limit}, op=GT, id=jid)

      # Both the requested value and the extended estimate must still
      # be reported after the restart.
      self.server.restart()
      self.server.expect(JOB, {res_attr: soft_limit}, id=jid)
      self.server.expect(JOB, {est_attr: soft_limit}, op=GT, id=jid)

      # Capture the estimate as currently reported for the job
      final_estimate = self.server.status(
          JOB, id=jid, attrib=[est_attr])[0][est_attr]

      # Delete the job, then look it up in job history (extend='x'):
      # both attributes are compared with GE and must both match.
      self.server.deljob(jid, wait=True)
      history_match = {'job_state': 'F', est_attr: final_estimate}
      self.server.expect(JOB, history_match, op=GE, extend='x',
                         attrop=PTL_AND, id=jid)
  def test_resv_job_soft_hard2(self):
      """
      Check that a job with both soft and hard walltime does not
      conflict with a reservation when its hard walltime ends before the
      reservation starts: the job is expected to reach state R.
      """
      # The node gets 4 cpus; the reservation and the job each ask for 4
      node_attrs = {'resources_available.ncpus': 4}
      self.server.manager(MGR_CMD_SET, NODE, node_attrs,
                          id=self.mom.shortname)

      start = int(time.time())
      resv_attrs = {'Resource_List.ncpus': 4, 'reserve_start': start + 65,
                    'reserve_end': start + 240}
      resv = Reservation(TEST_USER, attrs=resv_attrs)
      rid = self.server.submit(resv)
      confirmed = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
      self.server.expect(RESV, confirmed, id=rid)

      # Scheduling is off while the job is submitted and altered, so
      # soft_walltime is in place before scheduling is turned back on.
      self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})
      job_attrs = {'Resource_List.ncpus': 4, 'Resource_List.walltime': 60}
      job = Job(TEST_USER, attrs=job_attrs)
      jid = self.server.submit(job)
      self.server.alterjob(jid, {'Resource_List.soft_walltime': 60})
      self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})

      # Walltime (60s) is shorter than the 65s until the reservation
      self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  def test_soft_job_array(self):
      """
      Check that soft_walltime behaves for subjobs the way it does for
      regular jobs: a subjob keeps running past its soft_walltime and is
      only expected to end once its hard walltime is reached.
      """
      # Scheduling is off while the array job is submitted and altered,
      # so soft_walltime is set before scheduling is turned back on.
      self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})
      J = Job(TEST_USER, attrs={ATTR_J: '1-5',
                                'Resource_List.walltime': 15})
      jid = self.server.submit(J)
      self.server.alterjob(jid, {'Resource_List.soft_walltime': 5})
      self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})

      # The array parent is expected in state B with the soft_walltime
      self.server.expect(
          JOB, {'job_state': 'B', 'Resource_List.soft_walltime': 5}, id=jid)

      # Subjob ids replace the '[]' of the array id with '[<index>]'
      subjob1 = jid.replace('[]', '[1]')
      self.server.expect(
          JOB, {'job_state': 'R', 'Resource_List.soft_walltime': 5},
          id=subjob1)

      # The trailing space keeps the two literals from being joined
      # into "notdeleted" in the logged message.
      self.logger.info("Wait for 6s and make sure that subjob1 is not "
                       "deleted even past soft_walltime")
      time.sleep(6)
      self.server.expect(JOB, {'job_state': 'R'}, id=subjob1)

      # 6s have already elapsed; a further 9s offset brings the check
      # to the 15s hard walltime, after which state X is expected.
      self.server.expect(JOB, {'job_state': 'X'}, id=subjob1,
                         offset=9)