pbs_reservations.py 35 KB


  1. # coding: utf-8
  2. # Copyright (C) 1994-2018 Altair Engineering, Inc.
  3. # For more information, contact Altair at www.altair.com.
  4. #
  5. # This file is part of the PBS Professional ("PBS Pro") software.
  6. #
  7. # Open Source License Information:
  8. #
  9. # PBS Pro is free software. You can redistribute it and/or modify it under the
  10. # terms of the GNU Affero General Public License as published by the Free
  11. # Software Foundation, either version 3 of the License, or (at your option) any
  12. # later version.
  13. #
  14. # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
  15. # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  16. # FOR A PARTICULAR PURPOSE.
  17. # See the GNU Affero General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Affero General Public License
  20. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. #
  22. # Commercial License Information:
  23. #
  24. # For a copy of the commercial license terms and conditions,
  25. # go to: (http://www.pbspro.com/UserArea/agreement.html)
  26. # or contact the Altair Legal Department.
  27. #
  28. # Altair’s dual-license business model allows companies, individuals, and
  29. # organizations to create proprietary derivative works of PBS Pro and
  30. # distribute them - whether embedded or bundled with other software -
  31. # under a commercial license agreement.
  32. #
  33. # Use of Altair’s trademarks, including but not limited to "PBS™",
  34. # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
  35. # trademark licensing policies.
  36. from tests.functional import *
  37. import time
  38. class TestReservations(TestFunctional):
  39. """
  40. Various tests to verify behavior of PBS scheduler in handling
  41. reservations
  42. """
  def submit_standing_reservation(self, user, select, rrule, start, end,
                                  place='free'):
      """
      Build a standing (recurring) reservation and submit it.

      The reservation timezone is taken from PBS_TZID in self.conf if
      present, otherwise from the PBS_TZID environment variable, and
      otherwise defaults to America/Los_Angeles (an info message is
      logged in that last case).

      :param user: user the Reservation object is created for
      :param select: value used for Resource_List.select
      :param rrule: recurrence rule, e.g. 'FREQ=HOURLY;COUNT=3'
      :param start: value used for reserve_start
      :param end: value used for reserve_end
      :param place: value used for Resource_List.place ('free' by default)
      :returns: the result of self.server.submit() for the reservation
      """
      # Look for PBS_TZID in the PBS configuration first, then in the
      # process environment; the else branch runs only if neither has it.
      for source in (self.conf, os.environ):
          if 'PBS_TZID' in source:
              timezone = source['PBS_TZID']
              break
      else:
          self.logger.info('Missing timezone, using America/Los_Angeles')
          timezone = 'America/Los_Angeles'

      resv_attrs = {
          'Resource_List.select': select,
          'Resource_List.place': place,
          ATTR_resv_rrule: rrule,
          ATTR_resv_timezone: timezone,
          'reserve_start': start,
          'reserve_end': end,
      }
      return self.server.submit(Reservation(user, resv_attrs))
  def submit_asap_reservation(self, user, jid):
      """
      Convert job ``jid`` into an ASAP reservation and submit it.

      :param user: user the Reservation object is created for
      :param jid: id of the job to convert (passed as ATTR_convert)
      :returns: the result of self.server.submit() for the reservation
      """
      resv = Reservation(user, {ATTR_convert: jid})
      # PTL's Reservation class fills in default start and end times,
      # but pbs_rsub -Wqmove cannot be combined with the -R or -E
      # options, so drop both attributes before submitting.
      resv.unset_attributes(['reserve_start', 'reserve_end'])
      return self.server.submit(resv)
  def test_degraded_standing_reservations(self):
      """
      Verify that degraded standing reservations are reconfirmed on
      other available vnodes
      """
      a = {'reserve_retry_init': 5, 'reserve_retry_cutoff': 1}
      self.server.manager(MGR_CMD_SET, SERVER, a)

      a = {'resources_available.ncpus': 4}
      self.server.create_vnodes('vn', a, num=2, mom=self.mom)

      now = int(time.time())

      # submitting 25 seconds from now to allow some of the older testbed
      # systems time to process (discovered empirically)
      rid = self.submit_standing_reservation(user=TEST_USER,
                                             select='1:ncpus=4',
                                             rrule='FREQ=HOURLY;COUNT=3',
                                             start=now + 25,
                                             end=now + 45)

      a = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
      self.server.expect(RESV, a, id=rid)

      # Remember which vnode the first occurrence was confirmed on
      self.server.status(RESV, 'resv_nodes', id=rid)
      resv_node = self.server.reservations[rid].get_vnodes()[0]

      # Wait out whatever is left of the 25 seconds before the start
      a = {'reserve_state': (MATCH_RE, 'RESV_RUNNING|5')}
      offset = 25 - (int(time.time()) - now)
      self.server.expect(RESV, a, id=rid, offset=offset, interval=1)

      # Take the reserved vnode away while the reservation is running
      a = {'state': 'offline'}
      self.server.manager(MGR_CMD_SET, NODE, a, id=resv_node)

      # NOTE(review): substate 10 is presumably the degraded substate;
      # confirm against the PBS reservation substate definitions.
      a = {'reserve_state': (MATCH_RE, 'RESV_RUNNING|5'),
           'reserve_substate': 10}
      self.server.expect(RESV, a, attrop=PTL_AND, id=rid)

      a = {'resources_available.ncpus': (GT, 0)}
      free_nodes = self.server.filter(NODE, a)
      # dict.values() is a view on Python 3 and cannot be indexed, so
      # materialise it before taking the first group of matching nodes.
      nodes = list(free_nodes.values())[0]

      # Pick whichever of the first two nodes is not the reserved one
      if resv_node == nodes[0]:
          other_node = nodes[1]
      else:
          other_node = nodes[0]

      # After the first occurrence ends (45 seconds after 'now') the
      # reservation should be confirmed again, on the other vnode
      a = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2'),
           'resv_nodes': (MATCH_RE, re.escape(other_node))}
      offset = 45 - (int(time.time()) - now)
      self.server.expect(RESV, a, id=rid, interval=1, offset=offset,
                         attrop=PTL_AND)
  def test_not_honoring_resvs(self):
      """
      Check that jobs are not scheduled onto a node without accounting
      for a reservation on that node.

      A short reservation filled with short jobs is allowed to finish
      while a later reservation for all 4 cpus stays confirmed; two
      long-walltime jobs outside any reservation are expected to
      remain queued.
      """
      self.server.create_vnodes('vn', {'resources_available.ncpus': 4},
                                1, self.mom, usenatvnode=True)
      confirmed = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}

      # Short reservation: starts in 5 seconds, ends in 15
      short_resv = Reservation(TEST_USER)
      short_start = int(time.time() + 5)
      short_end = int(time.time() + 15)
      short_resv.set_attributes({'Resource_List.select': '1:ncpus=1',
                                 'reserve_start': short_start,
                                 'reserve_end': short_end})
      r1id = self.server.submit(short_resv)
      self.server.expect(RESV, confirmed, r1id)

      # Later reservation that asks for every cpu on the vnode
      long_resv = Reservation(TEST_USER)
      long_start = int(time.time() + 600)
      long_end = int(time.time() + 7800)
      long_resv.set_attributes({'Resource_List.select': '1:ncpus=4',
                                'reserve_start': long_start,
                                'reserve_end': long_end})
      r2id = self.server.submit(long_resv)
      self.server.expect(RESV, confirmed, r2id)

      # Queue 20 short jobs inside the first reservation's queue
      resv_queue = r1id.split('.')[0]
      for _ in range(20):
          resv_job = Job(TEST_USER)
          resv_job.set_attributes({'Resource_List.select': '1:ncpus=1',
                                   'Resource_List.walltime': 10,
                                   'queue': resv_queue})
          self.server.submit(resv_job)

      # Two long jobs submitted outside of any reservation
      long_job_ids = []
      for _ in range(2):
          long_job = Job(TEST_USER)
          long_job.set_attributes({'Resource_List.select': '1:ncpus=1',
                                   'Resource_List.walltime': 7200})
          long_job_ids.append(self.server.submit(long_job))

      # Wait until the short reservation is on its way out
      deleting = {'reserve_state': (MATCH_RE, "RESV_BEING_DELETED|7")}
      self.server.expect(RESV, deleting, id=r1id, interval=1)

      self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})

      for long_jid in long_job_ids:
          self.server.expect(JOB, {'job_state': 'Q'}, id=long_jid)
  def test_sched_cycle_starts_on_resv_end(self):
      """
      This test checks whether the sched cycle gets started
      when the advance reservation ends.

      A job that conflicts with a confirmed reservation must stay
      queued while the reservation exists and start running once the
      server has deleted the finished reservation.
      """
      a = {'resources_available.ncpus': 2}
      self.server.manager(MGR_CMD_SET, NODE, a, id=self.mom.shortname,
                          expect=True, sudo=True)

      now = int(time.time())
      a = {'Resource_List.select': "1:ncpus=2",
           'reserve_start': now + 10,
           'reserve_end': now + 30,
           }
      r = Reservation(TEST_USER, a)
      rid = self.server.submit(r)
      a = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
      self.server.expect(RESV, a, rid)

      # A 20 second job cannot finish before the reservation starts
      attr = {'Resource_List.walltime': '00:00:20'}
      j = Job(TEST_USER, attr)
      jid = self.server.submit(j)
      self.server.expect(JOB, {ATTR_state: 'Q'},
                         id=jid)
      msg = "Job would conflict with reservation or top job"
      self.server.expect(JOB, {ATTR_comment: "Not Running: " + msg}, id=jid)
      self.scheduler.log_match(
          jid + ";" + msg,
          max_attempts=30)

      # The numeric code paired with RESV_RUNNING everywhere else in
      # this file is 5. Code 2 is paired with RESV_CONFIRMED, so
      # matching on it would succeed before the reservation has started.
      a = {'reserve_state': (MATCH_RE, 'RESV_RUNNING|5')}
      self.server.expect(RESV, a, rid)

      # Once the server deletes the finished reservation the job
      # should get to run
      resid = rid.split('.')[0]
      self.server.log_match(resid + ";deleted at request of pbs_server",
                            id=resid, interval=5)
      self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)
  def test_exclusive_state(self):
      """
      Test that the resv-exclusive and job-exclusive
      states are appropriately set
      """
      a = {'resources_available.ncpus': 1}
      self.server.manager(MGR_CMD_SET, NODE, a, id=self.mom.shortname,
                          sudo=True)

      now = int(time.time())
      a = {'Resource_List.select': '1:ncpus=1',
           'Resource_List.place': 'excl', 'reserve_start': now + 30,
           'reserve_end': now + 3600}
      r = Reservation(TEST_USER, attrs=a)
      rid = self.server.submit(r)
      exp_attr = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
      self.server.expect(RESV, exp_attr, id=rid)

      self.logger.info('Waiting 30s for reservation to start')
      exp_attr['reserve_state'] = (MATCH_RE, 'RESV_RUNNING|5')
      self.server.expect(RESV, exp_attr, id=rid, offset=30)

      # Check the node that was configured above. It is named after the
      # MoM host, which need not be the host the server runs on.
      self.server.expect(NODE, {'state': 'resv-exclusive'},
                         id=self.mom.shortname)

      # Run an exclusive job inside the reservation's queue
      a = {'Resource_List.select': '1:ncpus=1',
           'Resource_List.place': 'excl', 'queue': rid.split('.')[0]}
      j = Job(TEST_USER, attrs=a)
      jid = self.server.submit(j)
      self.server.expect(JOB, {'job_state': 'R'}, id=jid)

      # Query the same node explicitly rather than relying on it being
      # the first entry of an unfiltered node listing
      n = self.server.status(NODE, id=self.mom.shortname)
      states = n[0]['state'].split(',')
      self.assertIn('resv-exclusive', states)
      self.assertIn('job-exclusive', states)
  def test_resv_excl_future_resv(self):
      """
      An exclusive reservation that starts soon must not prevent a
      later exclusive reservation on the same node from being
      confirmed.
      """
      self.server.manager(MGR_CMD_SET, NODE,
                          {'resources_available.ncpus': 1},
                          id=self.mom.shortname, sudo=True)

      now = int(time.time())
      confirmed = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}

      # Near-term exclusive reservation
      near_attrs = {'Resource_List.select': '1:ncpus=1',
                    'Resource_List.place': 'excl',
                    'reserve_start': now + 30,
                    'reserve_end': now + 3600}
      rid1 = self.server.submit(Reservation(TEST_USER, attrs=near_attrs))
      self.server.expect(RESV, confirmed, id=rid1)

      # Same request, two hours out
      far_attrs = dict(near_attrs)
      far_attrs['reserve_start'] = now + 7200
      far_attrs['reserve_end'] = now + 10800
      rid2 = self.server.submit(Reservation(TEST_USER, attrs=far_attrs))
      self.server.expect(RESV, confirmed, id=rid2)
  def test_job_exceed_resv_end(self):
      """
      A job submitted into a reservation without a walltime must not
      be seen as running past the reservation's end, which would make
      the scheduler reject future reservations.
      """
      self.server.manager(MGR_CMD_SET, NODE,
                          {'resources_available.ncpus': 1},
                          id=self.mom.shortname, sudo=True)

      now = int(time.time())
      first_resv = Reservation(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'Resource_List.place': 'excl',
          'reserve_start': now + 30,
          'reserve_end': now + 300})
      rid = self.server.submit(first_resv)
      self.server.expect(
          RESV, {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}, id=rid)

      self.logger.info('Waiting 30s for reservation to start')
      self.server.expect(
          RESV, {'reserve_state': (MATCH_RE, 'RESV_RUNNING|5')},
          id=rid, offset=30)

      # No walltime on this job: the scheduler will assume it runs
      # for 5 years
      resv_job = Job(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'Resource_List.place': 'excl',
          'queue': rid.split('.')[0]})
      jid = self.server.submit(resv_job)
      self.server.expect(JOB, {'job_state': 'R'}, id=jid)

      # A second reservation starting after the first one has ended
      second_resv = Reservation(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'reserve_start': now + 360,
          'reserve_end': now + 3600})
      rid2 = self.server.submit(second_resv)
      self.server.expect(
          RESV, {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}, id=rid2)
  def test_future_resv_conflicts_running_job(self):
      """
      A running exclusive job that has no walltime should cause a
      future exclusive reservation to be denied.
      """
      now = int(time.time())

      # No walltime is given, so the scheduler will treat the job as
      # if it had a walltime of 5 years
      excl_job = Job(TEST_USER, attrs={'Resource_List.select': '1:ncpus=1',
                                       'Resource_List.place': 'excl'})
      jid = self.server.submit(excl_job)
      self.server.expect(JOB, {'job_state': 'R'}, id=jid)

      # Reservation that would start while the job is still running
      resv_attrs = {'Resource_List.select': '1:ncpus=1',
                    'Resource_List.place': 'excl',
                    'reserve_start': now + 360,
                    'reserve_end': now + 3600}
      rid1 = self.server.submit(Reservation(TEST_USER, attrs=resv_attrs))

      denied_msg = rid1 + ";Reservation denied"
      self.server.log_match(denied_msg, id=rid1, interval=5)
  def test_future_resv_confirms_after_running_job(self):
      """
      A future reservation should be confirmed when it starts after
      the end time of a job running inside an exclusive reservation.
      """
      self.server.manager(MGR_CMD_SET, NODE,
                          {'resources_available.ncpus': 1},
                          id=self.mom.shortname, sudo=True)

      now = int(time.time())
      confirmed = (MATCH_RE, 'RESV_CONFIRMED|2')
      running = (MATCH_RE, 'RESV_RUNNING|5')

      excl_resv = Reservation(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'Resource_List.place': 'excl',
          'reserve_start': now + 30,
          'reserve_end': now + 300})
      rid = self.server.submit(excl_resv)
      self.server.expect(RESV, {'reserve_state': confirmed}, id=rid)

      self.logger.info('Waiting 30s for reservation to start')
      self.server.expect(RESV, {'reserve_state': running},
                         id=rid, offset=30)

      # The job's walltime is longer than the reservation itself
      long_job = Job(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'Resource_List.place': 'excl',
          'Resource_List.walltime': 600,
          'queue': rid.split('.')[0]})
      jid = self.server.submit(long_job)
      self.server.expect(JOB, {'job_state': 'R'}, id=jid)

      # Second reservation, timed to start after the job would end
      later_resv = Reservation(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'reserve_start': now + 630,
          'reserve_end': now + 3600})
      rid2 = self.server.submit(later_resv)
      self.server.expect(RESV, {'reserve_state': confirmed}, id=rid2)
  def test_future_resv_confirms_before_non_excl_job(self):
      """
      A future reservation should be confirmed even when it starts
      before the end time of a non-exclusive job that is running in
      an exclusive reservation.
      """
      self.server.manager(MGR_CMD_SET, NODE,
                          {'resources_available.ncpus': 1},
                          id=self.mom.shortname, sudo=True)

      now = int(time.time())
      confirmed = (MATCH_RE, 'RESV_CONFIRMED|2')
      running = (MATCH_RE, 'RESV_RUNNING|5')

      excl_resv = Reservation(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'Resource_List.place': 'excl',
          'reserve_start': now + 30,
          'reserve_end': now + 300})
      rid = self.server.submit(excl_resv)
      self.server.expect(RESV, {'reserve_state': confirmed}, id=rid)

      self.logger.info('Waiting 30s for reservation to start')
      self.server.expect(RESV, {'reserve_state': running},
                         id=rid, offset=30)

      # Non-exclusive job whose walltime is longer than the reservation
      long_job = Job(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'Resource_List.walltime': 600,
          'queue': rid.split('.')[0]})
      jid = self.server.submit(long_job)
      self.server.expect(JOB, {'job_state': 'R'}, id=jid)

      # Second reservation: begins once the first reservation is over
      next_resv = Reservation(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'reserve_start': now + 330,
          'reserve_end': now + 3600})
      rid2 = self.server.submit(next_resv)
      self.server.expect(RESV, {'reserve_state': confirmed}, id=rid2)
  def test_future_resv_with_non_excl_jobs(self):
      """
      Future reservations, with or without exclusive placement, should
      be confirmed when they start before the end time of
      non-exclusive jobs running in a reservation.
      """
      self.server.manager(MGR_CMD_SET, NODE,
                          {'resources_available.ncpus': 1},
                          id=self.mom.shortname, sudo=True)

      now = int(time.time())
      confirmed = (MATCH_RE, 'RESV_CONFIRMED|2')
      running = (MATCH_RE, 'RESV_RUNNING|5')

      first_resv = Reservation(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'reserve_start': now + 30,
          'reserve_end': now + 300})
      rid = self.server.submit(first_resv)
      self.server.expect(RESV, {'reserve_state': confirmed}, id=rid)

      self.logger.info('Waiting 30s for reservation to start')
      self.server.expect(RESV, {'reserve_state': running},
                         id=rid, offset=30)

      # Job whose walltime runs past the end of the reservation
      long_job = Job(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'Resource_List.walltime': 600,
          'queue': rid.split('.')[0]})
      jid = self.server.submit(long_job)
      self.server.expect(JOB, {'job_state': 'R'}, id=jid)

      # Both follow-up reservations start after the first reservation
      # ends but before the job's walltime is over.
      followup_attrs = {'Resource_List.select': '1:ncpus=1',
                        'reserve_start': now + 330,
                        'reserve_end': now + 3600}

      # First a non-exclusive one, removed again once confirmed
      rid2 = self.server.submit(
          Reservation(TEST_USER, attrs=dict(followup_attrs)))
      self.server.expect(RESV, {'reserve_state': confirmed}, id=rid2)
      self.server.delete(rid2)

      # Then an exclusive one over the same window
      excl_attrs = {'Resource_List.select': '1:ncpus=1',
                    'Resource_List.place': 'excl',
                    'reserve_start': now + 330,
                    'reserve_end': now + 3600}
      rid3 = self.server.submit(Reservation(TEST_USER, attrs=excl_attrs))
      self.server.expect(RESV, {'reserve_state': confirmed}, id=rid3)
  def test_resv_excl_with_jobs(self):
      """
      Test to see that exclusive reservations in the near term do not
      interfere with longer term reservations with jobs inside
      """
      a = {'resources_available.ncpus': 1}
      # The node is named after the MoM host (as the sibling tests in
      # this file assume), which need not be the server's host.
      self.server.manager(MGR_CMD_SET, NODE, a, id=self.mom.shortname)

      now = int(time.time())
      a = {'Resource_List.select': '1:ncpus=1',
           'Resource_List.place': 'excl', 'reserve_start': now + 30,
           'reserve_end': now + 300}
      r = Reservation(TEST_USER, attrs=a)
      rid = self.server.submit(r)
      exp_attr = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
      self.server.expect(RESV, exp_attr, id=rid)

      self.logger.info('Waiting 30s for reservation to start')
      exp_attr['reserve_state'] = (MATCH_RE, 'RESV_RUNNING|5')
      self.server.expect(RESV, exp_attr, id=rid, offset=30)

      # Run a short exclusive job inside the reservation's queue
      a = {'Resource_List.select': '1:ncpus=1',
           'Resource_List.place': 'excl',
           'Resource_List.walltime': '30',
           'queue': rid.split('.')[0]}
      j = Job(TEST_USER, attrs=a)
      jid = self.server.submit(j)
      self.server.expect(JOB, {'job_state': 'R'}, id=jid)

      # Submit another reservation that will start after first
      a = {'Resource_List.select': '1:ncpus=1',
           'Resource_List.place': 'excl', 'reserve_start': now + 360,
           'reserve_end': now + 3600}
      r2 = Reservation(TEST_USER, attrs=a)
      rid2 = self.server.submit(r2)
      exp_attr = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
      self.server.expect(RESV, exp_attr, id=rid2)
  def test_resv_server_restart(self):
      """
      Test if a reservation correctly goes into the resv-exclusive state
      if the server is restarted between when the reservation gets
      confirmed and when it starts
      """
      now = int(time.time())
      start = now + 30

      # Pin the reservation to the MoM's vnode. The node is named after
      # the MoM host, which need not be the host the server runs on.
      a = {'reserve_start': start, 'reserve_end': start + 300,
           'Resource_List.select': '1:ncpus=1:vnode=' +
           self.mom.shortname, 'Resource_List.place': 'excl'}
      r = Reservation(TEST_USER, a)
      rid = self.server.submit(r)

      a = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
      self.server.expect(RESV, a, id=rid)

      self.server.restart()

      # A slow restart can take us past the start time; never hand a
      # negative wait to expect()
      sleep_time = max(0, start - int(time.time()))

      self.logger.info('Waiting %d seconds till resv starts' % sleep_time)
      a = {'reserve_state': (MATCH_RE, 'RESV_RUNNING|5')}
      self.server.expect(RESV, a, id=rid, offset=sleep_time)

      self.server.expect(NODE, {'state': 'resv-exclusive'},
                         id=self.mom.shortname)
  def test_multiple_asap_resv(self):
      """
      Test that multiple ASAP reservations are scheduled one after another

      One job with a one hour walltime occupies the only cpu. Two more
      such jobs are queued and converted to ASAP reservations; each
      reservation is expected to start exactly one hour after the
      previous piece of work.
      """
      # The node is named after the MoM host (as the sibling tests in
      # this file assume), which need not be the server's host.
      self.server.manager(MGR_CMD_SET, NODE,
                          {'resources_available.ncpus': 1},
                          id=self.mom.shortname)

      job_attrs = {'Resource_List.select': '1:ncpus=1',
                   'Resource_List.walltime': '1:00:00'}
      j = Job(TEST_USER, attrs=job_attrs)
      jid1 = self.server.submit(j)
      self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
      s = self.server.status(JOB, 'stime', id=jid1)
      # Times are reported in '%c' format; convert to epoch seconds
      job_stime = int(time.mktime(time.strptime(s[0]['stime'], '%c')))

      # Second job: wait for a comment to be set and for it to be
      # queued, then convert it
      j = Job(TEST_USER, attrs=job_attrs)
      jid2 = self.server.submit(j)
      self.server.expect(JOB, 'comment', op=SET, id=jid2)
      self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)

      rid1 = self.submit_asap_reservation(TEST_USER, jid2)
      exp_attrs = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
      self.server.expect(RESV, exp_attrs, id=rid1)
      s = self.server.status(RESV, 'reserve_start', id=rid1)
      resv1_stime = int(time.mktime(
          time.strptime(s[0]['reserve_start'], '%c')))
      msg = 'ASAP reservation has incorrect start time'
      self.assertEqual(resv1_stime, job_stime + 3600, msg)

      # Third job: same again, it must start after the first reservation
      j = Job(TEST_USER, attrs=job_attrs)
      jid3 = self.server.submit(j)
      self.server.expect(JOB, 'comment', op=SET, id=jid3)
      self.server.expect(JOB, {'job_state': 'Q'}, id=jid3)

      rid2 = self.submit_asap_reservation(TEST_USER, jid3)
      exp_attrs = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
      self.server.expect(RESV, exp_attrs, id=rid2)
      s = self.server.status(RESV, 'reserve_start', id=rid2)
      resv2_stime = int(time.mktime(
          time.strptime(s[0]['reserve_start'], '%c')))
      msg = 'ASAP reservation has incorrect start time'
      self.assertEqual(resv2_stime, resv1_stime + 3600, msg)
  def test_excl_asap_resv_before_longterm_resvs(self):
      """
      An ASAP reservation made from an exclusive-placement job must
      not interfere with long term advance and standing exclusive
      reservations submitted afterwards.
      """
      self.server.manager(MGR_CMD_SET, NODE,
                          {'resources_available.ncpus': 1},
                          id=self.mom.shortname)
      confirmed = (MATCH_RE, "RESV_CONFIRMED|2")
      running = (MATCH_RE, "RESV_RUNNING|5")

      # First job takes the available resources and runs
      filler_job = Job(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'Resource_List.walltime': 30})
      jid1 = self.server.submit(filler_job)
      self.server.expect(JOB, {'job_state': 'R'}, id=jid1)

      # Second job asks for exclusive placement and has to queue
      excl_job = Job(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'Resource_List.walltime': 300,
          'Resource_List.place': 'excl'})
      jid2 = self.server.submit(excl_job)
      self.server.expect(JOB, 'comment', op=SET, id=jid2)
      self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)

      # Turn the queued job into an ASAP reservation
      rid1 = self.submit_asap_reservation(user=TEST_USER, jid=jid2)
      self.server.expect(RESV, {'reserve_state': confirmed}, id=rid1)

      # Give the ASAP reservation time to start
      self.logger.info('Waiting 30 seconds for reservation to start')
      self.server.expect(RESV, {'reserve_state': running},
                         id=rid1, offset=30)

      # With rid1 running, a long term exclusive advance reservation
      # should still be confirmed
      now = int(time.time())
      advance_resv = Reservation(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'Resource_List.place': 'excl',
          'reserve_start': now + 3600,
          'reserve_end': now + 3605})
      rid2 = self.server.submit(advance_resv)
      self.server.expect(RESV, {'reserve_state': confirmed}, id=rid2)

      # ... and so should a long term exclusive standing reservation
      now = int(time.time())
      rid3 = self.submit_standing_reservation(
          user=TEST_USER,
          select='1:ncpus=1',
          place='excl',
          rrule='FREQ=HOURLY;COUNT=3',
          start=now + 7200,
          end=now + 7205)
      self.server.expect(RESV, {'reserve_state': confirmed}, id=rid3)
  def test_excl_asap_resv_after_longterm_resvs(self):
      """
      An exclusive ASAP reservation made from an exclusive-placement
      job must not interfere with long term exclusive reservations
      that already exist.

      Also checks that a further exclusive reservation is confirmed
      while the ASAP reservation is running.
      """
      self.server.manager(MGR_CMD_SET, NODE,
                          {'resources_available.ncpus': 1},
                          id=self.mom.shortname)
      confirmed = (MATCH_RE, "RESV_CONFIRMED|2")
      running = (MATCH_RE, "RESV_RUNNING|5")

      # Existing long term exclusive advance reservation
      now = int(time.time())
      advance_resv = Reservation(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'Resource_List.place': 'excl',
          'reserve_start': now + 360,
          'reserve_end': now + 365})
      advance_rid = self.server.submit(advance_resv)
      self.server.expect(RESV, {'reserve_state': confirmed},
                         id=advance_rid)

      # Existing long term exclusive standing reservation
      now = int(time.time())
      standing_rid = self.submit_standing_reservation(
          user=TEST_USER,
          select='1:ncpus=1',
          place='excl',
          rrule='FREQ=HOURLY;COUNT=3',
          start=now + 3600,
          end=now + 3605)
      self.server.expect(RESV, {'reserve_state': confirmed},
                         id=standing_rid)

      # First job takes the available resources and runs
      filler_job = Job(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'Resource_List.walltime': 30})
      jid1 = self.server.submit(filler_job)
      self.server.expect(JOB, {'job_state': 'R'}, id=jid1)

      # Second job asks for exclusive placement and has to queue
      excl_job = Job(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'Resource_List.walltime': 300,
          'Resource_List.place': 'excl'})
      jid2 = self.server.submit(excl_job)
      self.server.expect(JOB, 'comment', op=SET, id=jid2)
      self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)

      # Turn the queued job into an ASAP reservation
      asap_rid = self.submit_asap_reservation(user=TEST_USER, jid=jid2)
      self.server.expect(RESV, {'reserve_state': confirmed}, id=asap_rid)

      # Give the ASAP reservation time to start
      self.logger.info('Waiting 30 seconds for reservation to start')
      self.server.expect(RESV, {'reserve_state': running},
                         id=asap_rid, offset=30)

      # With the ASAP reservation running, another long term
      # exclusive reservation should be confirmed
      now = int(time.time())
      later_resv = Reservation(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=1',
          'Resource_List.place': 'excl',
          'reserve_start': now + 3600,
          'reserve_end': now + 3605})
      later_rid = self.server.submit(later_resv)
      self.server.expect(RESV, {'reserve_state': confirmed}, id=later_rid)
  def test_multi_vnode_excl_advance_resvs(self):
      """
      On a host with several vnodes, long term exclusive reservations
      must not interfere with a reservation that starts shortly.
      """
      self.server.create_vnodes('vn', {'resources_available.ncpus': 4},
                                num=3, mom=self.mom)
      confirmed = (MATCH_RE, "RESV_CONFIRMED|2")

      # Long term exclusive standing reservation
      now = int(time.time())
      rid1 = self.submit_standing_reservation(
          user=TEST_USER,
          select='1:ncpus=9',
          place='excl',
          rrule='FREQ=HOURLY;COUNT=3',
          start=now + 7200,
          end=now + 7205)
      self.server.expect(RESV, {'reserve_state': confirmed}, id=rid1)

      # Long term exclusive advance reservation
      now = int(time.time())
      advance_resv = Reservation(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=10',
          'Resource_List.place': 'excl',
          'reserve_start': now + 3600,
          'reserve_end': now + 3605})
      rid2 = self.server.submit(advance_resv)
      self.server.expect(RESV, {'reserve_state': confirmed}, id=rid2)

      # Short term reservation that wants every node exclusively
      now = int(time.time())
      short_resv = Reservation(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=12',
          'Resource_List.place': 'excl',
          'reserve_start': now + 20,
          'reserve_end': now + 100})
      rid3 = self.server.submit(short_resv)
      self.server.expect(
          RESV, {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}, id=rid3)
      self.server.expect(
          RESV, {'reserve_state': (MATCH_RE, 'RESV_RUNNING|5')},
          id=rid3, offset=30)
  def test_multi_vnode_excl_asap_resv(self):
      """
      On a host with several vnodes, an ASAP reservation made from an
      exclusive-placement job must not interfere with a future
      multi-node exclusive reservation.
      """
      self.server.create_vnodes('vn', {'resources_available.ncpus': 4},
                                num=3, mom=self.mom)
      confirmed = (MATCH_RE, "RESV_CONFIRMED|2")
      running = (MATCH_RE, "RESV_RUNNING|5")

      # Three exclusive jobs keep all the nodes busy: the first two ask
      # for 4 cpus each, the third asks for 5
      short_job = Job(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=4',
          'Resource_List.place': 'excl',
          'Resource_List.walltime': 30})
      jid1 = self.server.submit(short_job)
      self.server.expect(JOB, {'job_state': 'R'}, id=jid1)

      long_job = Job(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=4',
          'Resource_List.place': 'excl',
          'Resource_List.walltime': 400})
      jid2 = self.server.submit(long_job)
      self.server.expect(JOB, {'job_state': 'R'}, id=jid2)

      waiting_job = Job(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=5',
          'Resource_List.place': 'excl',
          'Resource_List.walltime': 100})
      jid3 = self.server.submit(waiting_job)
      self.server.expect(JOB, 'comment', op=SET, id=jid3)
      self.server.expect(JOB, {'job_state': 'Q'}, id=jid3)

      # The queued third job becomes an ASAP reservation
      rid1 = self.submit_asap_reservation(user=TEST_USER, jid=jid3)
      self.server.expect(RESV, {'reserve_state': confirmed}, id=rid1)

      # Give the ASAP reservation time to start
      self.logger.info('Waiting 30 seconds for reservation to start')
      self.server.expect(RESV, {'reserve_state': running},
                         id=rid1, offset=30)

      # With rid1 running, a long term exclusive reservation that asks
      # for all the nodes should be confirmed
      now = int(time.time())
      all_nodes_resv = Reservation(TEST_USER, attrs={
          'Resource_List.select': '1:ncpus=12',
          'Resource_List.place': 'excl',
          'reserve_start': now + 3600,
          'reserve_end': now + 3605})
      rid2 = self.server.submit(all_nodes_resv)
      self.server.expect(RESV, {'reserve_state': confirmed}, id=rid2)
  def test_fail_confirm_resv_message(self):
      """
      When the scheduler cannot confirm a reservation, the reason
      should be written to the scheduler log.
      """
      self.server.manager(MGR_CMD_SET, NODE,
                          {'resources_available.ncpus': 1},
                          id=self.mom.shortname)

      # Ask for 10 cpus where only 1 is available, so the long term
      # advance reservation gets denied
      now = int(time.time())
      resv_attrs = {'Resource_List.select': '1:ncpus=10',
                    'reserve_start': now + 360,
                    'reserve_end': now + 365}
      resv = Reservation(TEST_USER, attrs=resv_attrs)
      rid = self.server.submit(resv)

      denied_msg = rid + ";Reservation denied"
      self.server.log_match(denied_msg, id=rid, interval=5)

      # The scheduler log should say why the reservation was denied
      reason_msg = (rid + ";PBS Failed to confirm resv: " +
                    "Insufficient amount of resource: ncpus")
      self.scheduler.log_match(reason_msg)