# coding: utf-8
# Copyright (C) 1994-2018 Altair Engineering, Inc.
# For more information, contact Altair at www.altair.com.
#
# This file is part of the PBS Professional ("PBS Pro") software.
#
# Open Source License Information:
#
# PBS Pro is free software. You can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.
# See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Commercial License Information:
#
# For a copy of the commercial license terms and conditions,
# go to: (http://www.pbspro.com/UserArea/agreement.html)
# or contact the Altair Legal Department.
#
# Altair’s dual-license business model allows companies, individuals, and
# organizations to create proprietary derivative works of PBS Pro and
# distribute them - whether embedded or bundled with other software -
# under a commercial license agreement.
#
# Use of Altair’s trademarks, including but not limited to "PBS™",
# "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
# trademark licensing policies.

import os
import time

from tests.functional import *


  37. class TestqstatStateCount(TestFunctional):
  38. def setUp(self):
  39. TestFunctional.setUp(self)
  40. # set ncpus to a known value, 2 here
  41. a = {'resources_available.ncpus': 2}
  42. self.server.manager(MGR_CMD_SET, NODE, a,
  43. self.mom.shortname, expect=True)
  44. def submit_waiting_job(self, timedelta):
  45. """
  46. Submit a job in W state using -a option.
  47. The time specified for -a is current time + timedelta.
  48. """
  49. attribs = {ATTR_a: BatchUtils().convert_seconds_to_datetime(
  50. int(time.time()) + timedelta)}
  51. j = Job(TEST_USER, attribs)
  52. jid = self.server.submit(j)
  53. self.server.expect(JOB, {'job_state': 'W'}, id=jid)
  54. return jid
  55. def find_state_counts(self):
  56. """
  57. From the output of qstat -Bf, parses the number of jobs in R, H, W
  58. and Q states and the value of total_jobs. Calculates the total number
  59. of jobs based on individual counts parsed. Returns these values in a
  60. dictionary.
  61. """
  62. counts = {}
  63. # Get output of qstat
  64. qstat = self.server.status(SERVER)
  65. state_count = qstat[0]['state_count'].split()
  66. all_state_count = 0
  67. for s in state_count:
  68. state = s.split(':')
  69. # Check for negative value
  70. self.assertGreaterEqual(
  71. int(state[1]), 0, 'state count has negative values')
  72. counts[state[0]] = int(state[1])
  73. all_state_count = all_state_count + int(state[1])
  74. counts['all_state_count'] = all_state_count
  75. counts['total_jobs'] = int(qstat[0]['total_jobs'])
  76. # Find queued count from output of qstat
  77. counts['expected_queued_count'] = (counts['total_jobs']
  78. - counts['Held']
  79. - counts['Waiting']
  80. - counts['Running'])
  81. return counts
  82. def verify_count(self):
  83. """
  84. The function does following checks based on output of qstat -Bf:
  85. 1. total_jobs should match the number of jobs submitted
  86. 2. queued_count should match total_jobs minus the number of jobs in
  87. state other than Q.
  88. (each job uses ncpus=1)
  89. """
  90. counts = self.find_state_counts()
  91. self.assertEqual(counts['total_jobs'],
  92. counts['all_state_count'], 'Job count incorrect')
  93. self.assertEqual(counts['expected_queued_count'], counts['Queued'],
  94. 'Queued count incorrect')
  95. def test_queued_no_restart(self):
  96. """
  97. The test case verifies that the reported queued_count in qstat -Bf
  98. without a server restart is equal to the total_jobs - number of jobs in
  99. state other than Q.
  100. (each job uses ncpus=1)
  101. """
  102. jid = []
  103. # submit 4 jobs to ensure some jobs are in state Q as available ncpus=2
  104. for _ in range(4):
  105. j = Job(TEST_USER)
  106. jid.append(self.server.submit(j))
  107. a = {ATTR_h: None}
  108. j = Job(TEST_USER, a)
  109. self.server.submit(j)
  110. self.submit_waiting_job(600)
  111. # Wait for jobs to go in R state
  112. self.server.expect(JOB, {'job_state': 'R'}, id=jid[0])
  113. self.server.expect(JOB, {'job_state': 'R'}, id=jid[1])
  114. self.verify_count()
  115. def test_queued_restart(self):
  116. """
  117. The test case verifies that the reported queued_count in qstat -Bf
  118. is equal to total_jobs - number of jobs in state other than Q,
  119. even after the server is restarted.
  120. (each job uses ncpus=1)
  121. """
  122. jid = []
  123. # submit 4 jobs to ensure some jobs are in state Q as available ncpus=2
  124. for _ in range(4):
  125. j = Job(TEST_USER)
  126. jid.append(self.server.submit(j))
  127. a = {ATTR_h: None}
  128. j = Job(TEST_USER, a)
  129. self.server.submit(j)
  130. self.submit_waiting_job(600)
  131. self.server.expect(JOB, {'job_state': 'R'}, id=jid[0])
  132. self.server.expect(JOB, {'job_state': 'R'}, id=jid[1])
  133. self.server.restart()
  134. self.verify_count()
  135. def test_queued_no_restart_multiple_queue(self):
  136. """
  137. The test case verifies that the queued_count reported in the output
  138. of qstat -Bf is equal to total_jobs - running jobs, without server
  139. restart.
  140. (each job uses ncpus=1)
  141. """
  142. # create 2 execution queues
  143. qname = ['workq1', 'workq2']
  144. for que in qname:
  145. a = {
  146. 'queue_type': 'Execution',
  147. 'enabled': 'True',
  148. 'started': 'True'}
  149. self.server.manager(MGR_CMD_CREATE, QUEUE,
  150. a, que, expect=True)
  151. q1_attr = {ATTR_queue: 'workq1'}
  152. q2_attr = {ATTR_queue: 'workq2'}
  153. # submit 1 job per queue to ensure a running job in each queue,
  154. # then submit 2 more jobs per queue i.e. overall 3 jobs in each queue
  155. j = Job(TEST_USER, q1_attr)
  156. jid = self.server.submit(j)
  157. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  158. j = Job(TEST_USER, q2_attr)
  159. jid = self.server.submit(j)
  160. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  161. for _ in range(2):
  162. j = Job(TEST_USER, q1_attr)
  163. self.server.submit(j)
  164. j = Job(TEST_USER, q2_attr)
  165. self.server.submit(j)
  166. self.verify_count()
  167. def test_queued_restart_multiple_queue(self):
  168. """
  169. The test case verifies that the queued_count reported in the output
  170. of qstat -Bf is equal to total_jobs - running jobs, even after the
  171. server is restart.
  172. (each job uses ncpus=1)
  173. """
  174. qname = ['workq1', 'workq2']
  175. for que in qname:
  176. a = {
  177. 'queue_type': 'Execution',
  178. 'enabled': 'True',
  179. 'started': 'True'}
  180. self.server.manager(MGR_CMD_CREATE, QUEUE,
  181. a, que, expect=True)
  182. q1_attr = {ATTR_queue: 'workq1'}
  183. q2_attr = {ATTR_queue: 'workq2'}
  184. # submit 1 job per queue to ensure a running job in each queue,
  185. # then submit 2 more jobs per queue i.e. overall 3 jobs in each queue
  186. j = Job(TEST_USER, q1_attr)
  187. jid = self.server.submit(j)
  188. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  189. j = Job(TEST_USER, q2_attr)
  190. jid = self.server.submit(j)
  191. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  192. for _ in range(2):
  193. j = Job(TEST_USER, q1_attr)
  194. self.server.submit(j)
  195. j = Job(TEST_USER, q2_attr)
  196. self.server.submit(j)
  197. self.server.restart()
  198. self.verify_count()
  199. def test_queued_sched_false(self):
  200. """
  201. This test case verifies that the value of queued_count in the output
  202. of qstat -Bf matches the number of jobs submitted (each using ncpus=1),
  203. as scheduling is set to False.
  204. """
  205. a = {'scheduling': 'False'}
  206. self.server.manager(MGR_CMD_SET, SERVER, a)
  207. for _ in range(4):
  208. j = Job(TEST_USER)
  209. self.server.submit(j)
  210. self.server.restart()
  211. self.verify_count()
  212. def test_wait_to_queued(self):
  213. """
  214. This test case verifies that when a job state changes from W to Q after
  215. server is restarted, the value of queued_count reported in the
  216. output of qstat -Bf is as expected.
  217. """
  218. a = {
  219. ATTR_stagein: 'inputData@' +
  220. self.server.hostname +
  221. ':' + os.path.join('noDir', 'nofile')}
  222. j = Job(TEST_USER, a)
  223. jid = self.server.submit(j)
  224. self.server.expect(JOB, {'job_state': 'W'}, id=jid,
  225. offset=30, interval=2)
  226. jid = self.submit_waiting_job(3)
  227. j = Job(TEST_USER)
  228. self.server.submit(j)
  229. j = Job(TEST_USER)
  230. self.server.submit(j)
  231. self.server.expect(JOB, {'job_state': 'Q'}, id=jid, offset=3)
  232. self.server.restart()
  233. self.verify_count()
  234. def test_job_state_count(self):
  235. """
  236. Testing if jobs in the 'W' state will cause
  237. the state_count to go negative or incorrect
  238. """
  239. # Failing stage-in operation, to put job into the waiting state
  240. a = {
  241. ATTR_stagein: 'inputData@' +
  242. self.server.hostname +
  243. ':/noDir/nofile'}
  244. j = Job(TEST_USER, a)
  245. jid = self.server.submit(j)
  246. self.server.expect(JOB, {'job_state': 'W'}, id=jid,
  247. offset=30, interval=2)
  248. # Restart server
  249. self.server.restart()
  250. self.verify_count()