pbs_preemptperformance.py 16 KB


# coding: utf-8
# Copyright (C) 1994-2018 Altair Engineering, Inc.
# For more information, contact Altair at www.altair.com.
#
# This file is part of the PBS Professional ("PBS Pro") software.
#
# Open Source License Information:
#
# PBS Pro is free software. You can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.
# See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Commercial License Information:
#
# For a copy of the commercial license terms and conditions,
# go to: (http://www.pbspro.com/UserArea/agreement.html)
# or contact the Altair Legal Department.
#
# Altair’s dual-license business model allows companies, individuals, and
# organizations to create proprietary derivative works of PBS Pro and
# distribute them - whether embedded or bundled with other software -
# under a commercial license agreement.
#
# Use of Altair’s trademarks, including but not limited to "PBS™",
# "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
# trademark licensing policies.
import time

from tests.performance import *
  37. class TestPreemptPerformance(TestPerformance):
  38. """
  39. Check the preemption performance
  40. """
  41. def setUp(self):
  42. TestPerformance.setUp(self)
  43. # set poll cycle to a high value because mom spends a lot of time
  44. # in gathering job's resources used. We don't need that in this test
  45. self.mom.add_config({'$min_check_poll': 7200, '$max_check_poll': 9600})
  46. def create_workload_and_preempt(self):
  47. a = {
  48. 'queue_type': 'execution',
  49. 'started': 'True',
  50. 'enabled': 'True'
  51. }
  52. self.server.manager(MGR_CMD_CREATE, QUEUE, a, 'workq2')
  53. a = {'max_run_res_soft.ncpus': "[u:PBS_GENERIC=2]"}
  54. self.server.manager(MGR_CMD_SET, QUEUE, a, 'workq', expect=True)
  55. a = {'max_run_res.mem': "[u:" + str(TEST_USER) + "=1500mb]"}
  56. self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)
  57. a = {'Resource_List.select': '1:ncpus=3:mem=90mb',
  58. 'Resource_List.walltime': 9999}
  59. for _ in range(8):
  60. j = Job(TEST_USER, attrs=a)
  61. j.set_sleep_time(9999)
  62. self.server.submit(j)
  63. for _ in range(7):
  64. j = Job(TEST_USER1, attrs=a)
  65. j.set_sleep_time(9999)
  66. self.server.submit(j)
  67. sched_off = {'scheduling': 'False'}
  68. self.server.manager(MGR_CMD_SET, SERVER, sched_off, expect=True)
  69. a = {'Resource_List.select': '1:ncpus=3',
  70. 'Resource_List.walltime': 9999}
  71. for _ in range(775):
  72. j = Job(TEST_USER, attrs=a)
  73. j.set_sleep_time(9999)
  74. self.server.submit(j)
  75. for _ in range(800):
  76. j = Job(TEST_USER1, attrs=a)
  77. j.set_sleep_time(9999)
  78. self.server.submit(j)
  79. sched_on = {'scheduling': 'True'}
  80. self.server.manager(MGR_CMD_SET, SERVER, sched_on, expect=True)
  81. self.server.expect(JOB, {'substate=42': 1590},
  82. offset=15, interval=20)
  83. a = {'Resource_List.select': '1:ncpus=90:mem=1350mb',
  84. 'Resource_List.walltime': 9999, ATTR_queue: 'workq2'}
  85. j1 = Job(TEST_USER, attrs=a)
  86. j1.set_sleep_time(9999)
  87. j1id = self.server.submit(j1)
  88. self.server.expect(JOB, {'job_state': 'R'}, id=j1id,
  89. offset=15, interval=5)
  90. self.server.expect(JOB, {'job_state=S': 20}, interval=5)
  91. (_, str1) = self.scheduler.log_match(j1id + ";Considering job to run",
  92. id=j1id, n='ALL',
  93. max_attempts=1, interval=2)
  94. (_, str2) = self.scheduler.log_match(j1id + ";Job run",
  95. id=j1id, n='ALL',
  96. max_attempts=1, interval=2)
  97. date_time1 = str1.split(";")[0]
  98. date_time2 = str2.split(";")[0]
  99. epoch1 = int(time.mktime(time.strptime(
  100. date_time1, '%m/%d/%Y %H:%M:%S')))
  101. epoch2 = int(time.mktime(time.strptime(
  102. date_time2, '%m/%d/%Y %H:%M:%S')))
  103. time_diff = epoch2 - epoch1
  104. self.logger.info('#' * 80)
  105. self.logger.info('#' * 80)
  106. res_str = "RESULT: THE TIME TAKEN IS : " + str(time_diff) + " SECONDS"
  107. self.logger.info(res_str)
  108. self.logger.info('#' * 80)
  109. self.logger.info('#' * 80)
  110. @timeout(3600)
  111. @tags('sched', 'scheduling_policy')
  112. def test_preemption_with_limits(self):
  113. """
  114. Measure the time scheduler takes to preempt when the high priority
  115. job hits soft/hard limits under a considerable amount of workload.
  116. """
  117. a = {'resources_available.ncpus': 4800,
  118. 'resources_available.mem': '2800mb'}
  119. self.server.create_vnodes('vn', a, 1, self.mom, usenatvnode=True)
  120. p = '"express_queue, normal_jobs, server_softlimits, queue_softlimits"'
  121. self.scheduler.set_sched_config({'preempt_prio': p})
  122. self.create_workload_and_preempt()
  123. @timeout(3600)
  124. @tags('sched', 'scheduling_policy')
  125. def test_preemption_with_insufficient_resc(self):
  126. """
  127. Measure the time scheduler takes to preempt when the high priority
  128. job hits soft/hard limits and there is scarcity of resources
  129. under a considerable amount of workload.
  130. """
  131. a = {'resources_available.ncpus': 4800,
  132. 'resources_available.mem': '1500mb'}
  133. self.server.create_vnodes('vn', a, 1, self.mom, usenatvnode=True)
  134. p = '"express_queue, normal_jobs, server_softlimits, queue_softlimits"'
  135. self.scheduler.set_sched_config({'preempt_prio': p})
  136. self.create_workload_and_preempt()
  137. @timeout(3600)
  138. @tags('sched', 'scheduling_policy')
  139. def test_insufficient_resc_non_cons(self):
  140. """
  141. Submit a number of low priority job and then submit a high priority
  142. job that needs a non-consumable resource which is assigned to last
  143. running job. This will make scheduler go through all running jobs
  144. to find the preemptable job.
  145. """
  146. a = {'type': 'string', 'flag': 'h'}
  147. self.server.manager(MGR_CMD_CREATE, RSC, a, id='qlist')
  148. a = {ATTR_rescavail + ".qlist": "list1",
  149. ATTR_rescavail + ".ncpus": "8"}
  150. self.server.create_vnodes(
  151. "vn1", a, 400, self.mom, additive=True, fname="vnodedef1")
  152. a = {ATTR_rescavail + ".qlist": "list2",
  153. ATTR_rescavail + ".ncpus": "1"}
  154. self.server.create_vnodes(
  155. "vn2", a, 1, self.mom, additive=True, fname="vnodedef2")
  156. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})
  157. a = {ATTR_l + '.select': '1:ncpus=1:qlist=list1'}
  158. for _ in range(3200):
  159. j = Job(TEST_USER, attrs=a)
  160. j.set_sleep_time(3000)
  161. self.server.submit(j)
  162. a = {ATTR_l + '.select': '1:ncpus=1:qlist=list2'}
  163. j = Job(TEST_USER, attrs=a)
  164. j.set_sleep_time(3000)
  165. # Add qlist to the resources scheduler checks for
  166. self.scheduler.add_resource('qlist')
  167. self.scheduler.unset_sched_config('preempt_sort')
  168. jid = self.server.submit(j)
  169. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
  170. self.server.expect(JOB, {'substate=42': 3201}, interval=20,
  171. offset=15)
  172. qname = 'highp'
  173. a = {'queue_type': 'execution', 'priority': '200',
  174. 'started': 'True', 'enabled': 'True'}
  175. self.server.manager(MGR_CMD_CREATE, QUEUE, a, qname)
  176. a = {ATTR_l + '.select': '1:ncpus=1:qlist=list2',
  177. ATTR_q: 'highp'}
  178. j = Job(TEST_USER, attrs=a)
  179. j.set_sleep_time(3000)
  180. jid_highp = self.server.submit(j)
  181. self.server.expect(JOB, {ATTR_state: 'R'}, id=jid_highp, interval=10)
  182. self.server.expect(JOB, {ATTR_state: 'S'}, id=jid)
  183. search_str = jid_highp + ";Considering job to run"
  184. (_, str1) = self.scheduler.log_match(search_str,
  185. id=jid_highp, n='ALL',
  186. max_attempts=1, interval=2)
  187. search_str = jid_highp + ";Job run"
  188. (_, str2) = self.scheduler.log_match(search_str,
  189. id=jid_highp, n='ALL',
  190. max_attempts=1, interval=2)
  191. date_time1 = str1.split(";")[0]
  192. date_time2 = str2.split(";")[0]
  193. epoch1 = int(time.mktime(time.strptime(
  194. date_time1, '%m/%d/%Y %H:%M:%S')))
  195. epoch2 = int(time.mktime(time.strptime(
  196. date_time2, '%m/%d/%Y %H:%M:%S')))
  197. time_diff = epoch2 - epoch1
  198. self.logger.info('#' * 80)
  199. self.logger.info('#' * 80)
  200. res_str = "RESULT: PREEMPTION TOOK: " + str(time_diff) + " SECONDS"
  201. self.logger.info(res_str)
  202. self.logger.info('#' * 80)
  203. self.logger.info('#' * 80)
  204. @timeout(3600)
  205. @tags('sched', 'scheduling_policy')
  206. def test_insufficient_resc_multiple_non_cons(self):
  207. """
  208. Submit a number of low priority jobs and then submit a high priority
  209. job that needs a non-consumable resource in 2 chunks. These resources
  210. are assigned to last two running jobs. This will make scheduler go
  211. through all running jobs to find preemptable jobs.
  212. """
  213. a = {'type': 'string', 'flag': 'h'}
  214. self.server.manager(MGR_CMD_CREATE, RSC, a, id='qlist')
  215. a = {ATTR_rescavail + ".qlist": "list1",
  216. ATTR_rescavail + ".ncpus": "8"}
  217. self.server.create_vnodes(
  218. "vn1", a, 400, self.mom, additive=True, fname="vnodedef1")
  219. a = {ATTR_rescavail + ".qlist": "list2",
  220. ATTR_rescavail + ".ncpus": "1"}
  221. self.server.create_vnodes(
  222. "vn2", a, 1, self.mom, additive=True, fname="vnodedef2")
  223. a = {ATTR_rescavail + ".qlist": "list3",
  224. ATTR_rescavail + ".ncpus": "1"}
  225. self.server.create_vnodes(
  226. "vn3", a, 1, self.mom, additive=True, fname="vnodedef3")
  227. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})
  228. a = {ATTR_l + '.select': '1:ncpus=1:qlist=list1'}
  229. for _ in range(3200):
  230. j = Job(TEST_USER, attrs=a)
  231. j.set_sleep_time(3000)
  232. self.server.submit(j)
  233. a = {ATTR_l + '.select': '1:ncpus=1:qlist=list2'}
  234. j = Job(TEST_USER, attrs=a)
  235. j.set_sleep_time(3000)
  236. b = {ATTR_l + '.select': '1:ncpus=1:qlist=list3'}
  237. j2 = Job(TEST_USER, attrs=b)
  238. j2.set_sleep_time(3000)
  239. # Add qlist to the resources scheduler checks for
  240. self.scheduler.add_resource('qlist')
  241. self.scheduler.unset_sched_config('preempt_sort')
  242. jid = self.server.submit(j)
  243. jid2 = self.server.submit(j2)
  244. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
  245. self.server.expect(JOB, {'substate=42': 3202}, interval=20,
  246. offset=15)
  247. qname = 'highp'
  248. a = {'queue_type': 'execution', 'priority': '200',
  249. 'started': 'True', 'enabled': 'True'}
  250. self.server.manager(MGR_CMD_CREATE, QUEUE, a, qname)
  251. a = {ATTR_l + '.select': '1:ncpus=1:qlist=list2+1:ncpus=1:qlist=list3',
  252. ATTR_q: 'highp'}
  253. j = Job(TEST_USER, attrs=a)
  254. j.set_sleep_time(3000)
  255. jid_highp = self.server.submit(j)
  256. self.server.expect(JOB, {ATTR_state: 'R'}, id=jid_highp, interval=10)
  257. self.server.expect(JOB, {ATTR_state: 'S'}, id=jid)
  258. self.server.expect(JOB, {ATTR_state: 'S'}, id=jid2)
  259. search_str = jid_highp + ";Considering job to run"
  260. (_, str1) = self.scheduler.log_match(search_str,
  261. id=jid_highp, n='ALL',
  262. max_attempts=1, interval=2)
  263. search_str = jid_highp + ";Job run"
  264. (_, str2) = self.scheduler.log_match(search_str,
  265. id=jid_highp, n='ALL',
  266. max_attempts=1, interval=2)
  267. date_time1 = str1.split(";")[0]
  268. date_time2 = str2.split(";")[0]
  269. epoch1 = int(time.mktime(time.strptime(
  270. date_time1, '%m/%d/%Y %H:%M:%S')))
  271. epoch2 = int(time.mktime(time.strptime(
  272. date_time2, '%m/%d/%Y %H:%M:%S')))
  273. time_diff = epoch2 - epoch1
  274. self.logger.info('#' * 80)
  275. self.logger.info('#' * 80)
  276. res_str = "RESULT: PREEMPTION TOOK: " + str(time_diff) + " SECONDS"
  277. self.logger.info(res_str)
  278. self.logger.info('#' * 80)
  279. self.logger.info('#' * 80)
  280. @timeout(3600)
  281. @tags('sched', 'scheduling_policy')
  282. def test_insufficient_server_resc(self):
  283. """
  284. Submit a number of low priority jobs and then make the last low
  285. priority job to consume some server level resources. Submit a
  286. high priority job that request for this server level resource
  287. and measure the time it takes for preemption.
  288. """
  289. a = {'type': 'long', 'flag': 'q'}
  290. self.server.manager(MGR_CMD_CREATE, RSC, a, id='foo')
  291. a = {ATTR_rescavail + ".ncpus": "8"}
  292. self.server.create_vnodes(
  293. "vn1", a, 401, self.mom, additive=True, fname="vnodedef1")
  294. # Make resource foo available on server
  295. a = {ATTR_rescavail + ".foo": 50, 'scheduling': 'False'}
  296. self.server.manager(MGR_CMD_SET, SERVER, a)
  297. a = {ATTR_l + '.select': '1:ncpus=1'}
  298. for _ in range(3200):
  299. j = Job(TEST_USER, attrs=a)
  300. j.set_sleep_time(3000)
  301. self.server.submit(j)
  302. # Add foo to the resources scheduler checks for
  303. self.scheduler.add_resource('foo')
  304. self.scheduler.unset_sched_config('preempt_sort')
  305. a = {ATTR_l + '.select': '1:ncpus=1', ATTR_l + '.foo': 25}
  306. j = Job(TEST_USER, attrs=a)
  307. j.set_sleep_time(3000)
  308. jid = self.server.submit(j)
  309. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'True'})
  310. self.server.expect(JOB, {'substate=42': 3201}, interval=20,
  311. offset=15)
  312. qname = 'highp'
  313. a = {'queue_type': 'execution', 'priority': '200',
  314. 'started': 'True', 'enabled': 'True'}
  315. self.server.manager(MGR_CMD_CREATE, QUEUE, a, qname)
  316. a = {ATTR_l + '.select': '1:ncpus=1', ATTR_l + '.foo': 50,
  317. ATTR_q: 'highp'}
  318. j2 = Job(TEST_USER, attrs=a)
  319. j2.set_sleep_time(3000)
  320. jid_highp = self.server.submit(j2)
  321. self.server.expect(JOB, {ATTR_state: 'R'}, id=jid_highp, interval=10)
  322. self.server.expect(JOB, {ATTR_state: 'S'}, id=jid)
  323. search_str = jid_highp + ";Considering job to run"
  324. (_, str1) = self.scheduler.log_match(search_str,
  325. id=jid_highp, n='ALL',
  326. max_attempts=1, interval=2)
  327. search_str = jid_highp + ";Job run"
  328. (_, str2) = self.scheduler.log_match(search_str,
  329. id=jid_highp, n='ALL',
  330. max_attempts=1, interval=2)
  331. date_time1 = str1.split(";")[0]
  332. date_time2 = str2.split(";")[0]
  333. epoch1 = int(time.mktime(time.strptime(
  334. date_time1, '%m/%d/%Y %H:%M:%S')))
  335. epoch2 = int(time.mktime(time.strptime(
  336. date_time2, '%m/%d/%Y %H:%M:%S')))
  337. time_diff = epoch2 - epoch1
  338. self.logger.info('#' * 80)
  339. self.logger.info('#' * 80)
  340. res_str = "RESULT: PREEMPTION TOOK: " + str(time_diff) + " SECONDS"
  341. self.logger.info(res_str)
  342. self.logger.info('#' * 80)
  343. self.logger.info('#' * 80)
  344. def tearDown(self):
  345. self.server.manager(MGR_CMD_SET, SERVER, {'scheduling': 'False'})
  346. job_ids = self.server.select()
  347. self.server.delete(id=job_ids)