# pbs_provisioning_enhancement.py
  1. # coding: utf-8
  2. # Copyright (C) 1994-2018 Altair Engineering, Inc.
  3. # For more information, contact Altair at www.altair.com.
  4. #
  5. # This file is part of the PBS Professional ("PBS Pro") software.
  6. #
  7. # Open Source License Information:
  8. #
  9. # PBS Pro is free software. You can redistribute it and/or modify it under the
  10. # terms of the GNU Affero General Public License as published by the Free
  11. # Software Foundation, either version 3 of the License, or (at your option) any
  12. # later version.
  13. #
  14. # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
  15. # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  16. # FOR A PARTICULAR PURPOSE.
  17. # See the GNU Affero General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Affero General Public License
  20. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. #
  22. # Commercial License Information:
  23. #
  24. # For a copy of the commercial license terms and conditions,
  25. # go to: (http://www.pbspro.com/UserArea/agreement.html)
  26. # or contact the Altair Legal Department.
  27. #
  28. # Altair’s dual-license business model allows companies, individuals, and
  29. # organizations to create proprietary derivative works of PBS Pro and
  30. # distribute them - whether embedded or bundled with other software -
  31. # under a commercial license agreement.
  32. #
  33. # Use of Altair’s trademarks, including but not limited to "PBS™",
  34. # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
  35. # trademark licensing policies.
  36. from tests.functional import *
  37. class TestProvisioningJob_Enh(TestFunctional):
  38. """
  39. This testsuite tests newly introduced provisioining capabilities.
  40. With this enhacement, PBS will be able to run job requesting aoe
  41. in subchunks, just like any other custom non consumable resource.
  42. PRE: Have a cluster of PBS with two MOM's installed, with one MOM
  43. on a node other than PBS server host. Pass the provisionable mom
  44. first in pbs_bencpress.
  45. Eg. pbs_bencpress -p moms=second_node:server_node ...
  46. """
  47. fake_prov_hook = """
  48. import pbs
  49. import time
  50. e = pbs.event()
  51. vnode = e.vnode
  52. aoe = e.aoe
  53. if aoe == 'App1':
  54. pbs.logmsg(pbs.LOG_DEBUG, "fake application provisioning script")
  55. e.accept(1)
  56. pbs.logmsg(pbs.LOG_DEBUG, "aoe=%s,vnode=%s" % (aoe,vnode))
  57. pbs.logmsg(pbs.LOG_DEBUG, "fake os provisioning script")
  58. e.accept(0)
  59. """
  60. reject_runjob_hook = """
  61. import pbs
  62. e = pbs.event()
  63. j = e.job
  64. pbs.logmsg(pbs.LOG_DEBUG, "job " + str(j) + " solution " + str(j.exec_vnode))
  65. e.reject()
  66. """
  67. def setUp(self):
  68. if len(self.moms) < 2:
  69. self.skipTest("Provide at least 2 moms while invoking test")
  70. TestFunctional.setUp(self)
  71. # This test suite expects the the first mom given with "-p moms"
  72. # benchpress option to be remote mom. In case this assumption
  73. # is not true then it reverses the order in the setup.
  74. if self.moms.values()[0].shortname == self.server.shortname:
  75. self.momA = self.moms.values()[1]
  76. self.momB = self.moms.values()[0]
  77. else:
  78. self.momA = self.moms.values()[0]
  79. self.momB = self.moms.values()[1]
  80. self.hostA = self.momA.shortname
  81. self.hostB = self.momB.shortname
  82. # Remove all nodes
  83. self.server.manager(MGR_CMD_DELETE, NODE, None, "")
  84. # Restart PBS
  85. self.server.restart()
  86. # Create node
  87. self.server.manager(MGR_CMD_CREATE, NODE, None, self.hostA)
  88. self.server.manager(MGR_CMD_CREATE, NODE, None, self.hostB)
  89. self.server.expect(NODE, {'state': 'free'}, id=self.hostA)
  90. self.server.expect(NODE, {'state': 'free'}, id=self.hostB)
  91. # Set hostA provisioning attributes.
  92. a = {'provision_enable': 'true',
  93. 'resources_available.ncpus': '2',
  94. 'resources_available.aoe': 'App1,osimage1'}
  95. self.server.manager(
  96. MGR_CMD_SET, NODE, a, id=self.hostA, expect=True)
  97. self.server.manager(MGR_CMD_UNSET, NODE, id=self.hostA,
  98. attrib='current_aoe', expect=True)
  99. # Set hostB ncpus to 12
  100. a = {'resources_available.ncpus': '12'}
  101. self.server.manager(
  102. MGR_CMD_SET, NODE, a, id=self.hostB, expect=True)
  103. # Setup provisioning hook.
  104. a = {'event': 'provision', 'enabled': 'True', 'alarm': '300'}
  105. rv = self.server.create_import_hook(
  106. 'fake_prov_hook', a, self.fake_prov_hook, overwrite=True)
  107. self.assertTrue(rv)
  108. self.server.manager(MGR_CMD_SET, SERVER, {'log_events': 2047},
  109. expect=True)
  110. def test_app_provisioning(self):
  111. """
  112. Test application provisioning
  113. """
  114. j = Job(TEST_USER1)
  115. j.set_sleep_time(5)
  116. j.set_attributes({'Resource_List.select': '1:aoe=App1'})
  117. jid = self.server.submit(j)
  118. # Job should start running after provisioining script finish
  119. # executing.
  120. # Since this is application provisioining, mom restart is
  121. # not needed.
  122. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  123. # Current aoe on momA, should be set to the requested aoe in job.
  124. self.server.expect(NODE, {'current_aoe': 'App1'}, id=self.hostA)
  125. self.server.log_match(
  126. "fake application provisioning script",
  127. max_attempts=20,
  128. interval=1)
  129. def test_os_provisioning(self):
  130. """
  131. Test os provisioning
  132. """
  133. j = Job(TEST_USER1)
  134. j.set_sleep_time(10)
  135. j.set_attributes({'Resource_List.select': '1:aoe=osimage1'})
  136. jid = self.server.submit(j)
  137. # Job will start and wait for provisioning to complete.
  138. self.server.expect(JOB, {ATTR_substate: '71'}, id=jid)
  139. self.server.log_match("fake os provisioning script",
  140. max_attempts=60,
  141. interval=1)
  142. # Since this is OS provisioining, mom restart is
  143. # required to finish provisioining. Sending SIGHUP
  144. # to the provisioning mom will also work, but restart
  145. # is more apt to simulate real world scenario.
  146. self.momA.restart()
  147. # Current aoe on momA should be set to the requested aoe in job.
  148. self.server.expect(NODE, {'current_aoe': 'osimage1'}, id=self.hostA)
  149. # After mom restart job execution should start, as
  150. # OS provisioining completes affer mom restart.
  151. self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)
  152. def test_subchunk_application_provisioning(self):
  153. """
  154. Test application provisioning job request consist of subchunks
  155. with and without aoe resource.
  156. """
  157. j = Job(TEST_USER1)
  158. j.set_attributes({'Resource_List.select':
  159. '1:ncpus=1:aoe=App1+1:ncpus=12'})
  160. j.set_sleep_time(5)
  161. jid = self.server.submit(j)
  162. self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)
  163. self.server.expect(JOB, ATTR_execvnode, id=jid, op=SET)
  164. nodes = j.get_vnodes(j.exec_vnode)
  165. self.server.log_match("fake application provisioning script",
  166. max_attempts=20,
  167. interval=1)
  168. self.assertTrue((nodes[0] == self.momA.shortname and
  169. nodes[1] == self.momB.shortname) or
  170. (nodes[0] == self.momB.shortname and
  171. nodes[1] == self.momA.shortname))
  172. # Current aoe on momA, should be set to the requested aoe in job.
  173. self.server.expect(NODE, {'current_aoe': 'App1'}, id=self.hostA)
  174. def test_subchunk_os_provisioning(self):
  175. """
  176. Test os provisioning job request consist of subchunks
  177. with and without aoe resource.
  178. """
  179. a = {'Resource_List.select': '1:aoe=osimage1+1:ncpus=12'}
  180. j = Job(TEST_USER1, a)
  181. j.set_sleep_time(10)
  182. jid = self.server.submit(j)
  183. self.server.expect(JOB, ATTR_execvnode, id=jid, op=SET)
  184. nodes = j.get_vnodes(j.exec_vnode)
  185. self.assertTrue((nodes[0] == self.momA.shortname and
  186. nodes[1] == self.momB.shortname) or
  187. (nodes[0] == self.momB.shortname and
  188. nodes[1] == self.momA.shortname))
  189. self.momA.restart()
  190. # After mom restart job execution should start, as
  191. # OS provisioining completes affer mom restart.
  192. self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)
  193. # Current aoe on momA, should be set to the requested aoe in job.
  194. self.server.expect(NODE, {'current_aoe': 'osimage1'}, id=self.hostA)
  195. def test_job_wide_provisioining_request(self):
  196. """
  197. Test jobs with jobwide aoe resource request.
  198. """
  199. # Below job will not run, since resource requested are job-wide,
  200. # and no single node have all the requested resource.
  201. j = Job(TEST_USER1)
  202. j.set_sleep_time(5)
  203. j.set_attributes({"Resource_List.aoe": "App1",
  204. "Resource_List.ncpus": 12})
  205. jid = self.server.submit(j)
  206. self.server.expect(JOB, {ATTR_state: 'Q',
  207. ATTR_comment:
  208. (MATCH_RE, 'Not Running: Insufficient ' +
  209. 'amount of resource: .*')}, id=jid)
  210. j = Job(TEST_USER1)
  211. j.set_attributes({"Resource_List.aoe": "App1",
  212. "Resource_List.ncpus": 1})
  213. jid = self.server.submit(j)
  214. self.server.expect(JOB, {ATTR_state: 'R'}, id=jid)
  215. # Current aoe on momA, should be set to the requested aoe in job.
  216. self.server.expect(NODE, {'current_aoe': 'App1'}, id=self.hostA)
  217. def test_multiple_aoe_request(self):
  218. """
  219. Test jobs with multiple similar/various aoe request in subchunks.
  220. Job request cosisting of multiple subchunks with different aoe will
  221. fail submission, Whereas job request with same aoe across multiple
  222. subchunks should be succesful.
  223. """
  224. a1 = {'Resource_List.select':
  225. '1:ncpus=1:aoe=App1+1:ncpus=12:aoe=osimage1'}
  226. a2 = {'Resource_List.select':
  227. '1:ncpus=1:aoe=App1+1:ncpus=12:aoe=App1'}
  228. # Below job will fail submission, since different aoe's requested,
  229. # across multiple subchunks.
  230. j = Job(TEST_USER1)
  231. j.set_attributes(a1)
  232. j.set_sleep_time(5)
  233. jid = None
  234. try:
  235. jid = self.server.submit(j)
  236. self.assertTrue(jid is None, 'Job successfully submitted' +
  237. 'when it should have failed')
  238. except PbsSubmitError as e:
  239. self.assertTrue('Invalid provisioning request in chunk(s)'
  240. in e.msg[0],
  241. 'Job submission failed, but due to ' +
  242. 'unexpected reason.\n%s' % e.msg[0])
  243. self.logger.info("Job submission failed, as expected")
  244. # Below job will get submitted, since same aoe requested,
  245. # across multiple subchunks.
  246. j = Job(TEST_USER1)
  247. j.set_attributes(a2)
  248. jid = self.server.submit(j)
  249. self.assertTrue(jid is not None, 'Job submission failed' +
  250. 'when it should have succeeded')
  251. self.logger.info("Job submission succeeded, as expected")
  252. def test_provisioning_with_placement(self):
  253. """
  254. Test provisioining job with various placement options.
  255. """
  256. # Below job will not run, since placement is set to pack.
  257. # and no single node have all the requested resource.
  258. j = Job(TEST_USER1)
  259. j.set_attributes({'Resource_List.select':
  260. '1:ncpus=1:aoe=App1+1:ncpus=12',
  261. 'Resource_List.place': 'pack'})
  262. j.set_sleep_time(5)
  263. jid = self.server.submit(j)
  264. self.server.expect(JOB, {ATTR_state: 'Q',
  265. ATTR_comment:
  266. (MATCH_RE, 'Not Running: Insufficient ' +
  267. 'amount of resource: .*')}, id=jid)
  268. # Below job will run with placement set to pack.
  269. # since there is only one node with the requested resource.
  270. j = Job(TEST_USER1)
  271. j.set_attributes({'Resource_List.select':
  272. '1:ncpus=1:aoe=App1+1:ncpus=1',
  273. 'Resource_List.place': 'pack'})
  274. j.set_sleep_time(5)
  275. jid = self.server.submit(j)
  276. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  277. self.server.expect(JOB, ATTR_execvnode, id=jid, op=SET)
  278. nodes = j.get_vnodes(j.exec_vnode)
  279. self.assertTrue(nodes[0] == self.momA.shortname)
  280. # Current aoe on momA, should be set to the requested aoe in job.
  281. self.server.expect(NODE, {'current_aoe': 'App1'}, id=self.hostA)
  282. # This was needed since sometime the above job takes longer
  283. # to finish and release the resources. This causes delay for
  284. # the next job to start and can probably fail the test.
  285. self.server.cleanup_jobs(extend='force')
  286. # Below job will run on two node with placement set to scatter.
  287. # even though single node can satisfy both the requested chunks.
  288. j = Job(TEST_USER1)
  289. j.set_attributes({'Resource_List.select':
  290. '1:ncpus=1:aoe=App1+1:ncpus=1',
  291. 'Resource_List.place': 'scatter'})
  292. j.set_sleep_time(5)
  293. jid = self.server.submit(j)
  294. self.server.expect(JOB, ATTR_execvnode, id=jid, op=SET)
  295. nodes = j.get_vnodes(j.exec_vnode)
  296. self.assertTrue((nodes[0] == self.momA.shortname and
  297. nodes[1] == self.momB.shortname) or
  298. (nodes[0] == self.momB.shortname and
  299. nodes[1] == self.momA.shortname))
  300. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  301. # Current aoe on momA, should be set to the requested aoe in job.
  302. self.server.expect(NODE, {'current_aoe': 'App1'}, id=self.hostA)
  303. def test_sched_provisioning_response_with_runjob(self):
  304. """
  305. Test that if one provisioning job fails to run then scheduler
  306. correctly provides the node solution for the second job with aoe in
  307. it.
  308. """
  309. # Setup runjob hook.
  310. a = {'event': 'runjob', 'enabled': 'True'}
  311. rv = self.server.create_import_hook(
  312. 'reject_runjob_hook', a, self.reject_runjob_hook, overwrite=True)
  313. self.assertTrue(rv)
  314. # Set current aoe to App1
  315. self.server.manager(MGR_CMD_SET, NODE, id=self.hostA,
  316. attrib={'current_aoe': 'App1'}, expect=True)
  317. # Turn on scheduling
  318. self.server.manager(MGR_CMD_SET,
  319. SERVER, {'scheduling': 'False'})
  320. # submit two provisioning jobs
  321. a = {'Resource_List.select': '1:aoe=osimage1:ncpus=1+1:ncpus=4',
  322. 'Resource_List.place': 'vscatter'}
  323. j = Job(TEST_USER1, attrs=a)
  324. jid1 = self.server.submit(j)
  325. jid2 = self.server.submit(j)
  326. # Turn off scheduling
  327. self.server.manager(MGR_CMD_SET,
  328. SERVER, {'scheduling': 'True'})
  329. # Job will be rejected by runjob hook and it should log
  330. # correct exec_vnode for each job.
  331. msg = "job %s " + "solution (%s:aoe=osimage1:ncpus=1)+(%s:ncpus=4)"
  332. job1_msg = msg % (jid1, self.hostA, self.hostB)
  333. job2_msg = msg % (jid2, self.hostA, self.hostB)
  334. self.server.log_match(job1_msg)
  335. self.server.log_match(job2_msg)
  336. def test_sched_provisioning_response(self):
  337. """
  338. Test that if scheduler could not find node solution for one
  339. provisioning job then it will find the correct solution for the
  340. second one.
  341. """
  342. # Set current aoe to osimage1
  343. self.server.manager(MGR_CMD_SET, NODE, id=self.hostA,
  344. attrib={'current_aoe': 'osimage1'}, expect=True)
  345. # submit one job that will run on local node
  346. a = {'Resource_List.select': '1:ncpus=10'}
  347. j1 = Job(TEST_USER1, attrs=a)
  348. j1.set_sleep_time(200)
  349. jid1 = self.server.submit(j1)
  350. self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
  351. # Turn off scheduling
  352. self.server.manager(MGR_CMD_SET,
  353. SERVER, {'scheduling': 'False'})
  354. # submit two provisioning jobs where first job will not be able
  355. # to run and second one can
  356. a = {'Resource_List.select': '1:aoe=App1:ncpus=1+1:ncpus=3',
  357. 'Resource_List.place': 'vscatter'}
  358. j2 = Job(TEST_USER1, attrs=a)
  359. jid2 = self.server.submit(j2)
  360. a = {'Resource_List.select': '1:aoe=App1:ncpus=1+1:ncpus=2',
  361. 'Resource_List.place': 'vscatter'}
  362. j3 = Job(TEST_USER1, attrs=a)
  363. jid3 = self.server.submit(j3)
  364. # Turn on scheduling
  365. self.server.manager(MGR_CMD_SET,
  366. SERVER, {'scheduling': 'True'})
  367. ev_format = "(%s:aoe=App1:ncpus=1)+(%s:ncpus=2)"
  368. solution = ev_format % (self.hostA, self.hostB)
  369. self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
  370. self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
  371. job_state = self.server.status(JOB, id=jid3)
  372. self.assertEqual(job_state[0]['exec_vnode'], solution)