pbs_trillion_jobid.py 17 KB


  1. # coding: utf-8
  2. # Copyright (C) 1994-2018 Altair Engineering, Inc.
  3. # For more information, contact Altair at www.altair.com.
  4. #
  5. # This file is part of the PBS Professional ("PBS Pro") software.
  6. #
  7. # Open Source License Information:
  8. #
  9. # PBS Pro is free software. You can redistribute it and/or modify it under the
  10. # terms of the GNU Affero General Public License as published by the Free
  11. # Software Foundation, either version 3 of the License, or (at your option) any
  12. # later version.
  13. #
  14. # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
  15. # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  16. # FOR A PARTICULAR PURPOSE.
  17. # See the GNU Affero General Public License for more details.
  18. #
  19. # You should have received a copy of the GNU Affero General Public License
  20. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  21. #
  22. # Commercial License Information:
  23. #
  24. # For a copy of the commercial license terms and conditions,
  25. # go to: (http://www.pbspro.com/UserArea/agreement.html)
  26. # or contact the Altair Legal Department.
  27. #
  28. # Altair’s dual-license business model allows companies, individuals, and
  29. # organizations to create proprietary derivative works of PBS Pro and
  30. # distribute them - whether embedded or bundled with other software -
  31. # under a commercial license agreement.
  32. #
  33. # Use of Altair’s trademarks, including but not limited to "PBS™",
  34. # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
  35. # trademark licensing policies.
  36. from tests.functional import *
  37. class TestTrillionJobid(TestFunctional):
  38. """
  39. This test suite tests the Trillion Job ID and sequence jobid
  40. """
  41. update_svr_db_script = """#!/bin/bash
  42. . %s
  43. . ${PBS_EXEC}/libexec/pbs_pgsql_env.sh
  44. DATA_PORT=${PBS_DATA_SERVICE_PORT}
  45. if [ -z $DATA_PORT ]; then
  46. DATA_PORT=15007
  47. fi
  48. sudo ls ${PBS_HOME}/server_priv/db_user &>/dev/null
  49. if [ $? -eq 0 ]; then
  50. DATA_USER=`sudo cat ${PBS_HOME}/server_priv/db_user`
  51. if [ $? -ne 0 ]; then
  52. exit 1
  53. fi
  54. fi
  55. sudo ${PBS_EXEC}/sbin/pbs_ds_password test
  56. if [ $? -eq 0 ]; then
  57. sudo ${PBS_EXEC}/sbin/pbs_dataservice stop
  58. if [ $? -ne 0 ]; then
  59. exit 1
  60. fi
  61. fi
  62. sudo ${PBS_EXEC}/sbin/pbs_dataservice status
  63. if [ $? -eq 0 ]; then
  64. sudo ${PBS_EXEC}/sbin/pbs_dataservice stop
  65. if [ $? -ne 0 ]; then
  66. exit 1
  67. fi
  68. fi
  69. sudo ${PBS_EXEC}/sbin/pbs_dataservice start
  70. if [ $? -ne 0 ]; then
  71. exit 1
  72. fi
  73. args="-U ${DATA_USER} -p ${DATA_PORT} -d pbs_datastore"
  74. PGPASSWORD=test psql ${args} <<-EOF
  75. UPDATE pbs.server SET sv_jobidnumber = %d;
  76. EOF
  77. ret=$?
  78. if [ $ret -eq 0 ]; then
  79. echo "Server sv_jobidnumber attribute has been updated successfully"
  80. fi
  81. sudo ${PBS_EXEC}/sbin/pbs_dataservice stop
  82. if [ $? -ne 0 ]; then
  83. exit 1
  84. fi
  85. exit 0
  86. """
  87. def set_svr_sv_jobidnumber(self, num=0):
  88. """
  89. This function is to set the next jobid into server database
  90. """
  91. # Stop the PBS server
  92. self.server.stop()
  93. stop_msg = 'Failed to stop PBS'
  94. self.assertFalse(self.server.isUp(), stop_msg)
  95. # Create a shell script file and update the database
  96. conf_path = self.du.get_pbs_conf_file()
  97. fn = self.du.create_temp_file(
  98. body=self.update_svr_db_script %
  99. (conf_path, num))
  100. self.du.chmod(path=fn, mode=0o755)
  101. fail_msg = 'Failed to set sequence id in database'
  102. ret = self.du.run_cmd(cmd=fn)
  103. self.assertEqual(ret['rc'], 0, fail_msg)
  104. # Start the PBS server
  105. start_msg = 'Failed to restart PBS'
  106. self.server.start()
  107. self.assertTrue(self.server.isUp(), start_msg)
  108. def stop_and_restart_svr(self, restart_type):
  109. """
  110. Abruptly/Gracefully stop and restart the server
  111. """
  112. try:
  113. if(restart_type == 'kill'):
  114. self.server.stop('-KILL')
  115. else:
  116. self.server.stop()
  117. except PbsServiceError as e:
  118. # The server failed to stop
  119. raise self.failureException("Server failed to stop:" + e.msg)
  120. try:
  121. self.server.start()
  122. except PbsServiceError as e:
  123. # The server failed to start
  124. raise self.failureException("Server failed to start:" + e.msg)
  125. restart_msg = 'Failed to restart PBS'
  126. self.assertTrue(self.server.isUp(), restart_msg)
  127. def submit_job(self, sleep=10, lower=0,
  128. upper=0, job_id=None, job_msg=None, verify=False):
  129. """
  130. Helper method to submit a normal/array job
  131. and also checks the R state and particular jobid if success,
  132. else log the error message
  133. :param sleep : Sleep time in seconds for the job
  134. :type sleep : int
  135. :param lower : Lower limit for the array job
  136. :type lower : int
  137. :param upper : Upper limit for the array job
  138. :type upper : int
  139. :param job_id : Expected jobid upon submission
  140. :type job_id : string
  141. :param job_msg : Expected message upon submission failure
  142. :type job_msg : int
  143. :param verify : Checks Job status R
  144. :type verify : boolean(True/False)
  145. """
  146. arr_flag = False
  147. j = Job(TEST_USER)
  148. if((lower >= 0) and (upper > lower)):
  149. j.set_attributes({ATTR_J: '%d-%d' % (lower, upper)})
  150. arr_flag = True
  151. total_jobs = upper - lower + 1
  152. j.set_sleep_time(sleep)
  153. try:
  154. jid = self.server.submit(j)
  155. if job_id is not None:
  156. self.assertEqual(jid.split('.')[0], job_id)
  157. if arr_flag:
  158. if verify:
  159. self.server.expect(JOB, {'job_state': 'B'}, id=jid)
  160. self.server.expect(
  161. JOB,
  162. {'job_state=R': total_jobs},
  163. count=True, id=jid, extend='t')
  164. else:
  165. if verify:
  166. self.server.expect(JOB, {'job_state': 'R'}, id=jid)
  167. except PbsSubmitError as e:
  168. if job_msg is not None:
  169. # if JobId already exist
  170. self.assertEqual(e.msg[0], job_msg)
  171. else:
  172. # Unable to submit Job
  173. self.logger.info('Error in submitting job:', e.msg)
  174. def submit_resv(self, resv_dur=2, resv_id=None, resv_msg=None):
  175. """
  176. Helper method to submit a reservation and checks the
  177. reservation id if success, else log the error message.
  178. :param resv_dur : Reservation duration in seconds
  179. :type resv_dur : int
  180. :param resv_id : Expected resvid upon submission
  181. :type resv_id : string
  182. :param resv_msg : Expected message upon reservation failure
  183. :type resv_msg : string
  184. """
  185. resv_start = int(time.time())
  186. a = {'reserve_start': int(resv_start),
  187. 'reserve_duration': int(resv_dur)
  188. }
  189. r = Reservation(TEST_USER, attrs=a)
  190. try:
  191. rid = self.server.submit(r)
  192. if resv_id is not None:
  193. self.assertEqual(rid.split('.')[0], resv_id)
  194. except PbsSubmitError as e:
  195. if resv_msg is not None:
  196. # if ResvId already exist
  197. self.assertEqual(e.msg[0], resv_msg)
  198. else:
  199. # Unable to submit reservation
  200. self.logger.info('Error in submitting reservation:', e.msg)
  201. def test_set_unset_max_job_sequence_id(self):
  202. """
  203. Set/Unset max_job_sequence_id attribute and
  204. also verify the attribute value after server qterm/kill
  205. """
  206. # Set as Non-admin user
  207. seq_id = {ATTR_max_job_sequence_id: 123456789}
  208. try:
  209. self.server.manager(MGR_CMD_SET, SERVER, seq_id, runas=TEST_USER1)
  210. except PbsManagerError as e:
  211. self.assertTrue('Unauthorized Request' in e.msg[0])
  212. # Set as Admin User and also check the value after server restart
  213. self.server.manager(
  214. MGR_CMD_SET,
  215. SERVER,
  216. seq_id,
  217. runas=ROOT_USER,
  218. expect=True)
  219. self.server.expect(SERVER, seq_id)
  220. self.server.log_match('svr_max_job_sequence_id set to '
  221. 'val %d' % (seq_id[ATTR_max_job_sequence_id]),
  222. starttime=self.server.ctime)
  223. # Abruptly kill the server
  224. self.stop_and_restart_svr('kill')
  225. self.server.expect(SERVER, seq_id)
  226. # Gracefully stop the server
  227. self.stop_and_restart_svr('normal')
  228. self.server.expect(SERVER, seq_id)
  229. # Unset as Non-admin user
  230. try:
  231. self.server.manager(
  232. MGR_CMD_UNSET,
  233. SERVER,
  234. 'max_job_sequence_id',
  235. runas=TEST_USER1)
  236. except PbsManagerError as e:
  237. self.assertTrue('Unauthorized Request' in e.msg[0])
  238. # Unset as Admin user
  239. self.server.manager(
  240. MGR_CMD_UNSET,
  241. SERVER,
  242. 'max_job_sequence_id',
  243. runas=ROOT_USER,
  244. expect=True)
  245. self.server.log_match('svr_max_job_sequence_id reverting back '
  246. 'to default val 9999999',
  247. starttime=self.server.ctime)
  248. def test_max_job_sequence_id_values(self):
  249. """
  250. Test to check valid/invalid values for the
  251. max_job_sequence_id server attribute
  252. """
  253. # Invalid Values
  254. invalid_values = [-9999999, '*456879846',
  255. 23545.45, 'ajndd', '**45', 'asgh456']
  256. for val in invalid_values:
  257. try:
  258. seq_id = {ATTR_max_job_sequence_id: val}
  259. self.server.manager(
  260. MGR_CMD_SET, SERVER, seq_id, runas=ROOT_USER)
  261. except PbsManagerError as e:
  262. self.assertTrue(
  263. 'Illegal attribute or resource value' in e.msg[0])
  264. # Less than or Greater than the attribute limit
  265. min_max_values = [120515, 999999, 1234567891234, 9999999999999]
  266. for val in min_max_values:
  267. try:
  268. seq_id = {ATTR_max_job_sequence_id: val}
  269. self.server.manager(
  270. MGR_CMD_SET, SERVER, seq_id, runas=ROOT_USER)
  271. except PbsManagerError as e:
  272. self.assertTrue('Cannot set max_job_sequence_id < 9999999, '
  273. 'or > 999999999999' in e.msg[0])
  274. # Valid values
  275. valid_values = [9999999, 123456789, 100000000000, 999999999999]
  276. for val in valid_values:
  277. seq_id = {ATTR_max_job_sequence_id: val}
  278. self.server.manager(
  279. MGR_CMD_SET,
  280. SERVER,
  281. seq_id,
  282. runas=ROOT_USER,
  283. expect=True)
  284. def test_max_job_sequence_id_wrap(self):
  285. """
  286. Test to check the jobid's/resvid's are wrapping it to zero or not,
  287. after reaching to the given limit
  288. """
  289. # Check default limit(9999999) and wrap it 0
  290. a = {'resources_available.ncpus': 20}
  291. self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
  292. self.submit_job(verify=True)
  293. self.submit_job(lower=1, upper=2, verify=True)
  294. self.submit_resv()
  295. sv_jobidnumber = 9999999 # default
  296. self.set_svr_sv_jobidnumber(sv_jobidnumber)
  297. self.submit_job(job_id='%s' % (sv_jobidnumber), verify=True)
  298. self.submit_job(lower=1, upper=2, job_id='0[]',
  299. verify=True) # wrap it
  300. self.submit_resv(resv_id='R1')
  301. # Check max limit (999999999999) and wrap it 0
  302. sv_jobidnumber = 999999999999 # max limit
  303. seq_id = {ATTR_max_job_sequence_id: sv_jobidnumber}
  304. self.server.manager(
  305. MGR_CMD_SET,
  306. SERVER,
  307. seq_id,
  308. runas=ROOT_USER,
  309. expect=True)
  310. self.server.expect(SERVER, seq_id)
  311. self.submit_job(verify=True)
  312. self.submit_job(lower=1, upper=2, verify=True)
  313. self.submit_resv()
  314. self.set_svr_sv_jobidnumber(sv_jobidnumber)
  315. self.submit_job(job_id='%s' % (sv_jobidnumber), verify=True)
  316. self.submit_job(lower=1, upper=2, job_id='0[]',
  317. verify=True) # wrap it
  318. self.submit_resv(resv_id='R1')
  319. # Someone set the max_job_sequence_id less than current jobid then also
  320. # wrap it 0
  321. sv_jobidnumber = 1234567890
  322. seq_id = {ATTR_max_job_sequence_id: sv_jobidnumber}
  323. self.server.manager(
  324. MGR_CMD_SET,
  325. SERVER,
  326. seq_id,
  327. runas=ROOT_USER,
  328. expect=True)
  329. self.server.expect(SERVER, seq_id)
  330. sv_jobidnumber = 123456789
  331. self.set_svr_sv_jobidnumber(sv_jobidnumber)
  332. self.submit_job(job_id='%s' % (sv_jobidnumber), verify=True)
  333. self.submit_job(lower=1, upper=2, job_id='123456790[]', verify=True)
  334. self.submit_resv(resv_id='R123456791')
  335. # Set smaller(12345678) than current jobid(123456790)
  336. sv_jobidnumber = 12345678
  337. seq_id = {ATTR_max_job_sequence_id: sv_jobidnumber}
  338. self.server.manager(
  339. MGR_CMD_SET,
  340. SERVER,
  341. seq_id,
  342. runas=ROOT_USER,
  343. expect=True)
  344. self.server.expect(SERVER, seq_id)
  345. self.submit_job(job_id='0', verify=True) # wrap it to zero
  346. self.submit_job(lower=1, upper=2, job_id='1[]', verify=True)
  347. self.submit_resv(resv_id='R2')
  348. def test_verify_sequence_window(self):
  349. """
  350. Tests the sequence window scenario in which jobid
  351. number save to the database once in a 1000 time
  352. """
  353. # Abruptly kill the server so next jobid should be 1000 after server
  354. # start
  355. self.set_svr_sv_jobidnumber(0)
  356. self.submit_job(job_id='0')
  357. self.submit_job(lower=1, upper=2, job_id='1[]')
  358. self.submit_resv(resv_id='R2')
  359. # kill the server forcefully
  360. self.stop_and_restart_svr('kill')
  361. self.submit_job(job_id='1000')
  362. self.submit_job(lower=1, upper=2, job_id='1001[]')
  363. self.submit_resv(resv_id='R1002')
  364. # if server gets killed again abruptly then next jobid would be 2000
  365. self.stop_and_restart_svr('kill')
  366. self.submit_job(job_id='2000')
  367. self.submit_job(lower=1, upper=2, job_id='2001[]')
  368. self.submit_resv(resv_id='R2002')
  369. # Gracefully stop the server so jobid's will continue from the last
  370. # jobid
  371. self.stop_and_restart_svr('normal')
  372. self.submit_job(job_id='2003')
  373. self.submit_job(lower=1, upper=2, job_id='2004[]')
  374. self.submit_resv(resv_id='R2005')
  375. # Verify the sequence window, incase of submitting more than 1001 jobs
  376. # and all jobs should submit successfully without any duplication error
  377. for _ in xrange(1010):
  378. j = Job(TEST_USER)
  379. self.server.submit(j)
  380. def test_jobid_duplication(self):
  381. """
  382. Tests the JobId/ResvId duplication after wrap
  383. Job/Resv shouldn't submit because previous
  384. jobs with the same id's are still running
  385. """
  386. seq_id = {ATTR_max_job_sequence_id: 99999999}
  387. self.server.manager(
  388. MGR_CMD_SET,
  389. SERVER,
  390. seq_id,
  391. runas=ROOT_USER,
  392. expect=True)
  393. self.set_svr_sv_jobidnumber(0)
  394. self.submit_job(sleep=1000, job_id='0')
  395. self.submit_job(sleep=1000, lower=1, upper=2, job_id='1[]')
  396. self.submit_resv(resv_dur=300, resv_id='R2')
  397. sv_jobidnumber = 99999999
  398. self.set_svr_sv_jobidnumber(sv_jobidnumber)
  399. self.submit_job(sleep=1000, job_id='%s' % (sv_jobidnumber))
  400. # Now job/resv shouldn't submit because same id's are already occupied
  401. msg = "qsub: Job with requested ID already exists"
  402. self.submit_job(job_msg=msg)
  403. self.submit_job(lower=1, upper=2, job_msg=msg)
  404. msg = 'pbs_rsub: Reservation with '\
  405. 'requested ID already exists'
  406. self.submit_resv(resv_msg=msg)
  407. # Job should submit successfully because all existing id's has been
  408. # passed
  409. self.submit_job(lower=1, upper=2, job_id='3[]')
  410. def test_jobid_resvid_after_multiple_restart(self):
  411. """
  412. Test to check the Jobid/Resvid should not wrap to 0 during
  413. server restart multiple times consecutively either gracefully/abruptly
  414. """
  415. j = Job(TEST_USER)
  416. jid = self.server.submit(j)
  417. curr_id = int(jid.split('.')[0])
  418. self.submit_job(job_id='%s' % str(curr_id + 1))
  419. self.submit_job(lower=1, upper=2, job_id='%s[]' % str(curr_id + 2))
  420. self.submit_resv(resv_id='R%s' % str(curr_id + 3))
  421. # Gracefully stop and start the server twice consecutively
  422. self.stop_and_restart_svr('normal')
  423. self.stop_and_restart_svr('normal')
  424. self.submit_job(job_id='%s' % str(curr_id + 4))
  425. self.submit_job(lower=1, upper=2, job_id='%s[]' % str(curr_id + 5))
  426. self.submit_resv(resv_id='R%s' % str(curr_id + 6))
  427. # Abruptly kill and start the server twice consecutively
  428. self.stop_and_restart_svr('kill')
  429. self.stop_and_restart_svr('kill')
  430. # Adding 1000 in current jobid for the sequence window buffer and
  431. # 4 for the jobs that ran already after server start
  432. curr_id += 1000 + 4
  433. self.submit_job(job_id='%s' % str(curr_id))
  434. self.submit_job(lower=1, upper=2, job_id='%s[]' % str(curr_id + 1))
  435. self.submit_resv(resv_id='R%s' % str(curr_id + 2))
  436. def tearDown(self):
  437. self.server.cleanup_jobs()
  438. TestFunctional.tearDown(self)