123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484 |
- # coding: utf-8
- # Copyright (C) 1994-2018 Altair Engineering, Inc.
- # For more information, contact Altair at www.altair.com.
- #
- # This file is part of the PBS Professional ("PBS Pro") software.
- #
- # Open Source License Information:
- #
- # PBS Pro is free software. You can redistribute it and/or modify it under the
- # terms of the GNU Affero General Public License as published by the Free
- # Software Foundation, either version 3 of the License, or (at your option) any
- # later version.
- #
- # PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
- # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- # FOR A PARTICULAR PURPOSE.
- # See the GNU Affero General Public License for more details.
- #
- # You should have received a copy of the GNU Affero General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- #
- # Commercial License Information:
- #
- # For a copy of the commercial license terms and conditions,
- # go to: (http://www.pbspro.com/UserArea/agreement.html)
- # or contact the Altair Legal Department.
- #
- # Altair’s dual-license business model allows companies, individuals, and
- # organizations to create proprietary derivative works of PBS Pro and
- # distribute them - whether embedded or bundled with other software -
- # under a commercial license agreement.
- #
- # Use of Altair’s trademarks, including but not limited to "PBS™",
- # "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
- # trademark licensing policies.
- from tests.functional import *
- class TestTrillionJobid(TestFunctional):
- """
- This test suite tests the Trillion Job ID and sequence jobid
- """
- update_svr_db_script = """#!/bin/bash
- . %s
- . ${PBS_EXEC}/libexec/pbs_pgsql_env.sh
- DATA_PORT=${PBS_DATA_SERVICE_PORT}
- if [ -z $DATA_PORT ]; then
- DATA_PORT=15007
- fi
- sudo ls ${PBS_HOME}/server_priv/db_user &>/dev/null
- if [ $? -eq 0 ]; then
- DATA_USER=`sudo cat ${PBS_HOME}/server_priv/db_user`
- if [ $? -ne 0 ]; then
- exit 1
- fi
- fi
- sudo ${PBS_EXEC}/sbin/pbs_ds_password test
- if [ $? -eq 0 ]; then
- sudo ${PBS_EXEC}/sbin/pbs_dataservice stop
- if [ $? -ne 0 ]; then
- exit 1
- fi
- fi
- sudo ${PBS_EXEC}/sbin/pbs_dataservice status
- if [ $? -eq 0 ]; then
- sudo ${PBS_EXEC}/sbin/pbs_dataservice stop
- if [ $? -ne 0 ]; then
- exit 1
- fi
- fi
- sudo ${PBS_EXEC}/sbin/pbs_dataservice start
- if [ $? -ne 0 ]; then
- exit 1
- fi
- args="-U ${DATA_USER} -p ${DATA_PORT} -d pbs_datastore"
- PGPASSWORD=test psql ${args} <<-EOF
- UPDATE pbs.server SET sv_jobidnumber = %d;
- EOF
- ret=$?
- if [ $ret -eq 0 ]; then
- echo "Server sv_jobidnumber attribute has been updated successfully"
- fi
- sudo ${PBS_EXEC}/sbin/pbs_dataservice stop
- if [ $? -ne 0 ]; then
- exit 1
- fi
- exit 0
- """
- def set_svr_sv_jobidnumber(self, num=0):
- """
- This function is to set the next jobid into server database
- """
- # Stop the PBS server
- self.server.stop()
- stop_msg = 'Failed to stop PBS'
- self.assertFalse(self.server.isUp(), stop_msg)
- # Create a shell script file and update the database
- conf_path = self.du.get_pbs_conf_file()
- fn = self.du.create_temp_file(
- body=self.update_svr_db_script %
- (conf_path, num))
- self.du.chmod(path=fn, mode=0o755)
- fail_msg = 'Failed to set sequence id in database'
- ret = self.du.run_cmd(cmd=fn)
- self.assertEqual(ret['rc'], 0, fail_msg)
- # Start the PBS server
- start_msg = 'Failed to restart PBS'
- self.server.start()
- self.assertTrue(self.server.isUp(), start_msg)
- def stop_and_restart_svr(self, restart_type):
- """
- Abruptly/Gracefully stop and restart the server
- """
- try:
- if(restart_type == 'kill'):
- self.server.stop('-KILL')
- else:
- self.server.stop()
- except PbsServiceError as e:
- # The server failed to stop
- raise self.failureException("Server failed to stop:" + e.msg)
- try:
- self.server.start()
- except PbsServiceError as e:
- # The server failed to start
- raise self.failureException("Server failed to start:" + e.msg)
- restart_msg = 'Failed to restart PBS'
- self.assertTrue(self.server.isUp(), restart_msg)
- def submit_job(self, sleep=10, lower=0,
- upper=0, job_id=None, job_msg=None, verify=False):
- """
- Helper method to submit a normal/array job
- and also checks the R state and particular jobid if success,
- else log the error message
- :param sleep : Sleep time in seconds for the job
- :type sleep : int
- :param lower : Lower limit for the array job
- :type lower : int
- :param upper : Upper limit for the array job
- :type upper : int
- :param job_id : Expected jobid upon submission
- :type job_id : string
- :param job_msg : Expected message upon submission failure
- :type job_msg : int
- :param verify : Checks Job status R
- :type verify : boolean(True/False)
- """
- arr_flag = False
- j = Job(TEST_USER)
- if((lower >= 0) and (upper > lower)):
- j.set_attributes({ATTR_J: '%d-%d' % (lower, upper)})
- arr_flag = True
- total_jobs = upper - lower + 1
- j.set_sleep_time(sleep)
- try:
- jid = self.server.submit(j)
- if job_id is not None:
- self.assertEqual(jid.split('.')[0], job_id)
- if arr_flag:
- if verify:
- self.server.expect(JOB, {'job_state': 'B'}, id=jid)
- self.server.expect(
- JOB,
- {'job_state=R': total_jobs},
- count=True, id=jid, extend='t')
- else:
- if verify:
- self.server.expect(JOB, {'job_state': 'R'}, id=jid)
- except PbsSubmitError as e:
- if job_msg is not None:
- # if JobId already exist
- self.assertEqual(e.msg[0], job_msg)
- else:
- # Unable to submit Job
- self.logger.info('Error in submitting job:', e.msg)
- def submit_resv(self, resv_dur=2, resv_id=None, resv_msg=None):
- """
- Helper method to submit a reservation and checks the
- reservation id if success, else log the error message.
- :param resv_dur : Reservation duration in seconds
- :type resv_dur : int
- :param resv_id : Expected resvid upon submission
- :type resv_id : string
- :param resv_msg : Expected message upon reservation failure
- :type resv_msg : string
- """
- resv_start = int(time.time())
- a = {'reserve_start': int(resv_start),
- 'reserve_duration': int(resv_dur)
- }
- r = Reservation(TEST_USER, attrs=a)
- try:
- rid = self.server.submit(r)
- if resv_id is not None:
- self.assertEqual(rid.split('.')[0], resv_id)
- except PbsSubmitError as e:
- if resv_msg is not None:
- # if ResvId already exist
- self.assertEqual(e.msg[0], resv_msg)
- else:
- # Unable to submit reservation
- self.logger.info('Error in submitting reservation:', e.msg)
- def test_set_unset_max_job_sequence_id(self):
- """
- Set/Unset max_job_sequence_id attribute and
- also verify the attribute value after server qterm/kill
- """
- # Set as Non-admin user
- seq_id = {ATTR_max_job_sequence_id: 123456789}
- try:
- self.server.manager(MGR_CMD_SET, SERVER, seq_id, runas=TEST_USER1)
- except PbsManagerError as e:
- self.assertTrue('Unauthorized Request' in e.msg[0])
- # Set as Admin User and also check the value after server restart
- self.server.manager(
- MGR_CMD_SET,
- SERVER,
- seq_id,
- runas=ROOT_USER,
- expect=True)
- self.server.expect(SERVER, seq_id)
- self.server.log_match('svr_max_job_sequence_id set to '
- 'val %d' % (seq_id[ATTR_max_job_sequence_id]),
- starttime=self.server.ctime)
- # Abruptly kill the server
- self.stop_and_restart_svr('kill')
- self.server.expect(SERVER, seq_id)
- # Gracefully stop the server
- self.stop_and_restart_svr('normal')
- self.server.expect(SERVER, seq_id)
- # Unset as Non-admin user
- try:
- self.server.manager(
- MGR_CMD_UNSET,
- SERVER,
- 'max_job_sequence_id',
- runas=TEST_USER1)
- except PbsManagerError as e:
- self.assertTrue('Unauthorized Request' in e.msg[0])
- # Unset as Admin user
- self.server.manager(
- MGR_CMD_UNSET,
- SERVER,
- 'max_job_sequence_id',
- runas=ROOT_USER,
- expect=True)
- self.server.log_match('svr_max_job_sequence_id reverting back '
- 'to default val 9999999',
- starttime=self.server.ctime)
- def test_max_job_sequence_id_values(self):
- """
- Test to check valid/invalid values for the
- max_job_sequence_id server attribute
- """
- # Invalid Values
- invalid_values = [-9999999, '*456879846',
- 23545.45, 'ajndd', '**45', 'asgh456']
- for val in invalid_values:
- try:
- seq_id = {ATTR_max_job_sequence_id: val}
- self.server.manager(
- MGR_CMD_SET, SERVER, seq_id, runas=ROOT_USER)
- except PbsManagerError as e:
- self.assertTrue(
- 'Illegal attribute or resource value' in e.msg[0])
- # Less than or Greater than the attribute limit
- min_max_values = [120515, 999999, 1234567891234, 9999999999999]
- for val in min_max_values:
- try:
- seq_id = {ATTR_max_job_sequence_id: val}
- self.server.manager(
- MGR_CMD_SET, SERVER, seq_id, runas=ROOT_USER)
- except PbsManagerError as e:
- self.assertTrue('Cannot set max_job_sequence_id < 9999999, '
- 'or > 999999999999' in e.msg[0])
- # Valid values
- valid_values = [9999999, 123456789, 100000000000, 999999999999]
- for val in valid_values:
- seq_id = {ATTR_max_job_sequence_id: val}
- self.server.manager(
- MGR_CMD_SET,
- SERVER,
- seq_id,
- runas=ROOT_USER,
- expect=True)
- def test_max_job_sequence_id_wrap(self):
- """
- Test to check the jobid's/resvid's are wrapping it to zero or not,
- after reaching to the given limit
- """
- # Check default limit(9999999) and wrap it 0
- a = {'resources_available.ncpus': 20}
- self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
- self.submit_job(verify=True)
- self.submit_job(lower=1, upper=2, verify=True)
- self.submit_resv()
- sv_jobidnumber = 9999999 # default
- self.set_svr_sv_jobidnumber(sv_jobidnumber)
- self.submit_job(job_id='%s' % (sv_jobidnumber), verify=True)
- self.submit_job(lower=1, upper=2, job_id='0[]',
- verify=True) # wrap it
- self.submit_resv(resv_id='R1')
- # Check max limit (999999999999) and wrap it 0
- sv_jobidnumber = 999999999999 # max limit
- seq_id = {ATTR_max_job_sequence_id: sv_jobidnumber}
- self.server.manager(
- MGR_CMD_SET,
- SERVER,
- seq_id,
- runas=ROOT_USER,
- expect=True)
- self.server.expect(SERVER, seq_id)
- self.submit_job(verify=True)
- self.submit_job(lower=1, upper=2, verify=True)
- self.submit_resv()
- self.set_svr_sv_jobidnumber(sv_jobidnumber)
- self.submit_job(job_id='%s' % (sv_jobidnumber), verify=True)
- self.submit_job(lower=1, upper=2, job_id='0[]',
- verify=True) # wrap it
- self.submit_resv(resv_id='R1')
- # Someone set the max_job_sequence_id less than current jobid then also
- # wrap it 0
- sv_jobidnumber = 1234567890
- seq_id = {ATTR_max_job_sequence_id: sv_jobidnumber}
- self.server.manager(
- MGR_CMD_SET,
- SERVER,
- seq_id,
- runas=ROOT_USER,
- expect=True)
- self.server.expect(SERVER, seq_id)
- sv_jobidnumber = 123456789
- self.set_svr_sv_jobidnumber(sv_jobidnumber)
- self.submit_job(job_id='%s' % (sv_jobidnumber), verify=True)
- self.submit_job(lower=1, upper=2, job_id='123456790[]', verify=True)
- self.submit_resv(resv_id='R123456791')
- # Set smaller(12345678) than current jobid(123456790)
- sv_jobidnumber = 12345678
- seq_id = {ATTR_max_job_sequence_id: sv_jobidnumber}
- self.server.manager(
- MGR_CMD_SET,
- SERVER,
- seq_id,
- runas=ROOT_USER,
- expect=True)
- self.server.expect(SERVER, seq_id)
- self.submit_job(job_id='0', verify=True) # wrap it to zero
- self.submit_job(lower=1, upper=2, job_id='1[]', verify=True)
- self.submit_resv(resv_id='R2')
- def test_verify_sequence_window(self):
- """
- Tests the sequence window scenario in which jobid
- number save to the database once in a 1000 time
- """
- # Abruptly kill the server so next jobid should be 1000 after server
- # start
- self.set_svr_sv_jobidnumber(0)
- self.submit_job(job_id='0')
- self.submit_job(lower=1, upper=2, job_id='1[]')
- self.submit_resv(resv_id='R2')
- # kill the server forcefully
- self.stop_and_restart_svr('kill')
- self.submit_job(job_id='1000')
- self.submit_job(lower=1, upper=2, job_id='1001[]')
- self.submit_resv(resv_id='R1002')
- # if server gets killed again abruptly then next jobid would be 2000
- self.stop_and_restart_svr('kill')
- self.submit_job(job_id='2000')
- self.submit_job(lower=1, upper=2, job_id='2001[]')
- self.submit_resv(resv_id='R2002')
- # Gracefully stop the server so jobid's will continue from the last
- # jobid
- self.stop_and_restart_svr('normal')
- self.submit_job(job_id='2003')
- self.submit_job(lower=1, upper=2, job_id='2004[]')
- self.submit_resv(resv_id='R2005')
- # Verify the sequence window, incase of submitting more than 1001 jobs
- # and all jobs should submit successfully without any duplication error
- for _ in xrange(1010):
- j = Job(TEST_USER)
- self.server.submit(j)
- def test_jobid_duplication(self):
- """
- Tests the JobId/ResvId duplication after wrap
- Job/Resv shouldn't submit because previous
- jobs with the same id's are still running
- """
- seq_id = {ATTR_max_job_sequence_id: 99999999}
- self.server.manager(
- MGR_CMD_SET,
- SERVER,
- seq_id,
- runas=ROOT_USER,
- expect=True)
- self.set_svr_sv_jobidnumber(0)
- self.submit_job(sleep=1000, job_id='0')
- self.submit_job(sleep=1000, lower=1, upper=2, job_id='1[]')
- self.submit_resv(resv_dur=300, resv_id='R2')
- sv_jobidnumber = 99999999
- self.set_svr_sv_jobidnumber(sv_jobidnumber)
- self.submit_job(sleep=1000, job_id='%s' % (sv_jobidnumber))
- # Now job/resv shouldn't submit because same id's are already occupied
- msg = "qsub: Job with requested ID already exists"
- self.submit_job(job_msg=msg)
- self.submit_job(lower=1, upper=2, job_msg=msg)
- msg = 'pbs_rsub: Reservation with '\
- 'requested ID already exists'
- self.submit_resv(resv_msg=msg)
- # Job should submit successfully because all existing id's has been
- # passed
- self.submit_job(lower=1, upper=2, job_id='3[]')
- def test_jobid_resvid_after_multiple_restart(self):
- """
- Test to check the Jobid/Resvid should not wrap to 0 during
- server restart multiple times consecutively either gracefully/abruptly
- """
- j = Job(TEST_USER)
- jid = self.server.submit(j)
- curr_id = int(jid.split('.')[0])
- self.submit_job(job_id='%s' % str(curr_id + 1))
- self.submit_job(lower=1, upper=2, job_id='%s[]' % str(curr_id + 2))
- self.submit_resv(resv_id='R%s' % str(curr_id + 3))
- # Gracefully stop and start the server twice consecutively
- self.stop_and_restart_svr('normal')
- self.stop_and_restart_svr('normal')
- self.submit_job(job_id='%s' % str(curr_id + 4))
- self.submit_job(lower=1, upper=2, job_id='%s[]' % str(curr_id + 5))
- self.submit_resv(resv_id='R%s' % str(curr_id + 6))
- # Abruptly kill and start the server twice consecutively
- self.stop_and_restart_svr('kill')
- self.stop_and_restart_svr('kill')
- # Adding 1000 in current jobid for the sequence window buffer and
- # 4 for the jobs that ran already after server start
- curr_id += 1000 + 4
- self.submit_job(job_id='%s' % str(curr_id))
- self.submit_job(lower=1, upper=2, job_id='%s[]' % str(curr_id + 1))
- self.submit_resv(resv_id='R%s' % str(curr_id + 2))
- def tearDown(self):
- self.server.cleanup_jobs()
- TestFunctional.tearDown(self)
|