# coding: utf-8
# Copyright (C) 1994-2018 Altair Engineering, Inc.
# For more information, contact Altair at www.altair.com.
#
# This file is part of the PBS Professional ("PBS Pro") software.
#
# Open Source License Information:
#
# PBS Pro is free software. You can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.
# See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Commercial License Information:
#
# For a copy of the commercial license terms and conditions,
# go to: (http://www.pbspro.com/UserArea/agreement.html)
# or contact the Altair Legal Department.
#
# Altair’s dual-license business model allows companies, individuals, and
# organizations to create proprietary derivative works of PBS Pro and
# distribute them - whether embedded or bundled with other software -
# under a commercial license agreement.
#
# Use of Altair’s trademarks, including but not limited to "PBS™",
# "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
# trademark licensing policies.
from tests.functional import *
import resource
class TestMultipleSchedulers(TestFunctional):
"""
Test suite to test different scheduler interfaces
"""
def setup_sc1(self):
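        """
        Create multisched 'sc1' serving partitions P1 and P4,
        start it, turn scheduling on and set its log_filter.
        """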
a = {'partition': 'P1,P4',
'sched_host': self.server.hostname,
'sched_port': '15050'}
self.server.manager(MGR_CMD_CREATE, SCHED,
a, id="sc1")
self.scheds['sc1'].create_scheduler()
self.scheds['sc1'].start()
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1", expect=True)
self.scheds['sc1'].set_sched_config({'log_filter': 2048})
def setup_sc2(self):
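        """
        Create multisched 'sc2' for partition P2, with its sched_priv
        and sched_log directories placed under a separate sched_dir.
        """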
dir_path = os.path.join(os.sep, 'var', 'spool', 'pbs', 'sched_dir')
if not os.path.exists(dir_path):
self.du.mkdir(path=dir_path, sudo=True)
a = {'partition': 'P2',
'sched_priv': os.path.join(dir_path, 'sched_priv_sc2'),
'sched_log': os.path.join(dir_path, 'sched_logs_sc2'),
'sched_host': self.server.hostname,
'sched_port': '15051'}
self.server.manager(MGR_CMD_CREATE, SCHED,
a, id="sc2")
self.scheds['sc2'].create_scheduler(dir_path)
self.scheds['sc2'].start(dir_path)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc2", expect=True)
def setup_sc3(self):
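        """
        Create multisched 'sc3' serving partition P3, start it and
        turn scheduling on.
        """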
a = {'partition': 'P3',
'sched_host': self.server.hostname,
'sched_port': '15052'}
self.server.manager(MGR_CMD_CREATE, SCHED,
a, id="sc3")
self.scheds['sc3'].create_scheduler()
self.scheds['sc3'].start()
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc3", expect=True)
def setup_queues_nodes(self):
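        """
        Create execution queues wq1-wq4 tied to partitions P1-P4 and
        five 2-cpu vnodes, assigning vnode[0]-vnode[3] to P1-P4
        (vnode[4] is left to the default scheduler).
        """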
a = {'queue_type': 'execution',
'started': 'True',
'enabled': 'True'}
self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq1')
self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq2')
self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq3')
self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq4')
p1 = {'partition': 'P1'}
self.server.manager(MGR_CMD_SET, QUEUE, p1, id='wq1', expect=True)
p2 = {'partition': 'P2'}
self.server.manager(MGR_CMD_SET, QUEUE, p2, id='wq2', expect=True)
p3 = {'partition': 'P3'}
self.server.manager(MGR_CMD_SET, QUEUE, p3, id='wq3', expect=True)
p4 = {'partition': 'P4'}
self.server.manager(MGR_CMD_SET, QUEUE, p4, id='wq4', expect=True)
a = {'resources_available.ncpus': 2}
self.server.create_vnodes('vnode', a, 5, self.mom)
self.server.manager(MGR_CMD_SET, NODE, p1, id='vnode[0]', expect=True)
self.server.manager(MGR_CMD_SET, NODE, p2, id='vnode[1]', expect=True)
self.server.manager(MGR_CMD_SET, NODE, p3, id='vnode[2]', expect=True)
self.server.manager(MGR_CMD_SET, NODE, p4, id='vnode[3]', expect=True)
def common_setup(self):
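        """
        Convenience wrapper that creates schedulers sc1-sc3 along with
        the partitioned queues and vnodes.
        """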
self.setup_sc1()
self.setup_sc2()
self.setup_sc3()
self.setup_queues_nodes()
def check_vnodes(self, j, vnodes, jid):
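        """
        Verify that every vnode in 'vnodes' appears in the exec_vnode
        of job object 'j' with job id 'jid'.
        """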
self.server.status(JOB, 'exec_vnode', id=jid)
nodes = j.get_vnodes(j.exec_vnode)
        for vnode in vnodes:
            self.assertIn(vnode, nodes,
                          str(vnode) +
                          " is not in exec_vnode list as expected")
def test_set_sched_priv(self):
"""
        Test that sched_priv can only be set to valid paths
and check for appropriate comments
"""
self.setup_sc1()
if not os.path.exists('/var/sched_priv_do_not_exist'):
self.server.manager(MGR_CMD_SET, SCHED,
{'sched_priv': '/var/sched_priv_do_not_exist'},
id="sc1")
msg = 'PBS failed validation checks for sched_priv directory'
a = {'sched_priv': '/var/sched_priv_do_not_exist',
'comment': msg,
'scheduling': 'False'}
self.server.expect(SCHED, a, id='sc1', attrop=PTL_AND, max_attempts=10)
pbs_home = self.server.pbs_conf['PBS_HOME']
        self.du.run_copy(self.server.hostname,
                         os.path.join(pbs_home, 'sched_priv'),
                         os.path.join(pbs_home, 'sc1_new_priv'),
                         recursive=True, sudo=True)
        new_sched_priv = os.path.join(pbs_home, 'sc1_new_priv')
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'sched_priv': new_sched_priv},
                            id="sc1")
        a = {'sched_priv': new_sched_priv}
        self.server.expect(SCHED, a, id='sc1', max_attempts=10)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
        # Blocked by PP-1202; will revisit once it's fixed
# self.server.expect(SCHED, 'comment', id='sc1', op=UNSET)
def test_set_sched_log(self):
"""
        Test that sched_log can only be set to valid paths
and check for appropriate comments
"""
self.setup_sc1()
if not os.path.exists('/var/sched_log_do_not_exist'):
self.server.manager(MGR_CMD_SET, SCHED,
{'sched_log': '/var/sched_log_do_not_exist'},
id="sc1")
a = {'sched_log': '/var/sched_log_do_not_exist',
'comment': 'Unable to change the sched_log directory',
'scheduling': 'False'}
self.server.expect(SCHED, a, id='sc1', max_attempts=10)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
pbs_home = self.server.pbs_conf['PBS_HOME']
self.du.mkdir(path=os.path.join(pbs_home, 'sc1_new_logs'),
sudo=True)
        new_sched_log = os.path.join(pbs_home, 'sc1_new_logs')
        self.server.manager(MGR_CMD_SET, SCHED,
                            {'sched_log': new_sched_log},
                            id="sc1")
        a = {'sched_log': new_sched_log}
        self.server.expect(SCHED, a, id='sc1', max_attempts=10)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
        # Blocked by PP-1202; will revisit once it's fixed
# self.server.expect(SCHED, 'comment', id='sc1', op=UNSET)
def test_start_scheduler(self):
"""
        Test that the scheduler won't start without the appropriate
        directories created.
Scheduler will log a message if started without partition. Test
scheduler states down, idle, scheduling.
"""
self.setup_queues_nodes()
pbs_home = self.server.pbs_conf['PBS_HOME']
self.server.manager(MGR_CMD_CREATE, SCHED,
id="sc5")
a = {'sched_host': self.server.hostname,
'sched_port': '15055',
'scheduling': 'True'}
self.server.manager(MGR_CMD_SET, SCHED, a, id="sc5")
# Try starting without sched_priv and sched_logs
ret = self.scheds['sc5'].start()
self.server.expect(SCHED, {'state': 'down'}, id='sc5', max_attempts=10)
msg = "sched_priv dir is not present for scheduler"
self.assertTrue(ret['rc'], msg)
self.du.run_copy(self.server.hostname,
os.path.join(pbs_home, 'sched_priv'),
os.path.join(pbs_home, 'sched_priv_sc5'),
recursive=True, sudo=True)
ret = self.scheds['sc5'].start()
msg = "sched_logs dir is not present for scheduler"
self.assertTrue(ret['rc'], msg)
self.du.run_copy(self.server.hostname,
os.path.join(pbs_home, 'sched_logs'),
os.path.join(pbs_home, 'sched_logs_sc5'),
recursive=True, sudo=True)
ret = self.scheds['sc5'].start()
self.scheds['sc5'].log_match(
"Scheduler does not contain a partition",
max_attempts=10, starttime=self.server.ctime)
self.server.manager(MGR_CMD_SET, SCHED,
{'partition': 'P3'}, id="sc5")
self.server.manager(MGR_CMD_SET, SCHED,
                            {'scheduling': 'True'}, id="sc5")
self.server.expect(SCHED, {'state': 'idle'}, id='sc5', max_attempts=10)
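        # Enlarge vnode[2] and queue a large batch of jobs with scheduling
        # turned off, so that once scheduling is re-enabled the cycle runs
        # long enough to observe the 'scheduling' state.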
a = {'resources_available.ncpus': 100}
self.server.manager(MGR_CMD_SET, NODE, a, id='vnode[2]', expect=True)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'False'}, id="sc5")
        for _ in range(500):
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3'})
self.server.submit(j)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc5")
self.server.expect(SCHED, {'state': 'scheduling'},
id='sc5', max_attempts=10)
def test_resource_sched_reconfigure(self):
"""
Test all schedulers will reconfigure while creating,
setting or deleting a resource
"""
self.common_setup()
t = int(time.time())
self.server.manager(MGR_CMD_CREATE, RSC, id='foo')
for name in self.scheds:
self.scheds[name].log_match(
"Scheduler is reconfiguring",
max_attempts=10, starttime=t)
# sleeping to make sure we are not checking for the
# same scheduler reconfiguring message again
time.sleep(1)
t = int(time.time())
attr = {ATTR_RESC_TYPE: 'long'}
self.server.manager(MGR_CMD_SET, RSC, attr, id='foo')
for name in self.scheds:
self.scheds[name].log_match(
"Scheduler is reconfiguring",
max_attempts=10, starttime=t)
# sleeping to make sure we are not checking for the
# same scheduler reconfiguring message again
time.sleep(1)
t = int(time.time())
self.server.manager(MGR_CMD_DELETE, RSC, id='foo')
for name in self.scheds:
self.scheds[name].log_match(
"Scheduler is reconfiguring",
max_attempts=10, starttime=t)
def test_remove_partition_sched(self):
"""
Test that removing all the partitions from a scheduler
        unsets the partition attribute on the scheduler and updates
        the scheduler log.
"""
self.setup_sc1()
# self.setup_sc2()
self.server.manager(MGR_CMD_SET, SCHED,
{'partition': (DECR, 'P1')}, id="sc1")
self.server.manager(MGR_CMD_SET, SCHED,
{'partition': (DECR, 'P4')}, id="sc1")
self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
id="sc1", expect=True)
log_msg = "Scheduler does not contain a partition"
self.scheds['sc1'].log_match(log_msg, max_attempts=10,
starttime=self.server.ctime)
        # Blocked by PP-1202; will revisit once it's fixed
# self.server.manager(MGR_CMD_UNSET, SCHED, 'partition',
# id="sc2", expect=True)
def test_job_queue_partition(self):
"""
Test job submitted to a queue associated to a partition will land
into a node associated with that partition.
"""
self.common_setup()
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
'Resource_List.select': '1:ncpus=2'})
jid = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid)
self.check_vnodes(j, ['vnode[0]'], jid)
self.scheds['sc1'].log_match(
jid + ';Job run', max_attempts=10,
starttime=self.server.ctime)
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq2',
'Resource_List.select': '1:ncpus=2'})
jid = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid)
self.check_vnodes(j, ['vnode[1]'], jid)
self.scheds['sc2'].log_match(
jid + ';Job run', max_attempts=10,
starttime=self.server.ctime)
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
'Resource_List.select': '1:ncpus=2'})
jid = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid)
self.check_vnodes(j, ['vnode[2]'], jid)
self.scheds['sc3'].log_match(
jid + ';Job run', max_attempts=10,
starttime=self.server.ctime)
def test_multiple_partition_same_sched(self):
"""
        Test that a scheduler will serve jobs from different partitions
        and run them on nodes assigned to the respective partitions.
"""
self.setup_sc1()
self.setup_queues_nodes()
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
'Resource_List.select': '1:ncpus=1'})
jid1 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
self.check_vnodes(j, ['vnode[0]'], jid1)
self.scheds['sc1'].log_match(
jid1 + ';Job run', max_attempts=10,
starttime=self.server.ctime)
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq4',
'Resource_List.select': '1:ncpus=1'})
jid2 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
self.check_vnodes(j, ['vnode[3]'], jid2)
self.scheds['sc1'].log_match(
jid2 + ';Job run', max_attempts=10,
starttime=self.server.ctime)
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
'Resource_List.select': '1:ncpus=1'})
jid3 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
self.check_vnodes(j, ['vnode[0]'], jid3)
self.scheds['sc1'].log_match(
jid3 + ';Job run', max_attempts=10,
starttime=self.server.ctime)
def test_multiple_queue_same_partition(self):
"""
        Test that multiple queues associated with the same partition
        are serviced by the same scheduler
"""
self.setup_sc1()
self.setup_queues_nodes()
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
'Resource_List.select': '1:ncpus=1'})
jid = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid)
self.check_vnodes(j, ['vnode[0]'], jid)
self.scheds['sc1'].log_match(
jid + ';Job run', max_attempts=10,
starttime=self.server.ctime)
p1 = {'partition': 'P1'}
self.server.manager(MGR_CMD_SET, QUEUE, p1, id='wq4')
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq4',
'Resource_List.select': '1:ncpus=1'})
jid = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid)
self.check_vnodes(j, ['vnode[0]'], jid)
self.scheds['sc1'].log_match(
jid + ';Job run', max_attempts=10,
starttime=self.server.ctime)
def test_preemption_highp_queue(self):
"""
        Test that preemption occurs only within queues assigned to the
        same partition, and check the job equivalence classes
"""
self.common_setup()
prio = {'Priority': 150, 'partition': 'P1'}
self.server.manager(MGR_CMD_SET, QUEUE, prio, id='wq4')
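        # wq4 shares partition P1 with wq1 but has a higher priority, so
        # its jobs can preempt wq1 jobs within the same scheduler (sc1).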
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
'Resource_List.select': '1:ncpus=2'})
jid1 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
t = int(time.time())
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq4',
'Resource_List.select': '1:ncpus=2'})
jid2 = self.server.submit(j)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
self.scheds['sc1'].log_match("Number of job equivalence classes: 1",
max_attempts=10, starttime=t)
self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq4',
'Resource_List.select': '1:ncpus=2'})
jid3 = self.server.submit(j)
self.server.expect(JOB, ATTR_comment, op=SET, id=jid3)
self.server.expect(JOB, {'job_state': 'Q'}, id=jid3)
self.server.expect(JOB, {'job_state': 'S'}, id=jid1)
self.scheds['sc1'].log_match(
jid1 + ';Job preempted by suspension',
max_attempts=10, starttime=t)
        # Two equivalence classes: one for the suspended job and one for
        # the remaining jobs
self.scheds['sc1'].log_match("Number of job equivalence classes: 2",
max_attempts=10, starttime=t)
def test_backfill_per_scheduler(self):
"""
Test backfilling is applicable only per scheduler
"""
self.common_setup()
t = int(time.time())
self.scheds['sc2'].set_sched_config(
{'strict_ordering': 'True ALL'})
a = {ATTR_queue: 'wq2',
'Resource_List.select': '1:ncpus=2',
'Resource_List.walltime': 60}
j = Job(TEST_USER1, attrs=a)
jid1 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
j = Job(TEST_USER1, attrs=a)
jid2 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
self.scheds['sc2'].log_match(
jid2 + ';Job is a top job and will run at',
starttime=t)
a['queue'] = 'wq3'
j = Job(TEST_USER1, attrs=a)
jid3 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
j = Job(TEST_USER1, attrs=a)
jid4 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'Q'}, id=jid4)
self.scheds['sc3'].log_match(
jid4 + ';Job is a top job and will run at',
max_attempts=5, starttime=t, existence=False)
def test_resource_per_scheduler(self):
"""
        Test that a resource is considered only by the scheduler
        to whose sched_config the resource has been added
"""
self.common_setup()
a = {'type': 'float', 'flag': 'nh'}
self.server.manager(MGR_CMD_CREATE, RSC, a, id='gpus')
self.scheds['sc3'].add_resource("gpus")
a = {'resources_available.gpus': 2}
self.server.manager(MGR_CMD_SET, NODE, a, id='@default', expect=True)
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
'Resource_List.select': '1:gpus=2',
'Resource_List.walltime': 60})
jid1 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
'Resource_List.select': '1:gpus=2',
'Resource_List.walltime': 60})
jid2 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
job_comment = "Not Running: Insufficient amount of resource: "
job_comment += "gpus (R: 2 A: 0 T: 2)"
self.server.expect(JOB, {'comment': job_comment}, id=jid2)
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq2',
'Resource_List.select': '1:gpus=2'})
jid3 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq2',
'Resource_List.select': '1:gpus=2'})
jid4 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid4)
def test_restart_server(self):
"""
        Test that sched attributes persist after a server restart
"""
self.setup_sc1()
sched_priv = os.path.join(
self.server.pbs_conf['PBS_HOME'], 'sched_priv_sc1')
sched_logs = os.path.join(
self.server.pbs_conf['PBS_HOME'], 'sched_logs_sc1')
a = {'sched_port': 15050,
'sched_host': self.server.hostname,
'sched_priv': sched_priv,
'sched_log': sched_logs,
'scheduling': 'True',
'scheduler_iteration': 600,
'state': 'idle',
'sched_cycle_length': '00:20:00'}
self.server.expect(SCHED, a, id='sc1',
attrop=PTL_AND, max_attempts=10)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduler_iteration': 300,
'sched_cycle_length': '00:10:00'},
id='sc1')
self.server.restart()
a['scheduler_iteration'] = 300
a['sched_cycle_length'] = '00:10:00'
self.server.expect(SCHED, a, id='sc1',
attrop=PTL_AND, max_attempts=10)
def test_resv_default_sched(self):
"""
        Test that reservations only go to the default scheduler
"""
self.setup_queues_nodes()
t = int(time.time())
r = Reservation(TEST_USER)
a = {'Resource_List.select': '2:ncpus=1'}
r.set_attributes(a)
rid = self.server.submit(r)
a = {'reserve_state': (MATCH_RE, 'RESV_CONFIRMED|2')}
self.server.expect(RESV, a, rid)
self.scheds['default'].log_match(
rid + ';Reservation Confirmed',
max_attempts=10, starttime=t)
def test_job_sorted_per_scheduler(self):
"""
Test jobs are sorted as per job_sort_formula
inside each scheduler
"""
self.common_setup()
self.server.manager(MGR_CMD_SET, SERVER,
{'job_sort_formula': 'ncpus'})
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'False'}, id="default")
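        # With the default scheduler turned off both jobs stay queued; once
        # scheduling is re-enabled the formula (ncpus) should run the
        # 2-cpu job first.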
j = Job(TEST_USER1, attrs={'Resource_List.select': '1:ncpus=1'})
jid1 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'Q'}, id=jid1)
j = Job(TEST_USER1, attrs={'Resource_List.select': '1:ncpus=2'})
jid2 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="default")
self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'False'}, id="sc3")
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
'Resource_List.select': '1:ncpus=1'})
jid3 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'Q'}, id=jid3)
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
'Resource_List.select': '1:ncpus=2'})
jid4 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'Q'}, id=jid4)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc3")
self.server.expect(JOB, {'job_state': 'R'}, id=jid4)
def test_qrun_job(self):
"""
        Test that jobs can be run via qrun by a newly created scheduler.
"""
self.setup_sc1()
self.setup_queues_nodes()
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'False'}, id="sc1")
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
'Resource_List.select': '1:ncpus=2'})
jid1 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'Q'}, id=jid1)
self.server.runjob(jid1)
self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
    def test_run_limits_per_scheduler(self):
"""
        Test that a run limit applied at the server level is
        applied for every scheduler separately.
"""
self.common_setup()
self.server.manager(MGR_CMD_SET, SERVER,
{'max_run': '[u:PBS_GENERIC=1]'})
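        # The server-level limit should be enforced by each scheduler
        # independently for the jobs in its own partition.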
j = Job(TEST_USER1, attrs={'Resource_List.select': '1:ncpus=1'})
jid1 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
j = Job(TEST_USER1, attrs={'Resource_List.select': '1:ncpus=1'})
jid2 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
jc = "Not Running: User has reached server running job limit."
self.server.expect(JOB, {'comment': jc}, id=jid2)
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
'Resource_List.select': '1:ncpus=1'})
jid3 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq3',
'Resource_List.select': '1:ncpus=1'})
jid4 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'Q'}, id=jid4)
jc = "Not Running: User has reached server running job limit."
self.server.expect(JOB, {'comment': jc}, id=jid4)
def test_multi_fairshare(self):
"""
Test different schedulers have their own fairshare trees with
their own usage
"""
self.common_setup()
default_shares = 10
default_usage = 100
sc1_shares = 20
sc1_usage = 200
sc2_shares = 30
sc2_usage = 300
sc3_shares = 40
sc3_usage = 400
self.scheds['default'].add_to_resource_group(TEST_USER, 10, 'root',
default_shares)
self.scheds['default'].set_fairshare_usage(TEST_USER, default_usage)
self.scheds['sc1'].add_to_resource_group(TEST_USER, 10, 'root',
sc1_shares)
self.scheds['sc1'].set_fairshare_usage(TEST_USER, sc1_usage)
self.scheds['sc2'].add_to_resource_group(TEST_USER, 10, 'root',
sc2_shares)
self.scheds['sc2'].set_fairshare_usage(TEST_USER, sc2_usage)
self.scheds['sc3'].add_to_resource_group(TEST_USER, 10, 'root',
sc3_shares)
self.scheds['sc3'].set_fairshare_usage(TEST_USER, sc3_usage)
# requery fairshare info from pbsfs
default_fs = self.scheds['default'].query_fairshare()
sc1_fs = self.scheds['sc1'].query_fairshare()
sc2_fs = self.scheds['sc2'].query_fairshare()
sc3_fs = self.scheds['sc3'].query_fairshare()
        n = default_fs.get_node(id=10)
        self.assertEqual(n.nshares, default_shares)
        self.assertEqual(n.usage, default_usage)
        n = sc1_fs.get_node(id=10)
        self.assertEqual(n.nshares, sc1_shares)
        self.assertEqual(n.usage, sc1_usage)
        n = sc2_fs.get_node(id=10)
        self.assertEqual(n.nshares, sc2_shares)
        self.assertEqual(n.usage, sc2_usage)
        n = sc3_fs.get_node(id=10)
        self.assertEqual(n.nshares, sc3_shares)
        self.assertEqual(n.usage, sc3_usage)
def test_fairshare_usage(self):
"""
        Test the scheduler's fairshare usage file and
        check whether the usage file is updated correctly
"""
self.setup_sc1()
a = {'queue_type': 'execution',
'started': 'True',
'enabled': 'True',
'partition': 'P1'}
self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq1')
# Set resources to node
resc = {'resources_available.ncpus': 1,
'partition': 'P1'}
self.server.manager(MGR_CMD_SET, NODE, resc, self.mom.shortname)
# Add entry to the resource group of multisched 'sc1'
self.scheds['sc1'].add_to_resource_group('grp1', 100, 'root', 60)
self.scheds['sc1'].add_to_resource_group('grp2', 200, 'root', 40)
self.scheds['sc1'].add_to_resource_group(TEST_USER1,
101, 'grp1', 40)
self.scheds['sc1'].add_to_resource_group(TEST_USER2,
102, 'grp1', 20)
self.scheds['sc1'].add_to_resource_group(TEST_USER3,
201, 'grp2', 30)
self.scheds['sc1'].add_to_resource_group(TEST_USER4,
202, 'grp2', 10)
# Set scheduler iteration
sc_attr = {'scheduler_iteration': 7,
'scheduling': 'False'}
self.server.manager(MGR_CMD_SET, SCHED, sc_attr, id='sc1')
# Update scheduler config file
sc_config = {'fair_share': 'True',
'fairshare_usage_res': 'ncpus*100'}
self.scheds['sc1'].set_sched_config(sc_config)
# submit jobs to multisched 'sc1'
sc1_attr = {ATTR_queue: 'wq1',
'Resource_List.select': '1:ncpus=1',
'Resource_List.walltime': 10}
sc1_J1 = Job(TEST_USER1, attrs=sc1_attr)
sc1_jid1 = self.server.submit(sc1_J1)
sc1_J2 = Job(TEST_USER2, attrs=sc1_attr)
sc1_jid2 = self.server.submit(sc1_J2)
sc1_J3 = Job(TEST_USER3, attrs=sc1_attr)
sc1_jid3 = self.server.submit(sc1_J3)
self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
id='sc1')
# pbsuser1 job will run and other two will be queued
self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid1)
self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid3)
self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid2)
self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
id='sc1')
# need to delete the running job because PBS has only 1 ncpu and
# our work is also done with the job.
# this step will decrease the execution time as well
self.server.delete(sc1_jid1, wait=True)
# pbsuser3 job will run after pbsuser1
self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid3)
self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid2)
self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
id='sc1')
# deleting the currently running job
self.server.delete(sc1_jid3, wait=True)
# pbsuser2 job will run in the end
self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid2)
self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
id='sc1')
# deleting the currently running job
self.server.delete(sc1_jid2, wait=True)
# query fairshare and check usage
        sc1_fs_user1 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER1))
        self.assertEqual(sc1_fs_user1.usage, 101)
        sc1_fs_user2 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER2))
        self.assertEqual(sc1_fs_user2.usage, 101)
        sc1_fs_user3 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER3))
        self.assertEqual(sc1_fs_user3.usage, 101)
        sc1_fs_user4 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER4))
        self.assertEqual(sc1_fs_user4.usage, 1)
# Restart the scheduler
self.scheds['sc1'].restart()
# Check the multisched 'sc1' usage file whether it's updating or not
self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'False'},
id='sc1')
sc1_J1 = Job(TEST_USER1, attrs=sc1_attr)
sc1_jid1 = self.server.submit(sc1_J1)
sc1_J2 = Job(TEST_USER2, attrs=sc1_attr)
sc1_jid2 = self.server.submit(sc1_J2)
sc1_J4 = Job(TEST_USER4, attrs=sc1_attr)
sc1_jid4 = self.server.submit(sc1_J4)
self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
id='sc1')
# pbsuser4 job will run and other two will be queued
self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid4)
self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid1)
self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid2)
self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
id='sc1')
# deleting the currently running job
self.server.delete(sc1_jid4, wait=True)
# pbsuser1 job will run after pbsuser4
self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid1)
self.server.expect(JOB, {'job_state': 'Q'}, id=sc1_jid2)
self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
id='sc1')
# deleting the currently running job
self.server.delete(sc1_jid1, wait=True)
# pbsuser2 job will run in the end
self.server.expect(JOB, {'job_state': 'R'}, id=sc1_jid2)
self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
id='sc1')
# deleting the currently running job
self.server.delete(sc1_jid2, wait=True)
# query fairshare and check usage
        sc1_fs_user1 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER1))
        self.assertEqual(sc1_fs_user1.usage, 201)
        sc1_fs_user2 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER2))
        self.assertEqual(sc1_fs_user2.usage, 201)
        sc1_fs_user3 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER3))
        self.assertEqual(sc1_fs_user3.usage, 101)
        sc1_fs_user4 = self.scheds['sc1'].query_fairshare(name=str(TEST_USER4))
        self.assertEqual(sc1_fs_user4.usage, 101)
def test_sched_priv_change(self):
"""
Test that when the sched_priv directory changes, all of the
PTL internal scheduler objects (e.g. fairshare tree) are reread
"""
new_sched_priv = os.path.join(self.server.pbs_conf['PBS_HOME'],
'sched_priv2')
if os.path.exists(new_sched_priv):
self.du.rm(path=new_sched_priv, recursive=True,
sudo=True, force=True)
dflt_sched_priv = os.path.join(self.server.pbs_conf['PBS_HOME'],
'sched_priv')
self.du.run_copy(src=dflt_sched_priv, dest=new_sched_priv,
recursive=True, sudo=True)
self.setup_sc3()
s = self.server.status(SCHED, id='sc3')
old_sched_priv = s[0]['sched_priv']
self.scheds['sc3'].add_to_resource_group(TEST_USER, 10, 'root', 20)
self.scheds['sc3'].holidays_set_year(new_year="3000")
self.scheds['sc3'].set_sched_config({'fair_share': 'True ALL'})
self.server.manager(MGR_CMD_SET, SCHED,
{'sched_priv': new_sched_priv}, id='sc3')
n = self.scheds['sc3'].fairshare_tree.get_node(id=10)
self.assertFalse(n)
y = self.scheds['sc3'].holidays_get_year()
        self.assertNotEqual(y, "3000")
self.assertTrue(self.scheds['sc3'].
sched_config['fair_share'].startswith('false'))
# clean up: revert_to_defaults() will remove the new sched_priv. We
# need to remove the old one
self.du.rm(path=old_sched_priv, sudo=True, recursive=True, force=True)
def test_fairshare_decay(self):
"""
Test pbsfs's fairshare decay for multisched
"""
self.setup_sc3()
self.scheds['sc3'].add_to_resource_group(TEST_USER, 10, 'root', 20)
self.scheds['sc3'].set_fairshare_usage(name=TEST_USER, usage=10)
self.scheds['sc3'].decay_fairshare_tree()
n = self.scheds['sc3'].fairshare_tree.get_node(id=10)
        self.assertEqual(n.usage, 5)
def test_cmp_fairshare(self):
"""
Test pbsfs's compare fairshare functionality for multisched
"""
self.setup_sc3()
self.scheds['sc3'].add_to_resource_group(TEST_USER, 10, 'root', 20)
self.scheds['sc3'].set_fairshare_usage(name=TEST_USER, usage=10)
self.scheds['sc3'].add_to_resource_group(TEST_USER2, 20, 'root', 20)
self.scheds['sc3'].set_fairshare_usage(name=TEST_USER2, usage=100)
user = self.scheds['sc3'].cmp_fairshare_entities(TEST_USER, TEST_USER2)
        self.assertEqual(user, str(TEST_USER))
def test_pbsfs_invalid_sched(self):
"""
        Test pbsfs -I <sched_name> where the named scheduler does not exist
"""
sched_name = 'foo'
pbsfs_cmd = os.path.join(self.server.pbs_conf['PBS_EXEC'],
'sbin', 'pbsfs') + ' -I ' + sched_name
ret = self.du.run_cmd(cmd=pbsfs_cmd, sudo=True)
err_msg = 'Scheduler %s does not exist' % sched_name
        self.assertEqual(err_msg, ret['err'][0])
def test_pbsfs_no_fairshare_data(self):
"""
        Test pbsfs -I <sched_name> where the scheduler's
        sched_priv_<sched_name> dir does not exist
"""
a = {'partition': 'P5',
'sched_host': self.server.hostname,
'sched_port': '15050'}
self.server.manager(MGR_CMD_CREATE, SCHED, a, id="sc5")
err_msg = 'Unable to access fairshare data: No such file or directory'
try:
# Only a scheduler object is created. Corresponding sched_priv
# dir not created yet. Try to query fairshare data.
self.scheds['sc5'].query_fairshare()
except PbsFairshareError as e:
self.assertTrue(err_msg in e.msg)
def test_pbsfs_server_restart(self):
"""
Verify that server restart has no impact on fairshare data
"""
self.setup_sc1()
self.scheds['sc1'].add_to_resource_group(TEST_USER, 20, 'root', 50)
self.scheds['sc1'].set_fairshare_usage(name=TEST_USER, usage=25)
n = self.scheds['sc1'].query_fairshare().get_node(name=str(TEST_USER))
        self.assertEqual(n.usage, 25)
self.server.restart()
n = self.scheds['sc1'].query_fairshare().get_node(name=str(TEST_USER))
        self.assertEqual(n.usage, 25)
def test_pbsfs_revert_to_defaults(self):
"""
Test if revert_to_defaults() works properly with multi scheds.
        revert_to_defaults() removes entities from the resource_group file
        and removes their usage (with pbsfs -e)
"""
self.setup_sc1()
a = {'queue_type': 'execution',
'started': 'True',
'enabled': 'True',
'partition': 'P1'}
self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq1', expect=True)
a = {'partition': 'P1', 'resources_available.ncpus': 2}
self.server.manager(MGR_CMD_SET, NODE, a,
id=self.mom.shortname, expect=True)
self.scheds['sc1'].add_to_resource_group(TEST_USER,
11, 'root', 10)
self.scheds['sc1'].add_to_resource_group(TEST_USER1,
12, 'root', 10)
self.scheds['sc1'].set_sched_config({'fair_share': 'True'})
self.scheds['sc1'].set_fairshare_usage(TEST_USER, 100)
self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'False'},
id='sc1')
j1 = Job(TEST_USER, attrs={ATTR_queue: 'wq1'})
jid1 = self.server.submit(j1)
j2 = Job(TEST_USER1, attrs={ATTR_queue: 'wq1'})
jid2 = self.server.submit(j2)
t_start = int(time.time())
self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
id='sc1')
self.scheds['sc1'].log_match(
'Leaving Scheduling Cycle', starttime=t_start)
t_end = int(time.time())
job_list = self.scheds['sc1'].log_match(
'Considering job to run', starttime=t_start,
allmatch=True, endtime=t_end)
# job 1 runs second as it's run by an entity with usage = 100
self.assertTrue(jid1 in job_list[0][1])
self.server.deljob(id=jid1, wait=True)
self.server.deljob(id=jid2, wait=True)
# revert_to_defaults() does a pbsfs -I -e and cleans up
# the resource_group file
self.scheds['sc1'].revert_to_defaults()
# Fairshare tree is trimmed now. TEST_USER1 is the only entity with
# usage set. So its job, job2 will run second. If trimming was not
# successful TEST_USER would still have usage=100 and job1 would run
# second
self.scheds['sc1'].add_to_resource_group(TEST_USER,
15, 'root', 10)
self.scheds['sc1'].add_to_resource_group(TEST_USER1,
16, 'root', 10)
self.scheds['sc1'].set_sched_config({'fair_share': 'True'})
self.scheds['sc1'].set_fairshare_usage(TEST_USER1, 50)
self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'False'},
id='sc1')
j1 = Job(TEST_USER, attrs={ATTR_queue: 'wq1'})
jid1 = self.server.submit(j1)
j2 = Job(TEST_USER1, attrs={ATTR_queue: 'wq1'})
jid2 = self.server.submit(j2)
t_start = int(time.time())
self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
id='sc1')
self.scheds['sc1'].log_match(
'Leaving Scheduling Cycle', starttime=t_start)
t_end = int(time.time())
job_list = self.scheds['sc1'].log_match(
'Considering job to run', starttime=t_start,
allmatch=True, endtime=t_end)
self.assertTrue(jid2 in job_list[0][1])
def submit_jobs(self, num_jobs=1, attrs=None, user=TEST_USER):
"""
Submit num_jobs number of jobs with attrs attributes for user.
Return a list of job ids
"""
if attrs is None:
attrs = {'Resource_List.select': '1:ncpus=2'}
ret_jids = []
for _ in range(num_jobs):
J = Job(user, attrs)
jid = self.server.submit(J)
ret_jids += [jid]
return ret_jids
def test_equiv_partition(self):
"""
Test the basic behavior of job equivalence classes: submit two
different types of jobs into 2 partitions and see they are
in four different equivalence classes
"""
self.setup_sc1()
self.setup_queues_nodes()
t = int(time.time())
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'False'}, id="sc1")
# Eat up all the resources with the first job to each queue
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
self.submit_jobs(4, a)
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
self.submit_jobs(4, a)
a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq1'}
self.submit_jobs(3, a)
a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq4'}
self.submit_jobs(3, a)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
self.scheds['sc1'].log_match("Number of job equivalence classes: 4",
max_attempts=10, starttime=t)
def test_equiv_multisched(self):
"""
Test the basic behavior of job equivalence classes: submit two
different types of jobs into 2 different schedulers and see they
are in two different classes in each scheduler
"""
self.setup_sc1()
self.setup_sc2()
self.setup_queues_nodes()
self.scheds['sc2'].set_sched_config({'log_filter': 2048})
t = int(time.time())
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'False'}, id="sc1")
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'False'}, id="sc2")
# Eat up all the resources with the first job to each queue
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
self.submit_jobs(4, a)
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq2'}
self.submit_jobs(4, a)
a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq1'}
self.submit_jobs(3, a)
a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq2'}
self.submit_jobs(3, a)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc2")
self.scheds['sc1'].log_match("Number of job equivalence classes: 2",
max_attempts=10, starttime=t)
self.scheds['sc2'].log_match("Number of job equivalence classes: 2",
max_attempts=10, starttime=t)
def test_select_partition(self):
"""
        Test to see if jobs with select resources not in the resources line
        fall into the same equivalence class, and jobs in different
        partitions fall into different equivalence classes
"""
self.server.manager(MGR_CMD_CREATE, RSC,
{'type': 'long', 'flag': 'nh'}, id='foo')
self.setup_sc1()
self.setup_queues_nodes()
t = int(time.time())
# Eat up all the resources
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
J = Job(TEST_USER, attrs=a)
self.server.submit(J)
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
J = Job(TEST_USER, attrs=a)
self.server.submit(J)
a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq1'}
self.submit_jobs(3, a)
a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq4'}
self.submit_jobs(3, a)
a = {'Resource_List.select': '1:ncpus=1:foo=8', ATTR_queue: 'wq1'}
self.submit_jobs(3, a)
a = {'Resource_List.select': '1:ncpus=1:foo=8', ATTR_queue: 'wq4'}
self.submit_jobs(3, a)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
        # Four equivalence classes: one for the resource-eating job in each
        # partition and one for the remaining jobs in each partition. The
        # jobs request different amounts of foo, but since foo is not on the
        # resources line it does not split the classes.
self.scheds['sc1'].log_match("Number of job equivalence classes: 4",
max_attempts=10, starttime=t)
def test_select_res_partition(self):
"""
        Test to see if jobs with select resources in the resources line and
        in different partitions fall into different equivalence classes
"""
self.server.manager(MGR_CMD_CREATE, RSC,
{'type': 'long', 'flag': 'nh'}, id='foo')
self.setup_sc1()
self.setup_queues_nodes()
self.scheds['sc1'].add_resource("foo")
t = int(time.time())
# Eat up all the resources
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
J = Job(TEST_USER, attrs=a)
self.server.submit(J)
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
J = Job(TEST_USER, attrs=a)
self.server.submit(J)
a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq1'}
self.submit_jobs(3, a)
a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq4'}
self.submit_jobs(3, a)
a = {'Resource_List.select': '1:ncpus=1:foo=8', ATTR_queue: 'wq1'}
self.submit_jobs(3, a)
a = {'Resource_List.select': '1:ncpus=1:foo=8', ATTR_queue: 'wq4'}
self.submit_jobs(3, a)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
# Six equivalence classes: two for the resource eating jobs in each
# partition and 4 for the other jobs requesting different amounts of
# the foo resource in each partition.
self.scheds['sc1'].log_match("Number of job equivalence classes: 6",
max_attempts=10, starttime=t)
def test_multiple_res_partition(self):
"""
        Test to see if jobs with select resources in the resources line
        with multiple custom resources fall into different equiv classes,
        and jobs in different partitions fall into different equiv classes
"""
self.server.manager(MGR_CMD_CREATE, RSC,
{'type': 'long', 'flag': 'nh'}, id='foo')
self.server.manager(MGR_CMD_CREATE, RSC,
{'type': 'string', 'flag': 'h'}, id='colour')
self.setup_sc1()
self.setup_queues_nodes()
self.scheds['sc1'].add_resource("foo")
self.scheds['sc1'].add_resource("colour")
t = int(time.time())
# Eat up all the resources
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
J = Job(TEST_USER, attrs=a)
self.server.submit(J)
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
J = Job(TEST_USER, attrs=a)
self.server.submit(J)
a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq1'}
self.submit_jobs(3, a)
a = {'Resource_List.select': '1:ncpus=1:foo=4', ATTR_queue: 'wq4'}
self.submit_jobs(3, a)
a = {'Resource_List.select': '1:ncpus=1:colour=blue',
ATTR_queue: 'wq1'}
self.submit_jobs(3, a)
a = {'Resource_List.select': '1:ncpus=1:colour=blue',
ATTR_queue: 'wq4'}
self.submit_jobs(3, a)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
        # Six equivalence classes: one for the resource-eating job in each
        # partition and four for the other jobs, one per distinct resource
        # request in each partition.
self.scheds['sc1'].log_match("Number of job equivalence classes: 6",
max_attempts=10, starttime=t)
def test_place_partition(self):
"""
        Test to see if jobs with different place statements and different
        partitions fall into different equivalence classes
"""
self.setup_sc1()
self.setup_queues_nodes()
t = int(time.time())
# Eat up all the resources
a = {'Resource_List.select': '1:ncpus=2',
ATTR_queue: 'wq1'}
J = Job(TEST_USER, attrs=a)
self.server.submit(J)
a = {'Resource_List.select': '1:ncpus=2',
ATTR_queue: 'wq4'}
J = Job(TEST_USER, attrs=a)
self.server.submit(J)
a = {'Resource_List.select': '1:ncpus=1',
'Resource_List.place': 'free',
ATTR_queue: 'wq1'}
self.submit_jobs(3, a)
a = {'Resource_List.select': '1:ncpus=1',
'Resource_List.place': 'free',
ATTR_queue: 'wq4'}
self.submit_jobs(3, a)
a = {'Resource_List.select': '1:ncpus=1',
'Resource_List.place': 'excl',
ATTR_queue: 'wq1'}
self.submit_jobs(3, a)
a = {'Resource_List.select': '1:ncpus=1',
'Resource_List.place': 'excl',
ATTR_queue: 'wq4'}
self.submit_jobs(3, a)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
# Six equivalence classes: two for the resource eating job in
# each partition and one for each place statement in each partition
self.scheds['sc1'].log_match("Number of job equivalence classes: 6",
max_attempts=10, starttime=t)
def test_nolimits_partition(self):
"""
Test to see that jobs from different users, groups, and projects
all fall into the same equivalence class when there are no limits
but fall into different equivalence classes for each partition
"""
self.setup_sc1()
self.setup_queues_nodes()
t = int(time.time())
# Eat up all the resources
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
J = Job(TEST_USER, attrs=a)
self.server.submit(J)
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
J = Job(TEST_USER, attrs=a)
self.server.submit(J)
a = {ATTR_queue: 'wq1'}
self.submit_jobs(3, a, user=TEST_USER)
self.submit_jobs(3, a, user=TEST_USER2)
a = {ATTR_queue: 'wq4'}
self.submit_jobs(3, a, user=TEST_USER)
self.submit_jobs(3, a, user=TEST_USER2)
b = {'group_list': TSTGRP1, ATTR_queue: 'wq1'}
self.submit_jobs(3, b, TEST_USER1)
b = {'group_list': TSTGRP2, ATTR_queue: 'wq1'}
self.submit_jobs(3, b, TEST_USER1)
b = {'group_list': TSTGRP1, ATTR_queue: 'wq4'}
self.submit_jobs(3, b, TEST_USER1)
b = {'group_list': TSTGRP2, ATTR_queue: 'wq4'}
self.submit_jobs(3, b, TEST_USER1)
b = {'project': 'p1', ATTR_queue: 'wq1'}
self.submit_jobs(3, b)
b = {'project': 'p2', ATTR_queue: 'wq1'}
self.submit_jobs(3, b)
b = {'project': 'p1', ATTR_queue: 'wq4'}
self.submit_jobs(3, b)
b = {'project': 'p2', ATTR_queue: 'wq4'}
self.submit_jobs(3, b)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
        # Four equivalence classes: one for the resource-eating job in each
        # partition and one for the rest in each partition. Since there are
        # no limits, user, group and project are not taken into account.
self.scheds['sc1'].log_match("Number of job equivalence classes: 4",
max_attempts=10, starttime=t)
def test_limits_partition(self):
"""
Test to see that jobs from different users fall into different
equivalence classes with queue hard limits and partitions
"""
self.setup_sc1()
self.setup_queues_nodes()
t = int(time.time())
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'False'}, id="sc1")
self.server.manager(MGR_CMD_SET, QUEUE,
{'max_run': '[u:PBS_GENERIC=1]'}, id='wq1')
self.server.manager(MGR_CMD_SET, QUEUE,
{'max_run': '[u:PBS_GENERIC=1]'}, id='wq4')
# Eat up all the resources
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
J = Job(TEST_USER, attrs=a)
self.server.submit(J)
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
J = Job(TEST_USER, attrs=a)
self.server.submit(J)
a = {ATTR_queue: 'wq1'}
self.submit_jobs(3, a, user=TEST_USER1)
self.submit_jobs(3, a, user=TEST_USER2)
a = {ATTR_queue: 'wq4'}
self.submit_jobs(3, a, user=TEST_USER1)
self.submit_jobs(3, a, user=TEST_USER2)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
# Six equivalence classes. Two for the resource eating job in
# different partitions and one for each user per partition.
self.scheds['sc1'].log_match("Number of job equivalence classes: 6",
max_attempts=10, starttime=t)
def test_job_array_partition(self):
"""
        Test that various job types fall into a single equivalence
        class when they make the same type of request, and only fall into
        different equivalence classes when the partition differs
"""
self.setup_sc1()
self.setup_queues_nodes()
t = int(time.time())
# Eat up all the resources
a = {'Resource_List.select': '1:ncpus=2', 'queue': 'wq1'}
J = Job(TEST_USER1, attrs=a)
self.server.submit(J)
a = {'Resource_List.select': '1:ncpus=2', 'queue': 'wq4'}
J = Job(TEST_USER1, attrs=a)
self.server.submit(J)
# Submit a job array
j = Job(TEST_USER)
j.set_attributes(
{ATTR_J: '1-3:1',
'Resource_List.select': '1:ncpus=2',
'queue': 'wq1'})
self.server.submit(j)
        j.set_attributes(
            {ATTR_J: '1-3:1',
             'Resource_List.select': '1:ncpus=2',
             'queue': 'wq4'})
        self.server.submit(j)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
        # Two equivalence classes, one for each partition
self.scheds['sc1'].log_match("Number of job equivalence classes: 2",
max_attempts=10, starttime=t)
def test_equiv_suspend_jobs(self):
"""
Test that jobs fall into different equivalence classes
after they get suspended
"""
self.setup_sc1()
self.setup_queues_nodes()
t = int(time.time())
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'False'}, id="sc1")
# Eat up all the resources
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
J = Job(TEST_USER, attrs=a)
jid1 = self.server.submit(J)
self.server.submit(J)
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
J = Job(TEST_USER, attrs=a)
jid3 = self.server.submit(J)
self.server.submit(J)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
# 2 equivalence classes one for each partition
self.scheds['sc1'].log_match("Number of job equivalence classes: 2",
max_attempts=10, starttime=t)
t = int(time.time())
# Make sure that Job is in R state before issuing a signal to suspend
self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
self.server.sigjob(jobid=jid1, signal="suspend")
self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
self.server.sigjob(jobid=jid3, signal="suspend")
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
        # 4 equivalence classes: 2 for the partitions and 2 for the
        # suspended jobs
self.scheds['sc1'].log_match("Number of job equivalence classes: 4",
max_attempts=10, starttime=t)
def test_equiv_single_partition(self):
"""
        Test that jobs fall into the same equivalence class if they are
        submitted to queues assigned to the same partition
"""
self.setup_sc1()
self.setup_queues_nodes()
t = int(time.time())
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'False'}, id="sc1")
self.server.manager(MGR_CMD_SET, QUEUE,
{'partition': 'P1'}, id='wq4')
# Eat up all the resources with the first job to wq1
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq1'}
self.submit_jobs(4, a)
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq4'}
self.submit_jobs(3, a)
a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq1'}
self.submit_jobs(3, a)
a = {'Resource_List.select': '1:ncpus=1', ATTR_queue: 'wq4'}
self.submit_jobs(3, a)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
        # 2 equivalence classes, one for each distinct ncpus request,
        # as both queues share the same partition
self.scheds['sc1'].log_match("Number of job equivalence classes: 2",
max_attempts=10, starttime=t)
def test_list_multi_sched(self):
"""
Test to verify that qmgr list sched works when multiple
schedulers are present
"""
self.setup_sc1()
self.setup_sc2()
self.setup_sc3()
self.server.manager(MGR_CMD_LIST, SCHED)
self.server.manager(MGR_CMD_LIST, SCHED, id="default")
self.server.manager(MGR_CMD_LIST, SCHED, id="sc1")
dir_path = os.path.join(os.sep, 'var', 'spool', 'pbs', 'sched_dir')
a = {'partition': 'P2',
'sched_priv': os.path.join(dir_path, 'sched_priv_sc2'),
'sched_log': os.path.join(dir_path, 'sched_logs_sc2'),
'sched_host': self.server.hostname,
'sched_port': '15051'}
self.server.manager(MGR_CMD_LIST, SCHED, a, id="sc2", expect=True)
self.server.manager(MGR_CMD_LIST, SCHED, id="sc3")
try:
self.server.manager(MGR_CMD_LIST, SCHED, id="invalid_scname")
except PbsManagerError as e:
err_msg = "Unknown Scheduler"
self.assertTrue(err_msg in e.msg[0],
"Error message is not expected")
# delete sc3 sched
self.server.manager(MGR_CMD_DELETE, SCHED, id="sc3", sudo=True)
try:
self.server.manager(MGR_CMD_LIST, SCHED, id="sc3")
except PbsManagerError as e:
err_msg = "Unknown Scheduler"
self.assertTrue(err_msg in e.msg[0],
"Error message is not expected")
self.server.manager(MGR_CMD_LIST, SCHED)
self.server.manager(MGR_CMD_LIST, SCHED, id="default")
self.server.manager(MGR_CMD_LIST, SCHED, id="sc1")
self.server.manager(MGR_CMD_LIST, SCHED, id="sc2")
# delete sc1 sched
self.server.manager(MGR_CMD_DELETE, SCHED, id="sc1")
try:
self.server.manager(MGR_CMD_LIST, SCHED, id="sc1")
except PbsManagerError as e:
err_msg = "Unknown Scheduler"
self.assertTrue(err_msg in e.msg[0],
"Error message is not expected")
def test_job_sort_formula_threshold(self):
"""
Test the scheduler attribute job_sort_formula_threshold for multisched
"""
# Multisched setup
self.setup_sc3()
p3 = {'partition': 'P3'}
a = {'queue_type': 'execution',
'started': 'True',
'enabled': 'True'}
a.update(p3)
self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq1', expect=True)
a = {'resources_available.ncpus': 2}
self.server.create_vnodes('vnode', a, 2, self.mom)
self.server.manager(MGR_CMD_SET, NODE, p3, id='vnode[0]', expect=True)
# Set job_sort_formula on the server
self.server.manager(MGR_CMD_SET, SERVER, {'job_sort_formula': 'ncpus'})
# Set job_sort_formula_threshold on the multisched
self.server.manager(MGR_CMD_SET, SCHED,
{'job_sort_formula_threshold': '2'},
id="sc3", expect=True)
# Submit job to multisched
j1_attrs = {ATTR_queue: 'wq1', 'Resource_List.ncpus': '1'}
J1 = Job(TEST_USER, j1_attrs)
jid_1 = self.server.submit(J1)
# Submit job to default scheduler
J2 = Job(TEST_USER, attrs={'Resource_List.ncpus': '1'})
jid_2 = self.server.submit(J2)
msg = {'job_state': 'Q',
'comment': ('Not Running: Job is ' +
'under job_sort_formula threshold value')}
self.server.expect(JOB, msg, id=jid_1)
self.server.expect(JOB, {'job_state': 'R'}, id=jid_2)
@staticmethod
def cust_attr(name, totnodes, numnode, attrib):
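        """
        Vnode attribute function used by create_vnodes: spread the
        'switch' resource across the vnodes and place vnodes 6-11 in
        partition P2, mirroring the placement sets of the default
        partition in P2.
        """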
a = {}
if numnode in range(0, 3):
a['resources_available.switch'] = 'A'
if numnode in range(3, 5):
a['resources_available.switch'] = 'B'
if numnode in range(6, 9):
a['resources_available.switch'] = 'A'
a['partition'] = 'P2'
if numnode in range(9, 11):
a['resources_available.switch'] = 'B'
a['partition'] = 'P2'
        if numnode == 11:
            a['partition'] = 'P2'
        merged = dict(attrib)
        merged.update(a)
        return merged
def setup_placement_set(self):
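        """
        Add a 'switch' string_array host resource, create 12 vnodes
        decorated by cust_attr and enable node grouping on 'switch'.
        """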
self.server.add_resource('switch', 'string_array', 'h')
a = {'resources_available.ncpus': 2}
self.server.create_vnodes(
'vnode', a, 12, self.mom, attrfunc=self.cust_attr)
self.server.manager(MGR_CMD_SET, SERVER, {'node_group_key': 'switch'})
self.server.manager(MGR_CMD_SET, SERVER, {'node_group_enable': 't'})
def test_multi_sched_explicit_ps(self):
"""
        Test that only_explicit_psets set as a sched attribute takes
        effect and is not read from the default scheduler
"""
self.setup_placement_set()
self.setup_sc2()
a = {'queue_type': 'execution',
'started': 'True',
'enabled': 'True',
'partition': 'P2'}
self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq2')
a = {'Resource_List.select': '1:ncpus=2'}
j = Job(TEST_USER, attrs=a)
j1id = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=j1id)
nodes = ['vnode[5]']
self.check_vnodes(j, nodes, j1id)
a = {'Resource_List.select': '2:ncpus=2'}
j = Job(TEST_USER, attrs=a)
j2id = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=j2id)
nodes = ['vnode[3]', 'vnode[4]']
self.check_vnodes(j, nodes, j2id)
a = {'Resource_List.select': '3:ncpus=2'}
j = Job(TEST_USER, attrs=a)
j3id = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=j3id)
nodes = ['vnode[0]', 'vnode[1]', 'vnode[2]']
self.check_vnodes(j, nodes, j3id)
self.server.manager(MGR_CMD_SET, SCHED,
{'only_explicit_psets': 't'}, id='sc2')
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq2'}
j = Job(TEST_USER, attrs=a)
j4id = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=j4id)
nodes = ['vnode[9]']
self.check_vnodes(j, nodes, j4id)
a = {'Resource_List.select': '2:ncpus=2', ATTR_queue: 'wq2'}
j = Job(TEST_USER, attrs=a)
j5id = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=j5id)
nodes = ['vnode[6]', 'vnode[7]']
self.check_vnodes(j, nodes, j5id)
a = {'Resource_List.select': '3:ncpus=2', ATTR_queue: 'wq2'}
j = Job(TEST_USER, attrs=a)
j6id = self.server.submit(j)
self.server.expect(JOB, {
'job_state': 'Q',
'comment': 'Not Running: Placement set switch=A'
' has too few free resources'}, id=j6id)
def test_jobs_do_not_span_ps(self):
"""
        Test that do_not_span_psets set as a sched attribute takes
        effect and is not read from the default scheduler
"""
self.setup_placement_set()
self.setup_sc2()
a = {'queue_type': 'execution',
'started': 'True',
'enabled': 'True',
'partition': 'P2'}
self.server.manager(MGR_CMD_CREATE, QUEUE, a, id='wq2')
# Scheduler sc2 cannot span across placement sets
self.server.manager(MGR_CMD_SET, SCHED, {
'do_not_span_psets': 't'}, id='sc2')
self.server.manager(MGR_CMD_SET, SCHED, {
'scheduling': 't'}, id='sc2')
a = {'Resource_List.select': '4:ncpus=2', ATTR_queue: 'wq2'}
j = Job(TEST_USER, attrs=a)
j1id = self.server.submit(j)
self.server.expect(
JOB, {'job_state': 'Q', 'comment': 'Can Never Run: can\'t fit in '
'the largest placement set, and can\'t span psets'}, id=j1id)
# Default scheduler can span as do_not_span_psets is not set
a = {'Resource_List.select': '4:ncpus=2'}
j = Job(TEST_USER, attrs=a)
j2id = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=j2id)
def test_sched_preempt_enforce_resumption(self):
"""
        Test that sched_preempt_enforce_resumption can be set on a multi
        sched and that, even if topjob_ineligible is set for a preempted
        job, the preempted job is still calendared when
        sched_preempt_enforce_resumption is set to true
"""
self.setup_sc1()
self.setup_queues_nodes()
prio = {'Priority': 150, 'partition': 'P1'}
self.server.manager(MGR_CMD_SET, QUEUE, prio, id='wq4')
self.server.manager(MGR_CMD_SET, SCHED,
{'sched_preempt_enforce_resumption': 'true'},
id='sc1')
self.server.manager(MGR_CMD_SET, SERVER, {'backfill_depth': '2'})
# Submit a job
j = Job(TEST_USER, {'Resource_List.walltime': '120',
'Resource_List.ncpus': '2'})
jid1 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
j = Job(TEST_USER, {'Resource_List.walltime': '120',
'Resource_List.ncpus': '2',
ATTR_queue: 'wq1'})
jid2 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
        # Alter topjob_ineligible for both running jobs
        self.server.alterjob(jid1, {ATTR_W: "topjob_ineligible = true"},
                             runas=ROOT_USER, logerr=True)
        self.server.alterjob(jid2, {ATTR_W: "topjob_ineligible = true"},
                             runas=ROOT_USER, logerr=True)
# Create a high priority queue
a = {'queue_type': 'e', 'started': 't',
'enabled': 't', 'Priority': '150'}
self.server.manager(MGR_CMD_CREATE, QUEUE, a, id="highp")
# Submit 2 jobs to high priority queue
j = Job(TEST_USER, {'queue': 'highp', 'Resource_List.walltime': '60',
'Resource_List.ncpus': '2'})
jid3 = self.server.submit(j)
j = Job(TEST_USER, {'queue': 'wq4', 'Resource_List.walltime': '60',
'Resource_List.ncpus': '2'})
jid4 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
self.server.expect(JOB, {'job_state': 'R'}, id=jid4)
        # Verify that job1 is not calendared
self.server.expect(JOB, 'estimated.start_time',
op=UNSET, id=jid1)
# Verify that job2 is calendared
self.server.expect(JOB, 'estimated.start_time',
op=SET, id=jid2)
qstat = self.server.status(JOB, 'estimated.start_time',
id=jid2)
est_time = qstat[0]['estimated.start_time']
self.assertNotEqual(est_time, None)
self.scheds['sc1'].log_match(jid2 + ";Job is a top job",
starttime=self.server.ctime)
def set_primetime(self, ptime_start, ptime_end, scid='default'):
"""
This function will set the prime time
in holidays file
"""
p_day = 'weekday'
p_hhmm = time.strftime('%H%M', time.localtime(ptime_start))
np_hhmm = time.strftime('%H%M', time.localtime(ptime_end))
self.scheds[scid].holidays_set_day(p_day, p_hhmm, np_hhmm)
p_day = 'saturday'
self.scheds[scid].holidays_set_day(p_day, p_hhmm, np_hhmm)
p_day = 'sunday'
self.scheds[scid].holidays_set_day(p_day, p_hhmm, np_hhmm)
def test_prime_time_backfill(self):
"""
Test that opt_backfill_fuzzy can be set on a multi-sched and that
primetime/non-primetime boundaries are honored while calendaring
"""
self.setup_sc2()
self.setup_queues_nodes()
a = {'strict_ordering': "True ALL"}
self.scheds['sc2'].set_sched_config(a)
# Set primetime to start 30 minutes from now
prime_start = int(time.time()) + 1800
prime_end = int(time.time()) + 3600
self.set_primetime(prime_start, prime_end, scid='sc2')
self.server.manager(MGR_CMD_SET, SCHED,
{'opt_backfill_fuzzy': 'high'}, id='sc2')
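# With opt_backfill_fuzzy set to 'high', estimated start times are
# calculated at a coarse granularity, hence the minute-level
# comparison at the end of this test.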
self.server.manager(MGR_CMD_SET, SERVER, {'backfill_depth': '2'})
# Submit a job
j = Job(TEST_USER, {'Resource_List.walltime': '60',
'Resource_List.ncpus': '2',
ATTR_queue: 'wq2'})
jid1 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
j = Job(TEST_USER1, {'Resource_List.ncpus': '2',
ATTR_queue: 'wq2'})
jid2 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
# Verify that job2 is calendared to start at primetime start
self.server.expect(JOB, 'estimated.start_time',
op=SET, id=jid2)
qstat = self.server.status(JOB, 'estimated.start_time',
id=jid2)
est_time = qstat[0]['estimated.start_time']
est_epoch = est_time
if self.server.get_op_mode() == PTL_CLI:
est_epoch = int(time.mktime(time.strptime(est_time, '%c')))
prime_mod = prime_start % 60 # ignoring the seconds
self.assertEqual((prime_start - prime_mod), est_epoch)
def test_prime_time_multisched(self):
"""
Test that a primetime queue can be assigned a partition and that the
multi-sched treats jobs submitted to p_queue as primetime-only jobs
"""
self.setup_sc2()
self.setup_queues_nodes()
# Set primetime to start 30 minutes from now
prime_start = int(time.time()) + 1800
prime_end = int(time.time()) + 3600
self.set_primetime(prime_start, prime_end, scid='sc2')
a = {'queue_type': 'e', 'started': 't',
'enabled': 't', 'partition': 'P2'}
self.server.manager(MGR_CMD_CREATE, QUEUE, a, id="p_queue")
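# With the scheduler's default primetime_prefix of 'p_', p_queue is
# treated as a primetime-only queue.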
j = Job(TEST_USER1, {'Resource_List.ncpus': '1',
ATTR_queue: 'wq2'})
jid1 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
j = Job(TEST_USER1, {'Resource_List.ncpus': '1',
ATTR_queue: 'p_queue'})
jid2 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
msg = 'Job will run in primetime only'
self.server.expect(JOB, {ATTR_comment: "Not Running: " + msg}, id=jid2)
self.scheds['sc2'].log_match(jid2 + ";Job only runs in primetime",
starttime=self.server.ctime)
def test_dedicated_time_multisched(self):
"""
Test that a dedicated-time queue can be assigned a partition and that
the multi-sched enforces dedicated time for jobs submitted to ded_queue
"""
self.setup_sc2()
self.setup_queues_nodes()
# Create a dedicated time queue
ded_start = int(time.time()) + 1800
ded_end = int(time.time()) + 3600
self.scheds['sc2'].add_dedicated_time(start=ded_start, end=ded_end)
a = {'queue_type': 'e', 'started': 't',
'enabled': 't', 'partition': 'P2'}
self.server.manager(MGR_CMD_CREATE, QUEUE, a, id="ded_queue")
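# With the scheduler's default dedicated_prefix of 'ded', ded_queue is
# treated as a dedicated-time queue.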
j = Job(TEST_USER1, {'Resource_List.ncpus': '1',
ATTR_queue: 'wq2'})
jid1 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
j = Job(TEST_USER1, {'Resource_List.ncpus': '1',
ATTR_queue: 'ded_queue'})
jid2 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'Q'}, id=jid2)
msg = 'Dedicated time conflict'
self.server.expect(JOB, {ATTR_comment: "Not Running: " + msg}, id=jid2)
self.scheds['sc2'].log_match(jid2 + ";Dedicated Time",
starttime=self.server.ctime)
def test_auto_sched_off_due_to_fds_limit(self):
"""
Test to make sure scheduling is not turned off automatically
when the per-process limit on open files is exhausted
"""
if os.getuid() != 0 or sys.platform in ('cygwin', 'win32'):
self.skipTest("Test needs to run as root on a non-Windows platform")
try:
# get the number of open files per process
(open_files_soft_limit, open_files_hard_limit) =\
resource.getrlimit(resource.RLIMIT_NOFILE)
# set the soft limit of number of open files per process to 10
resource.setrlimit(resource.RLIMIT_NOFILE,
(10, open_files_hard_limit))
except (ValueError, resource.error):
self.fail("Error accessing system RLIMIT_NOFILE limits")
self.setup_sc3()
self.server.manager(MGR_CMD_SET, SCHED, {'scheduler_iteration': 1},
id="sc3", expect=True)
self.server.manager(MGR_CMD_SET, SCHED, {'scheduling': 'True'},
id="sc3", expect=True)
self.logger.info('Sleep for 15 seconds to trigger enough scheduling '
'cycles to exhaust the per-process open file limit, '
'which is 10 in this test')
time.sleep(15)
# Scheduling should not be turned off even after all fds for the
# process are exhausted.
self.server.expect(SCHED, {'scheduling': 'True'},
id='sc3', max_attempts=10)
try:
resource.setrlimit(resource.RLIMIT_NOFILE, (open_files_soft_limit,
open_files_hard_limit))
except (ValueError, resource.error):
self.fail("Error restoring system RLIMIT_NOFILE limits")
def test_set_msched_attr_sched_log_with_sched_off(self):
"""
Test that multi-sched attributes can be set even when its scheduling
is off and verify that the new sched_log actually takes effect
"""
self.setup_sc3()
self.scheds['sc3'].set_sched_config({'log_filter': 2048})
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'False'}, id="sc3")
new_sched_log = os.path.join(self.server.pbs_conf['PBS_HOME'],
'sc3_new_logs')
if os.path.exists(new_sched_log):
self.du.rm(path=new_sched_log, recursive=True,
sudo=True, force=True)
self.du.mkdir(path=new_sched_log, sudo=True)
self.server.manager(MGR_CMD_SET, SCHED,
{'sched_log': new_sched_log},
id="sc3", expect=True)
a = {'sched_log': new_sched_log}
self.server.expect(SCHED, a, id='sc3', max_attempts=10)
# Wait briefly so log_match runs only after the new log file has
# been created.
time.sleep(1)
self.scheds['sc3'].log_match(
"scheduler log directory is changed to " + new_sched_log,
max_attempts=10, starttime=self.server.ctime)
def test_set_msched_attr_sched_priv_with_sched_off(self):
"""
Test that multi-sched attributes can be set even when its scheduling
is off and verify that the new sched_priv actually takes effect
"""
self.setup_sc3()
self.scheds['sc3'].set_sched_config({'log_filter': 2048})
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'False'}, id="sc3")
# create and set-up a new priv directory for sc3
new_sched_priv = os.path.join(self.server.pbs_conf['PBS_HOME'],
'sched_priv_new')
if os.path.exists(new_sched_priv):
self.du.rm(path=new_sched_priv, recursive=True,
sudo=True, force=True)
dflt_sched_priv = os.path.join(self.server.pbs_conf['PBS_HOME'],
'sched_priv')
self.du.run_copy(src=dflt_sched_priv, dest=new_sched_priv,
recursive=True, sudo=True)
self.server.manager(MGR_CMD_SET, SCHED, {'sched_priv': new_sched_priv},
id="sc3", expect=True)
a = {'sched_priv': new_sched_priv}
self.server.expect(SCHED, a, id='sc3', max_attempts=10)
# Wait briefly so log_match runs only after the change has been
# logged.
time.sleep(1)
self.scheds['sc3'].log_match(
"scheduler priv directory has changed to " + new_sched_priv,
max_attempts=10, starttime=self.server.ctime)
def test_set_msched_update_inbuilt_attrs_accrue_type(self):
"""
Test to make sure a multi-sched is able to update built-in job
attributes such as accrue_type
"""
a = {'eligible_time_enable': 'True'}
self.server.manager(MGR_CMD_SET, SERVER, a)
self.setup_sc3()
self.setup_queues_nodes()
a = {'Resource_List.select': '1:ncpus=2', ATTR_queue: 'wq3'}
J1 = Job(TEST_USER1, attrs=a)
J2 = Job(TEST_USER1, attrs=a)
jid1 = self.server.submit(J1)
self.server.expect(JOB, {ATTR_state: 'R'}, id=jid1)
jid2 = self.server.submit(J2)
self.server.expect(JOB, {ATTR_state: 'Q'}, id=jid2)
# accrue_type = 2 is eligible_time
self.server.expect(JOB, {ATTR_accrue_type: 2}, id=jid2)
self.server.delete(jid1, wait=True)
self.server.expect(JOB, {ATTR_state: 'R'}, id=jid2)
# accrue_type = 3 is run_time; this confirms the multi-sched
# updated accrue_type once the job started running
self.server.expect(JOB, {ATTR_accrue_type: 3}, id=jid2)
def test_multisched_not_crash(self):
"""
Test to make sure a multi-sched does not crash when not all nodes in
a partition are associated with the corresponding queue
"""
self.setup_sc1()
self.setup_queues_nodes()
# Associate a queue with one node in partition P1. This queue
# association is not required by the current multi-sched feature;
# it only verifies that the scheduler does not crash when some nodes
# in a partition have a queue associated and others do not.
# Ex: wq1 is associated with vnode[0], while vnode[4] has no queue
# associated with it. The scheduler is expected not to crash.
a = {ATTR_queue: 'wq1'}
self.server.manager(MGR_CMD_SET, NODE, a, id='vnode[0]', expect=True)
self.scheds['sc1'].terminate()
self.scheds['sc1'].start()
# Ideally the following statement is not required; start() itself
# should update the PID in its cache. A bug has been filed against
# the framework to fix this. Until then, the following statement is
# needed as a workaround.
self.scheds['sc1']._update_pid(self.scheds['sc1'])
j = Job(TEST_USER1, attrs={ATTR_queue: 'wq1',
'Resource_List.select': '1:ncpus=1'})
jid1 = self.server.submit(j)
# If job goes to R state means scheduler is still alive.
self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
def test_multi_sched_job_sort_key(self):
"""
Test to make sure that jobs are sorted as per
job_sort_key in a multi sched
"""
self.setup_sc1()
self.setup_queues_nodes()
a = {'job_sort_key': '"ncpus LOW"'}
self.scheds['sc1'].set_sched_config(a)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'False'}, id="sc1")
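# Keep scheduling off so both jobs are queued before a single cycle
# sorts them by job_sort_key.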
j = Job(TEST_USER, {'Resource_List.ncpus': '2',
ATTR_queue: 'wq1'})
jid1 = self.server.submit(j)
j = Job(TEST_USER, {'Resource_List.ncpus': '1',
ATTR_queue: 'wq1'})
jid2 = self.server.submit(j)
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id="sc1")
self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
self.server.expect(JOB, {'job_state': 'Q'}, id=jid1)
def test_multi_sched_node_sort_key(self):
"""
Test to make sure nodes are considered in the order given by
node_sort_key in a multi-sched
"""
self.setup_sc1()
self.setup_queues_nodes()
a = {'partition': 'P1'}
self.server.manager(MGR_CMD_SET, NODE, a, id='@default', expect=True)
a = {'node_sort_key': '"ncpus HIGH " ALL'}
self.scheds['sc1'].set_sched_config(a)
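# With node_sort_key '"ncpus HIGH" ALL', sc1 considers nodes in
# descending ncpus order, so successive jobs should land on vnode[3]
# down to vnode[0].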
a = {'resources_available.ncpus': 1}
self.server.manager(MGR_CMD_SET, NODE, a, id='vnode[0]', expect=True)
a = {'resources_available.ncpus': 2}
self.server.manager(MGR_CMD_SET, NODE, a, id='vnode[1]', expect=True)
a = {'resources_available.ncpus': 3}
self.server.manager(MGR_CMD_SET, NODE, a, id='vnode[2]', expect=True)
a = {'resources_available.ncpus': 4}
self.server.manager(MGR_CMD_SET, NODE, a, id='vnode[3]', expect=True)
# Offline vnode[4] as it is not needed for this test
a = {'state': 'offline'}
self.server.manager(MGR_CMD_SET, NODE, a, id='vnode[4]', expect=True)
a = {'Resource_List.select': '1:ncpus=1',
'Resource_List.place': 'excl',
ATTR_queue: 'wq1'}
j = Job(TEST_USER1, a)
jid = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid)
self.check_vnodes(j, ['vnode[3]'], jid)
j = Job(TEST_USER1, a)
jid1 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid1)
self.check_vnodes(j, ['vnode[2]'], jid1)
j = Job(TEST_USER1, a)
jid2 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid2)
self.check_vnodes(j, ['vnode[1]'], jid2)
j = Job(TEST_USER1, a)
jid3 = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, id=jid3)
self.check_vnodes(j, ['vnode[0]'], jid3)
def test_multi_sched_priority_sockets(self):
"""
Test that scheduler socket connections from all the schedulers
are processed with priority
"""
self.common_setup()
self.server.manager(MGR_CMD_SET, SERVER, {'log_events': 2047})
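# Raise log_events so the debug-level 'processing priority socket'
# message is recorded in the server log.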
for name in self.scheds:
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'False'}, id=name, expect=True)
a = {ATTR_queue: 'wq1',
'Resource_List.select': '1:ncpus=2',
'Resource_List.walltime': 60}
j = Job(TEST_USER1, attrs=a)
self.server.submit(j)
t = int(time.time())
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id='sc1', expect=True)
self.server.log_match("processing priority socket", starttime=t)
a = {ATTR_queue: 'wq2',
'Resource_List.select': '1:ncpus=2',
'Resource_List.walltime': 60}
j = Job(TEST_USER1, attrs=a)
self.server.submit(j)
t = int(time.time())
self.server.manager(MGR_CMD_SET, SCHED,
{'scheduling': 'True'}, id='sc2', expect=True)
self.server.log_match("processing priority socket", starttime=t)