# coding: utf-8
# Copyright (C) 1994-2018 Altair Engineering, Inc.
# For more information, contact Altair at www.altair.com.
#
# This file is part of the PBS Professional ("PBS Pro") software.
#
# Open Source License Information:
#
# PBS Pro is free software. You can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.
# See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Commercial License Information:
#
# For a copy of the commercial license terms and conditions,
# go to: (http://www.pbspro.com/UserArea/agreement.html)
# or contact the Altair Legal Department.
#
# Altair’s dual-license business model allows companies, individuals, and
# organizations to create proprietary derivative works of PBS Pro and
# distribute them - whether embedded or bundled with other software -
# under a commercial license agreement.
#
# Use of Altair’s trademarks, including but not limited to "PBS™",
# "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
# trademark licensing policies.
from tests.functional import *
def convert_time(fmt, tm, fixdate=False):
"""
Convert the given timestamp 'tm' into the given format 'fmt'.
If fixdate is True, pad a single-digit day of the month with a leading
space (so the output matches ctime-style dates, as qstat uses them).
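For example (illustrative): with fmt '%a %b %d %H:%M:%S %Y' and a
timestamp falling on the 5th of a month, fixdate=True rewrites the
'05' day field as ' 5', yielding 'Mon Jan  5 ...' as ctime() would.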
"""
rv = time.strftime(fmt, time.localtime(float(tm)))
if ((sys.platform not in ('cygwin', 'win32')) and (fixdate)):
rv = rv.split()
date = int(rv[2])
if date <= 9:
date = ' ' + str(date)
rv[2] = str(date)
rv = ' '.join(rv)
return rv
def create_subjob_id(job_array_id, subjob_index):
"""
Insert the subjob index into the square brackets of the job array id.
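For example, create_subjob_id('123[].server', 4) returns '123[4].server'.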
"""
idx = job_array_id.find('[]')
return job_array_id[:idx + 1] + str(subjob_index) + job_array_id[idx + 1:]
class TestPbsReliableJobStartup(TestFunctional):
"""
This tests the Reliable Job Startup feature,
where a job can be started with extra nodes,
with node failures tolerated during job start
(and even throughout the life of the job),
before pruning the job back to a set of healthy
nodes that satisfy the original request.
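A job is made tolerant of node failures via its 'tolerate_node_failures'
attribute, e.g. (illustrative):
    qsub -W tolerate_node_failures=job_start -l select=... -l place=scatter
In these tests the attribute is set by a queuejob hook instead.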
Custom parameters:
moms: colon-separated hostnames of five MoMs
"""
def pbs_nodefile_match_exec_host(self, jid, exec_host,
schedselect=None):
"""
Look into the PBS_NODEFILE on the first host listed in 'exec_host'
and return True if all host entries in 'exec_host' match the entries
in the file. Otherwise, return False.
If 'schedselect' is not None, look for its 'mpiprocs' values and
verify that the corresponding node hosts appear in PBS_NODEFILE
'mpiprocs' number of times.
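Example (illustrative): with exec_host "hostA/0*0+hostD/0*3" and a
schedselect giving mpiprocs=1 for the hostA chunk and mpiprocs=3
for the hostD chunk, hostA should appear once and hostD three
times in PBS_NODEFILE.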
"""
pbs_nodefile = os.path.join(self.server.
pbs_conf['PBS_HOME'], 'aux', jid)
# look for mpiprocs settings
mpiprocs = []
if schedselect is not None:
for chunk in schedselect.split('+'):
chl = chunk.split(':')
for ch in chl:
if ch.find('=') != -1:
c = ch.split('=')
if c[0] == "mpiprocs":
mpiprocs.append(c[1])
ehost = exec_host.split('+')
first_host = ehost[0].split('/')[0]
cmd = ['cat', pbs_nodefile]
ret = self.server.du.run_cmd(first_host, cmd, sudo=False)
ehost2 = []
for h in ret['out']:
ehost2.append(h.split('.')[0])
ehost1 = []
j = 0
for eh in ehost:
h = eh.split('/')
if (len(mpiprocs) > 0):
for _ in range(int(mpiprocs[j])):
ehost1.append(h[0])
else:
ehost1.append(h[0])
j += 1
self.logger.info("EHOST1=%s" % (ehost1,))
self.logger.info("EHOST2=%s" % (ehost2,))
if cmp(ehost1, ehost2) != 0:
return False
return True
def match_accounting_log(self, atype, jid, exec_host, exec_vnode,
mem, ncpus, nodect, place, select):
"""
Check that there is an accounting log record 'atype' for
job 'jid' containing the values given (i.e. exec_host,
exec_vnode, Resource_List.mem, Resource_List.ncpus, etc.).
This throws an exception upon encountering a non-matching
accounting_logs entry.
Some example values of 'atype' are: 'u' (update record due to
release node request), 'c' (record containing the next
set of resources to be used by a phased job as a result of
release node request), 'e' (last update record for a phased job
due to a release node request), 'E' (end of job record),
's' (secondary start record).
"""
if atype == 'e':
self.mom.log_match("Job;%s;Obit sent" % (jid,), n=100,
max_attempts=5, interval=5)
self.server.accounting_match(
msg=".*%s;%s.*exec_host=%s" % (atype, jid, exec_host),
regexp=True, n=20, max_attempts=3)
self.server.accounting_match(
msg=".*%s;%s.*exec_vnode=%s" % (atype, jid, exec_vnode),
regexp=True, n=20, max_attempts=3)
self.server.accounting_match(
msg=".*%s;%s.*Resource_List\.mem=%s" % (atype, jid, mem),
regexp=True, n=20, max_attempts=3)
self.server.accounting_match(
msg=".*%s;%s.*Resource_List\.ncpus=%d" % (atype, jid, ncpus),
regexp=True, n=20, max_attempts=3)
self.server.accounting_match(
msg=".*%s;%s.*Resource_List\.nodect=%d" % (atype, jid, nodect),
regexp=True, n=20, max_attempts=3)
self.server.accounting_match(
msg=".*%s;%s.*Resource_List\.place=%s" % (atype, jid, place),
regexp=True, n=20, max_attempts=3)
self.server.accounting_match(
msg=".*%s;%s.*Resource_List\.select=%s" % (atype, jid, select),
regexp=True, n=20, max_attempts=3)
if (atype != 'c') and (atype != 'S') and (atype != 's'):
self.server.accounting_match(
msg=".*%s;%s.*resources_used\." % (atype, jid),
regexp=True, n=20, max_attempts=3)
def match_vnode_status(self, vnode_list, state, jobs=None, ncpus=None,
mem=None):
"""
Given a list of vnode names in 'vnode_list', check to make
sure each vnode's state, jobs string, resources_assigned.mem,
and resources_assigned.ncpus match the passed arguments.
This will throw an exception if a match is not found.
"""
for vn in vnode_list:
dict_match = {'state': state}
if jobs is not None:
dict_match['jobs'] = jobs
if ncpus is not None:
dict_match['resources_assigned.ncpus'] = ncpus
if mem is not None:
dict_match['resources_assigned.mem'] = mem
self.server.expect(VNODE, dict_match, id=vn)
def create_and_submit_job(self, job_type, attribs=None):
"""
create the job object and submit it to the server
based on 'job_type' and attributes list 'attribs'.
"""
if attribs:
retjob = Job(TEST_USER, attrs=attribs)
else:
retjob = Job(TEST_USER)
if job_type == 'job1':
retjob.create_script(self.script['job1'])
elif job_type == 'job1_2':
retjob.create_script(self.script['job1_2'])
elif job_type == 'job1_3':
retjob.create_script(self.script['job1_3'])
elif job_type == 'job1_4':
retjob.create_script(self.script['job1_4'])
elif job_type == 'job2':
retjob.create_script(self.script['job2'])
elif job_type == 'job3':
retjob.create_script(self.script['job3'])
elif job_type == 'job4':
retjob.create_script(self.script['job4'])
elif job_type == 'job5':
retjob.create_script(self.script['job5'])
elif job_type == 'jobA':
retjob.create_script(self.script['jobA'])
return self.server.submit(retjob)
def setUp(self):
if len(self.moms) != 5:
cmt = "need 5 mom hosts: -p moms=<m1>:<m2>:<m3>:<m4>:<m5>"
self.skip_test(reason=cmt)
TestFunctional.setUp(self)
Job.dflt_attributes[ATTR_k] = 'oe'
self.server.cleanup_jobs(extend="force")
self.momA = self.moms.values()[0]
self.momB = self.moms.values()[1]
self.momC = self.moms.values()[2]
self.momD = self.moms.values()[3]
self.momE = self.moms.values()[4]
# Now start setting up and creating the vnodes
self.server.manager(MGR_CMD_DELETE, NODE, None, "")
# set node momA
self.hostA = self.momA.shortname
self.momA.delete_vnode_defs()
vnode_prefix = self.hostA
a = {'resources_available.mem': '1gb',
'resources_available.ncpus': '1'}
vnodedef = self.momA.create_vnode_def(vnode_prefix, a, 4)
self.assertNotEqual(vnodedef, None)
self.momA.insert_vnode_def(vnodedef, 'vnode.def')
self.server.manager(MGR_CMD_CREATE, NODE, id=self.hostA)
# set node momB
self.hostB = self.momB.shortname
self.momB.delete_vnode_defs()
vnode_prefix = self.hostB
a = {'resources_available.mem': '1gb',
'resources_available.ncpus': '1'}
vnodedef = self.momB.create_vnode_def(vnode_prefix, a, 5,
usenatvnode=True)
self.assertNotEqual(vnodedef, None)
self.momB.insert_vnode_def(vnodedef, 'vnode.def')
self.server.manager(MGR_CMD_CREATE, NODE, id=self.hostB)
# set node momC
# This one has no vnode definition.
self.hostC = self.momC.shortname
self.momC.delete_vnode_defs()
self.server.manager(MGR_CMD_CREATE, NODE, id=self.hostC)
a = {'resources_available.ncpus': 2,
'resources_available.mem': '2gb'}
# set natural vnode of hostC
self.server.manager(MGR_CMD_SET, NODE, a, id=self.hostC,
expect=True)
# set node momD
# This one has no vnode definition.
self.hostD = self.momD.shortname
self.momD.delete_vnode_defs()
self.server.manager(MGR_CMD_CREATE, NODE, id=self.hostD)
a = {'resources_available.ncpus': 5,
'resources_available.mem': '5gb'}
# set natural vnode of hostD
self.server.manager(MGR_CMD_SET, NODE, a, id=self.hostD,
expect=True)
# set node momE
self.hostE = self.momE.shortname
self.momE.delete_vnode_defs()
vnode_prefix = self.hostE
a = {'resources_available.mem': '1gb',
'resources_available.ncpus': '1'}
vnodedef = self.momE.create_vnode_def(vnode_prefix, a, 5,
usenatvnode=True)
self.assertNotEqual(vnodedef, None)
self.momE.insert_vnode_def(vnodedef, 'vnode.def')
self.server.manager(MGR_CMD_CREATE, NODE, id=self.hostE)
# Various node names
self.nA = self.hostA
self.nAv0 = '%s[0]' % (self.hostA,)
self.nAv1 = '%s[1]' % (self.hostA,)
self.nAv2 = '%s[2]' % (self.hostA,)
self.nAv3 = '%s[3]' % (self.hostA,)
self.nB = self.hostB
self.nBv0 = '%s[0]' % (self.hostB,)
self.nBv1 = '%s[1]' % (self.hostB,)
self.nBv2 = '%s[2]' % (self.hostB,)
self.nBv3 = '%s[3]' % (self.hostB,)
self.nC = self.hostC
self.nD = self.hostD
self.nE = self.hostE
self.nEv0 = '%s[0]' % (self.hostE,)
self.nEv1 = '%s[1]' % (self.hostE,)
self.nEv2 = '%s[2]' % (self.hostE,)
self.nEv3 = '%s[3]' % (self.hostE,)
a = {'state': 'free', 'resources_available.ncpus': (GE, 1)}
self.server.expect(VNODE, {'state=free': 17}, count=True,
max_attempts=10, interval=2)
if sys.platform in ('cygwin', 'win32'):
SLEEP_CMD = "pbs-sleep"
else:
SLEEP_CMD = os.path.join(os.sep, "bin", "sleep")
self.pbs_release_nodes_cmd = os.path.join(
self.server.pbs_conf['PBS_EXEC'], 'bin', 'pbs_release_nodes')
FIB37 = os.path.join(self.server.pbs_conf['PBS_EXEC'], 'bin',
'pbs_python') + \
' -c "exec(\\\"def fib(i):\\n if i < 2:\\n \
return i\\n return fib(i-1) + fib(i-2)\\n\\nprint fib(37)\\\")"'
self.fib37_value = 24157817
FIB40 = os.path.join(self.server.pbs_conf['PBS_EXEC'], 'bin',
'pbs_python') + \
' -c "exec(\\\"def fib(i):\\n if i < 2:\\n \
return i\\n return fib(i-1) + fib(i-2)\\n\\nprint fib(40)\\\")"'
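# FIB37 and FIB40 expand to pbs_python one-liners computing fib(37) and
# fib(40); the job scripts below run them and the tests compare the output
# against self.fib37_value (fib(37) == 24157817).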
# job submission arguments
self.script = {}
# original select spec
self.job1_oselect = "ncpus=3:mem=2gb+ncpus=3:mem=2gb+ncpus=2:mem=2gb"
self.job1_place = "scatter"
# incremented values at job start and just before actual launch
self.job1_iselect = \
"1:ncpus=3:mem=2gb+2:ncpus=3:mem=2gb+2:ncpus=2:mem=2gb"
self.job1_ischedselect = self.job1_iselect
self.job1_iexec_host = "%s/0*0+%s/0*0+%s/0*3+%s/0*2+%s/0*0" % (
self.nA, self.nB, self.nD, self.nC, self.nE)
self.job1_iexec_vnode = \
"(%s:mem=1048576kb:ncpus=1+" % (self.nAv0,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nAv1,) + \
"%s:ncpus=1)+" % (self.nAv2) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nB,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nBv0,) + \
"%s:ncpus=1)+" % (self.nBv1,) + \
"(%s:ncpus=3:mem=2097152kb)+" % (self.nD,) + \
"(%s:ncpus=2:mem=2097152kb)+" % (self.nC,) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nE,) + \
"%s:mem=1048576kb:ncpus=1)" % (self.nEv0,)
self.job1_isel_esc = self.job1_iselect.replace("+", "\+")
self.job1_iexec_host_esc = self.job1_iexec_host.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.job1_iexec_vnode_esc = self.job1_iexec_vnode.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
# expected values version 1 upon successful job launch
self.job1_select = \
"1:ncpus=3:mem=2gb+1:ncpus=3:mem=2gb+1:ncpus=2:mem=2gb"
self.job1_schedselect = self.job1_select
self.job1_exec_host = "%s/0*0+%s/0*3+%s/0*0" % (
self.nA, self.nD, self.nE)
self.job1_exec_vnode = \
"(%s:mem=1048576kb:ncpus=1+" % (self.nAv0,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nAv1,) + \
"%s:ncpus=1)+" % (self.nAv2) + \
"(%s:ncpus=3:mem=2097152kb)+" % (self.nD,) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nE,) + \
"%s:mem=1048576kb:ncpus=1)" % (self.nEv0,)
self.job1_sel_esc = self.job1_select.replace("+", "\+")
self.job1_exec_host_esc = self.job1_exec_host.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.job1_exec_vnode_esc = self.job1_exec_vnode.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
# expected values version 2 upon successful job launch
self.job1v2_select = \
"1:ncpus=3:mem=2gb+1:ncpus=3:mem=2gb+1:ncpus=2:mem=2gb"
self.job1v2_schedselect = self.job1v2_select
self.job1v2_exec_host = "%s/0*0+%s/0*3+%s/0*2" % (
self.nA, self.nD, self.nC)
self.job1v2_exec_vnode = \
"(%s:mem=1048576kb:ncpus=1+" % (self.nAv0,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nAv1,) + \
"%s:ncpus=1)+" % (self.nAv2) + \
"(%s:ncpus=3:mem=2097152kb)+" % (self.nD,) + \
"(%s:ncpus=2:mem=2097152kb)" % (self.nC,)
self.job1v2_sel_esc = self.job1v2_select.replace("+", "\+")
self.job1v2_exec_host_esc = self.job1v2_exec_host.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.job1v2_exec_vnode_esc = self.job1v2_exec_vnode.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
# expected values version 3 upon successful job launch
self.job1v3_select = \
"1:ncpus=3:mem=2gb+1:ncpus=3:mem=2gb+1:ncpus=2:mem=2gb"
self.job1v3_schedselect = self.job1v3_select
self.job1v3_exec_host = "%s/0*0+%s/0*0+%s/0*0" % (
self.nA, self.nB, self.nE)
self.job1v3_exec_vnode = \
"(%s:mem=1048576kb:ncpus=1+" % (self.nAv0,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nAv1,) + \
"%s:ncpus=1)+" % (self.nAv2) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nB,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nBv0,) + \
"%s:ncpus=1)+" % (self.nBv1,) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nE,) + \
"%s:mem=1048576kb:ncpus=1)" % (self.nEv0,)
self.job1v3_sel_esc = self.job1v3_select.replace("+", "\+")
self.job1v3_exec_host_esc = self.job1v3_exec_host.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.job1v3_exec_vnode_esc = self.job1v3_exec_vnode.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
# expected values version 4 upon successful job launch
self.job1v4_select = \
"1:ncpus=3:mem=2gb+1:ncpus=3:mem=2gb+1:ncpus=2:mem=2gb"
self.job1v4_schedselect = self.job1v4_select
self.job1v4_exec_host = "%s/0*0+%s/0*0+%s/0*2" % (
self.nA, self.nB, self.nD)
self.job1v4_exec_vnode = \
"(%s:mem=1048576kb:ncpus=1+" % (self.nAv0,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nAv1,) + \
"%s:ncpus=1)+" % (self.nAv2) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nB,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nBv0,) + \
"%s:ncpus=1)+" % (self.nBv1,) + \
"(%s:ncpus=2:mem=2097152kb)" % (self.nD,)
self.job1v4_sel_esc = self.job1v4_select.replace("+", "\+")
self.job1v4_exec_host_esc = self.job1v4_exec_host.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.job1v4_exec_vnode_esc = self.job1v4_exec_vnode.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
# expected values version 5 upon successful job launch
self.job1v5_select = \
"1:ncpus=3:mem=2gb+1:ncpus=3:mem=2gb+1:ncpus=2:mem=2gb"
self.job1v5_schedselect = self.job1v5_select
self.job1v5_exec_host = "%s/0*0+%s/0*0+%s/0*2" % (
self.nA, self.nB, self.nC)
self.job1v5_exec_vnode = \
"(%s:mem=1048576kb:ncpus=1+" % (self.nAv0,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nAv1,) + \
"%s:ncpus=1)+" % (self.nAv2) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nB,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nBv0,) + \
"%s:ncpus=1)+" % (self.nBv1,) + \
"(%s:ncpus=2:mem=2097152kb)" % (self.nC,)
self.job1v5_sel_esc = self.job1v5_select.replace("+", "\+")
self.job1v5_exec_host_esc = self.job1v5_exec_host.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.job1v5_exec_vnode_esc = self.job1v5_exec_vnode.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
# expected values version 6 upon successful job launch
self.job1v6_select = \
"1:ncpus=3:mem=2gb+1:ncpus=3:mem=2gb+1:ncpus=2:mem=2gb"
self.job1v6_select += "+1:ncpus=1:mem=1gb"
self.job1v6_schedselect = self.job1v6_select
self.job1v6_exec_host = "%s/0*0+%s/0*0+%s/0*2+%s/0" % (
self.nA, self.nB, self.nC, self.nE)
self.job1v6_exec_vnode = \
"(%s:mem=1048576kb:ncpus=1+" % (self.nAv0,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nAv1,) + \
"%s:ncpus=1)+" % (self.nAv2) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nB,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nBv0,) + \
"%s:ncpus=1)+" % (self.nBv1,) + \
"(%s:ncpus=2:mem=2097152kb)+" % (self.nC,) + \
"(%s:mem=1048576kb:ncpus=1)" % (self.nE,)
self.job1v6_sel_esc = self.job1v6_select.replace("+", "\+")
self.job1v6_exec_host_esc = self.job1v6_exec_host.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.job1v6_exec_vnode_esc = self.job1v6_exec_vnode.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
self.script['job1'] = """
#PBS -l select=%s
#PBS -l place=%s
#PBS -W umask=022
#PBS -S /bin/bash
echo "$PBS_NODEFILE"
cat $PBS_NODEFILE
echo 'FIB TESTS'
echo 'pbsdsh -n 1 fib 37'
pbsdsh -n 1 -- %s
echo 'pbsdsh -n 2 fib 37'
pbsdsh -n 2 -- %s
echo 'fib 37'
%s
echo 'HOSTNAME TESTS'
echo 'pbsdsh -n 0 hostname'
pbsdsh -n 0 -- hostname -s
echo 'pbsdsh -n 1 hostname'
pbsdsh -n 1 -- hostname -s
echo 'pbsdsh -n 2 hostname'
pbsdsh -n 2 -- hostname -s
echo 'PBS_NODEFILE tests'
for h in `cat $PBS_NODEFILE`
do
echo "HOST=$h"
echo "pbs_tmrsh $h hostname"
pbs_tmrsh $h hostname -s
done
""" % (self.job1_oselect, self.job1_place, FIB37, FIB37, FIB37)
# original select spec
self.jobA_oselect = "ncpus=1:mem=1gb+ncpus=1:mem=1gb+ncpus=1:mem=1gb"
self.jobA_place = "scatter"
# incremented values at job start and just before actual launch
self.jobA_iselect = \
"1:ncpus=1:mem=1gb+2:ncpus=1:mem=1gb+2:ncpus=1:mem=1gb"
self.jobA_ischedselect = self.jobA_iselect
self.jobA_iexec_host1 = "%s/0+%s/0+%s/0+%s/0+%s/0" % (
self.nA, self.nB, self.nC, self.nD, self.nE)
self.jobA_iexec_host2 = "%s/1+%s/1+%s/1+%s/1+%s/1" % (
self.nA, self.nB, self.nC, self.nD, self.nE)
self.jobA_iexec_host3 = "%s/2+%s/2+%s/0+%s/2+%s/0" % (
self.nA, self.nB, self.nC, self.nD, self.nE)
self.jobA_iexec_vnode1 = \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nAv0,) + \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nB,) + \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nC,) + \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nD,) + \
"(%s:ncpus=1:mem=1048576kb)" % (self.nE,)
self.jobA_iexec_vnode2 = \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nAv1,) + \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nBv0,) + \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nC,) + \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nD,) + \
"(%s:ncpus=1:mem=1048576kb)" % (self.nEv0,)
self.jobA_iexec_vnode3 = \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nAv2,) + \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nBv1,) + \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nC,) + \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nD,) + \
"(%s:ncpus=1:mem=1048576kb)" % (self.nE,)
self.jobA_isel_esc = self.jobA_iselect.replace("+", "\+")
self.jobA_iexec_host1_esc = self.jobA_iexec_host1.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.jobA_iexec_host2_esc = self.jobA_iexec_host2.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.jobA_iexec_host3_esc = self.jobA_iexec_host3.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.jobA_iexec_vnode1_esc = self.jobA_iexec_vnode1.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
self.jobA_iexec_vnode2_esc = self.jobA_iexec_vnode2.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
self.jobA_iexec_vnode3_esc = self.jobA_iexec_vnode3.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
# expected values version 1 upon successful job launch
self.jobA_select = \
"1:ncpus=1:mem=1gb+1:ncpus=1:mem=1gb+1:ncpus=1:mem=1gb"
self.jobA_schedselect = self.jobA_select
self.jobA_exec_host1 = "%s/0+%s/0+%s/0" % (
self.nA, self.nB, self.nD)
self.jobA_exec_host2 = "%s/1+%s/1+%s/1" % (
self.nA, self.nB, self.nD)
self.jobA_exec_host3 = "%s/2+%s/2+%s/2" % (
self.nA, self.nB, self.nD)
self.jobA_exec_vnode1 = \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nAv0,) + \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nB,) + \
"(%s:ncpus=1:mem=1048576kb)" % (self.nD,)
self.jobA_exec_vnode2 = \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nAv1,) + \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nBv0,) + \
"(%s:ncpus=1:mem=1048576kb)" % (self.nD,)
self.jobA_exec_vnode3 = \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nAv2,) + \
"(%s:ncpus=1:mem=1048576kb)+" % (self.nBv1,) + \
"(%s:ncpus=1:mem=1048576kb)" % (self.nD,)
self.jobA_sel_esc = self.jobA_select.replace("+", "\+")
self.jobA_exec_host1_esc = self.jobA_exec_host1.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.jobA_exec_host2_esc = self.jobA_exec_host2.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.jobA_exec_host3_esc = self.jobA_exec_host3.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.jobA_exec_vnode1_esc = self.jobA_exec_vnode1.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
self.jobA_exec_vnode2_esc = self.jobA_exec_vnode2.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
self.jobA_exec_vnode3_esc = self.jobA_exec_vnode3.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
self.script['jobA'] = """
#PBS -J 1-3
#PBS -l select=%s
#PBS -l place=%s
#PBS -S /bin/bash
echo 'HOSTNAME TESTS'
echo 'pbsdsh -n 0 hostname'
pbsdsh -n 0 -- hostname -s
echo 'pbsdsh -n 1 hostname'
pbsdsh -n 1 -- hostname -s
echo 'pbsdsh -n 2 hostname'
pbsdsh -n 2 -- hostname -s
sleep 180
""" % (self.jobA_oselect, self.jobA_place)
self.script['job1_2'] = """
#PBS -l select=%s
#PBS -l place=%s
#PBS -W umask=022
#PBS -S /bin/bash
echo "$PBS_NODEFILE"
cat $PBS_NODEFILE
echo 'FIB TESTS'
echo 'pbsdsh -n 2 fib 37'
pbsdsh -n 2 -- %s
echo 'fib 37'
%s
echo 'HOSTNAME TESTS'
echo 'pbsdsh -n 0 hostname'
pbsdsh -n 0 -- hostname -s
echo 'pbsdsh -n 2 hostname'
pbsdsh -n 2 -- hostname -s
""" % (self.job1_oselect, self.job1_place, FIB37, FIB37)
self.script['job1_3'] = """
#PBS -l select=%s
#PBS -l place=%s
#PBS -W umask=022
#PBS -S /bin/bash
echo "$PBS_NODEFILE"
cat $PBS_NODEFILE
echo 'FIB TESTS'
echo 'pbsdsh -n 2 fib 40'
pbsdsh -n 2 -- %s
echo 'fib 40'
%s
echo 'HOSTNAME TESTS'
echo 'pbsdsh -n 0 hostname'
pbsdsh -n 0 -- hostname -s
echo 'pbsdsh -n 2 hostname'
pbsdsh -n 2 -- hostname -s
""" % (self.job1_oselect, self.job1_place, FIB40, FIB40)
self.script['job1_4'] = """
#PBS -l select=%s
#PBS -l place=%s
#PBS -W umask=022
#PBS -S /bin/bash
echo "$PBS_NODEFILE"
cat $PBS_NODEFILE
echo 'FIB TESTS'
echo 'pbsdsh -n 1 fib 37'
pbsdsh -n 1 -- %s
echo 'pbsdsh -n 2 fib 37'
pbsdsh -n 2 -- %s
echo 'pbsdsh -n 3 fib 37'
pbsdsh -n 3 -- %s
echo 'fib 37'
%s
echo 'HOSTNAME TESTS'
echo 'pbsdsh -n 0 hostname'
pbsdsh -n 0 -- hostname -s
echo 'pbsdsh -n 1 hostname'
pbsdsh -n 1 -- hostname -s
echo 'pbsdsh -n 2 hostname'
pbsdsh -n 2 -- hostname -s
echo 'pbsdsh -n 3 hostname'
pbsdsh -n 3 -- hostname -s
echo 'PBS_NODEFILE tests'
for h in `cat $PBS_NODEFILE`
do
echo "HOST=$h"
echo "pbs_tmrsh $h hostname"
pbs_tmrsh $h hostname -s
done
""" % (self.job1_oselect, self.job1_place, FIB37, FIB37, FIB37, FIB37)
# original select spec
self.job2_oselect = "ncpus=3:mem=2gb+ncpus=3:mem=2gb+ncpus=0:mem=2gb"
self.job2_place = "scatter"
# incremented values at job start and just before actual launch
self.job2_iselect = \
"1:ncpus=3:mem=2gb+2:ncpus=3:mem=2gb+2:ncpus=0:mem=2gb"
self.job2_ischedselect = self.job2_iselect
self.job2_iexec_host = "%s/0*0+%s/0*0+%s/0*3+%s/0*0+%s/0*0" % (
self.nA, self.nB, self.nD, self.nC, self.nE)
self.job2_iexec_vnode = \
"(%s:mem=1048576kb:ncpus=1+" % (self.nAv0,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nAv1,) + \
"%s:ncpus=1)+" % (self.nAv2) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nB,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nBv0,) + \
"%s:ncpus=1)+" % (self.nBv1,) + \
"(%s:ncpus=3:mem=2097152kb)+" % (self.nD,) + \
"(%s:ncpus=0:mem=2097152kb)+" % (self.nC,) + \
"(%s:mem=1048576kb:ncpus=0+" % (self.nE,) + \
"%s:mem=1048576kb)" % (self.nEv0,)
self.job2_isel_esc = self.job2_iselect.replace("+", "\+")
self.job2_iexec_host_esc = self.job2_iexec_host.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.job2_iexec_vnode_esc = self.job2_iexec_vnode.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
# expected values upon successful job launch
self.job2_select = \
"1:ncpus=3:mem=2gb+1:ncpus=3:mem=2gb+1:ncpus=0:mem=2gb"
self.job2_schedselect = self.job2_select
self.job2_exec_host = "%s/0*0+%s/0*3+%s/0*0" % (
self.nA, self.nD, self.nE)
# ncpus=0 assigned hosts are not listed in $PBS_NODEFILE
self.job2_exec_host_nfile = "%s/0*0+%s/0*3" % (
self.nA, self.nD)
self.job2_exec_vnode = \
"(%s:mem=1048576kb:ncpus=1+" % (self.nAv0,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nAv1,) + \
"%s:ncpus=1)+" % (self.nAv2) + \
"(%s:ncpus=3:mem=2097152kb)+" % (self.nD,) + \
"(%s:mem=1048576kb+" % (self.nE,) + \
"%s:mem=1048576kb)" % (self.nEv0,)
self.job2_sel_esc = self.job2_select.replace("+", "\+")
self.job2_exec_host_esc = self.job2_exec_host.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.job2_exec_vnode_esc = self.job2_exec_vnode.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
self.script['job2'] = \
"#PBS -l select=" + self.job2_oselect + "\n" + \
"#PBS -l place=" + self.job2_place + "\n" + \
SLEEP_CMD + " 60\n"
# Job with mpiprocs and ompthreads requested
self.job3_oselect = \
"ncpus=3:mem=2gb:mpiprocs=3:ompthreads=2+" + \
"ncpus=3:mem=2gb:mpiprocs=3:ompthreads=3+" + \
"ncpus=2:mem=2gb:mpiprocs=2:ompthreads=2"
self.job3_place = "scatter"
# incremented values at job start and just before actual launch
self.job3_iselect = \
"1:ncpus=3:mem=2gb:mpiprocs=3:ompthreads=2+" + \
"2:ncpus=3:mem=2gb:mpiprocs=3:ompthreads=3+" + \
"2:ncpus=2:mem=2gb:mpiprocs=2:ompthreads=2"
self.job3_ischedselect = self.job3_iselect
self.job3_iexec_host = \
"%s/0*0+%s/0*0+%s/0*3+%s/0*2+%s/0*0" % (
self.nA, self.nB, self.nD, self.nC, self.nE)
self.job3_iexec_vnode = \
"(%s:mem=1048576kb:ncpus=1+" % (self.nAv0,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nAv1,) + \
"%s:ncpus=1)+" % (self.nAv2) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nB,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nBv0,) + \
"%s:ncpus=1)+" % (self.nBv1,) + \
"(%s:ncpus=3:mem=2097152kb)+" % (self.nD,) + \
"(%s:ncpus=2:mem=2097152kb)+" % (self.nC,) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nE,) + \
"%s:mem=1048576kb:ncpus=1)" % (self.nEv0,)
# expected values upon successful job launch
self.job3_select = \
"1:ncpus=3:mem=2gb:mpiprocs=3:ompthreads=2+" + \
"1:ncpus=3:mem=2gb:mpiprocs=3:ompthreads=3+" + \
"1:ncpus=2:mem=2gb:mpiprocs=2:ompthreads=2"
self.job3_schedselect = self.job3_select
self.job3_exec_host = "%s/0*0+%s/0*3+%s/0*0" % (
self.nA, self.nD, self.nE)
self.job3_exec_vnode = \
"(%s:mem=1048576kb:ncpus=1+" % (self.nAv0,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nAv1,) + \
"%s:ncpus=1)+" % (self.nAv2) + \
"(%s:ncpus=3:mem=2097152kb)+" % (self.nD,) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nE,) + \
"%s:mem=1048576kb:ncpus=1)" % (self.nEv0,)
self.job3_sel_esc = self.job3_select.replace("+", "\+")
self.job3_exec_host_esc = self.job3_exec_host.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.job3_exec_vnode_esc = self.job3_exec_vnode.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
self.job3_isel_esc = self.job3_iselect.replace("+", "\+")
self.job3_iexec_host_esc = self.job3_iexec_host.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.job3_iexec_vnode_esc = self.job3_iexec_vnode.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
self.script['job3'] = \
"#PBS -l select=" + self.job3_oselect + "\n" + \
"#PBS -l place=" + self.job3_place + "\n" + \
SLEEP_CMD + " 300\n"
self.job3_ischedselect = self.job3_iselect
self.job4_oselect = "ncpus=3:mem=2gb+ncpus=3:mem=2gb+ncpus=2:mem=2gb"
self.job4_place = "scatter:excl"
self.job4_iselect = \
"1:ncpus=3:mem=2gb+2:ncpus=3:mem=2gb+2:ncpus=2:mem=2gb"
self.job4_ischedselect = self.job4_iselect
self.job4_iexec_host = \
"%s/0*0+%s/0*0+%s/0*3+%s/0*2+%s/0*0" % (
self.nA, self.nB, self.nD, self.nC, self.nE)
self.job4_iexec_vnode = \
"(%s:mem=1048576kb:ncpus=1+" % (self.nAv0,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nAv1,) + \
"%s:ncpus=1)+" % (self.nAv2) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nB,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nBv0,) + \
"%s:ncpus=1)+" % (self.nBv1,) + \
"(%s:ncpus=3:mem=2097152kb)+" % (self.nD,) + \
"(%s:ncpus=2:mem=2097152kb)+" % (self.nC,) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nE,) + \
"%s:mem=1048576kb:ncpus=1)" % (self.nEv0,)
# expected values upon successful job launch
self.job4_select = \
"1:ncpus=3:mem=2gb+1:ncpus=3:mem=2gb+1:ncpus=2:mem=2gb"
self.job4_schedselect = "1:ncpus=3:mem=2gb+" + \
"1:ncpus=3:mem=2gb+1:ncpus=2:mem=2gb"
self.job4_exec_host = "%s/0*0+%s/0*3+%s/0*0" % (
self.nA, self.nD, self.nE)
self.job4_exec_vnode = \
"(%s:mem=1048576kb:ncpus=1+" % (self.nAv0,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nAv1,) + \
"%s:ncpus=1)+" % (self.nAv2) + \
"(%s:ncpus=3:mem=2097152kb)+" % (self.nD,) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nE,) + \
"%s:mem=1048576kb:ncpus=1)" % (self.nEv0,)
self.script['job4'] = \
"#PBS -l select=" + self.job4_oselect + "\n" + \
"#PBS -l place=" + self.job4_place + "\n" + \
SLEEP_CMD + " 300\n"
self.job4_sel_esc = self.job4_select.replace("+", "\+")
self.job4_exec_host_esc = self.job4_exec_host.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.job4_exec_vnode_esc = self.job4_exec_vnode.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
self.job4_isel_esc = self.job4_iselect.replace("+", "\+")
self.job4_iexec_host_esc = self.job4_iexec_host.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.job4_iexec_vnode_esc = self.job4_iexec_vnode.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
self.job5_oselect = "ncpus=3:mem=2gb+ncpus=3:mem=2gb+ncpus=2:mem=2gb"
self.job5_place = "free"
self.job5_iselect = \
"1:ncpus=3:mem=2gb+2:ncpus=3:mem=2gb+2:ncpus=2:mem=2gb"
self.job5_ischedselect = self.job5_iselect
self.job5_iexec_host = \
"%s/0*0+%s/0*0+%s/0*3+%s/1*0+%s/0*2" % (
self.nA, self.nB, self.nD, self.nB, self.nC)
self.job5_iexec_vnode = \
"(%s:mem=1048576kb:ncpus=1+" % (self.nAv0,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nAv1,) + \
"%s:ncpus=1)+" % (self.nAv2) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nB,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nBv0,) + \
"%s:ncpus=1)+" % (self.nBv1,) + \
"(%s:ncpus=3:mem=2097152kb)+" % (self.nD,) + \
"(%s:mem=1048576kb+" % (self.nBv1,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nBv2,) + \
"%s:ncpus=1)+" % (self.nBv3,) + \
"(%s:ncpus=2:mem=2097152kb)" % (self.nC,)
# expected values upon successful job launch
self.job5_select = \
"1:ncpus=3:mem=2gb+1:ncpus=3:mem=2gb+1:ncpus=1:mem=1gb"
self.job5_schedselect = self.job5_select
self.job5_exec_host = "%s/0*0+%s/0*0+%s/1*0" % (
self.nA, self.nB, self.nB)
self.job5_exec_vnode = \
"(%s:mem=1048576kb:ncpus=1+" % (self.nAv0,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nAv1,) + \
"%s:ncpus=1)+" % (self.nAv2) + \
"(%s:mem=1048576kb:ncpus=1+" % (self.nB,) + \
"%s:mem=1048576kb:ncpus=1+" % (self.nBv0,) + \
"%s:ncpus=1)+" % (self.nBv1,) + \
"(%s:mem=1048576kb+" % (self.nBv1,) + \
"%s:ncpus=1)" % (self.nBv2,)
self.script['job5'] = \
"#PBS -l select=" + self.job5_oselect + "\n" + \
"#PBS -l place=" + self.job5_place + "\n" + \
SLEEP_CMD + " 300\n"
self.job5_sel_esc = self.job5_select.replace("+", "\+")
self.job5_exec_host_esc = self.job5_exec_host.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.job5_exec_vnode_esc = self.job5_exec_vnode.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
self.job5_isel_esc = self.job5_iselect.replace("+", "\+")
self.job5_iexec_host_esc = self.job5_iexec_host.replace(
"*", "\*").replace("[", "\[").replace("]", "\]").replace("+", "\+")
self.job5_iexec_vnode_esc = self.job5_iexec_vnode.replace(
"[", "\[").replace("]", "\]").replace("(", "\(").replace(
")", "\)").replace("+", "\+")
# queuejob hooks used throughout the test
self.qjob_hook_body = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "queuejob hook executed")
# Save current select spec in resource 'site'
e.job.Resource_List["site"] = str(e.job.Resource_List["select"])
new_select = e.job.Resource_List["select"].increment_chunks(1)
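# increment_chunks(1) adds 1 to the chunk count of every chunk except the
# first (MS) chunk; e.g. "ncpus=3:mem=2gb+ncpus=3:mem=2gb+ncpus=2:mem=2gb"
# becomes "1:ncpus=3:mem=2gb+2:ncpus=3:mem=2gb+2:ncpus=2:mem=2gb"
# (compare self.job1_oselect and self.job1_iselect above).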
e.job.Resource_List["select"] = new_select
e.job.tolerate_node_failures = "job_start"
"""
self.qjob_hook_body2 = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "queuejob hook executed")
# Save current select spec in resource 'site'
e.job.Resource_List["site"] = str(e.job.Resource_List["select"])
new_select = e.job.Resource_List["select"].increment_chunks(1)
e.job.Resource_List["select"] = new_select
e.job.tolerate_node_failures = "all"
"""
# begin hooks used throughout the test
self.begin_hook_body = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "Executing begin")
localnode=pbs.get_local_nodename()
if not e.job.in_ms_mom() and (localnode == '%s'):
e.reject("bad node")
""" % (self.nB,)
# The hook below does not do anything substantive, but is
# used in a test of the sister join job alarm time together
# with the hook's alarm value.
self.begin_hook_body2 = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "Executing begin")
localnode=pbs.get_local_nodename()
"""
self.begin_hook_body3 = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "Executing begin")
localnode=pbs.get_local_nodename()
if not e.job.in_ms_mom() and (localnode == '%s'):
x
""" % (self.nE,)
self.begin_hook_body4 = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "Executing begin")
localnode=pbs.get_local_nodename()
if not e.job.in_ms_mom() and (localnode == '%s'):
e.reject("bad node")
""" % (self.nD,)
self.begin_hook_body5 = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "Executing begin")
localnode=pbs.get_local_nodename()
if not e.job.in_ms_mom() and (localnode == '%s'):
e.reject("bad node")
""" % (self.nC,)
# prologue hooks used throughout the test
self.prolo_hook_body = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "Executing prolo")
for vn in e.vnode_list:
v = e.vnode_list[vn]
pbs.logjobmsg(e.job.id, "prolo: found vnode_list[" + v.name + "]")
for vn in e.vnode_list_fail:
v = e.vnode_list_fail[vn]
pbs.logjobmsg(e.job.id, "prolo: found vnode_list_fail[" + v.name + "]")
localnode=pbs.get_local_nodename()
if not e.job.in_ms_mom() and (localnode == '%s'):
e.reject("bad node")
""" % (self.nC,)
self.prolo_hook_body2 = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "Executing prologue")
localnode=pbs.get_local_nodename()
if not e.job.in_ms_mom() and (localnode == '%s'):
x
""" % (self.nC,)
self.prolo_hook_body3 = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "Executing prolo")
for vn in e.vnode_list:
v = e.vnode_list[vn]
pbs.logjobmsg(e.job.id, "prolo: found vnode_list[" + v.name + "]")
for vn in e.vnode_list_fail:
v = e.vnode_list_fail[vn]
pbs.logjobmsg(e.job.id, "prolo: found vnode_list_fail[" + v.name + "]")
localnode=pbs.get_local_nodename()
"""
self.prolo_hook_body4 = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "Executing prolo")
for vn in e.vnode_list:
v = e.vnode_list[vn]
pbs.logjobmsg(e.job.id, "prolo: found vnode_list[" + v.name + "]")
for vn in e.vnode_list_fail:
v = e.vnode_list_fail[vn]
pbs.logjobmsg(e.job.id, "prolo: found vnode_list_fail[" + v.name + "]")
localnode=pbs.get_local_nodename()
if e.job.in_ms_mom():
pj = e.job.release_nodes(keep_select=e.job.Resource_List["site"])
if pj != None:
pbs.logjobmsg(e.job.id, "prolo: job.exec_vnode=%s" % (pj.exec_vnode,))
pbs.logjobmsg(e.job.id, "prolo: job.exec_host=%s" % (pj.exec_host,))
pbs.logjobmsg(e.job.id,
"prolo: job.schedselect=%s" % (pj.schedselect,))
else:
e.job.Hold_Types = pbs.hold_types("s")
e.job.rerun()
e.reject("unsuccessful at PROLOGUE")
"""
self.prolo_hook_body5 = """
import pbs
import time
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "Executing prolo")
for vn in e.vnode_list:
v = e.vnode_list[vn]
pbs.logjobmsg(e.job.id, "prolo: found vnode_list[" + v.name + "]")
for vn in e.vnode_list_fail:
v = e.vnode_list_fail[vn]
pbs.logjobmsg(e.job.id, "prolo: found vnode_list_fail[" + v.name + "]")
if not e.job.in_ms_mom():
pbs.logjobmsg(e.job.id, "sleeping for 30 secs")
time.sleep(30)
"""
# launch hooks used throughout the test
self.launch_hook_body = """
import pbs
e=pbs.event()
if 'PBS_NODEFILE' not in e.env:
e.accept()
pbs.logmsg(pbs.LOG_DEBUG, "Executing launch")
for vn in e.vnode_list:
v = e.vnode_list[vn]
pbs.logjobmsg(e.job.id, "launch: found vnode_list[" + v.name + "]")
for vn in e.vnode_list_fail:
v = e.vnode_list_fail[vn]
pbs.logjobmsg(e.job.id, "launch: found vnode_list_fail[" + v.name + "]")
if e.job.in_ms_mom():
pj = e.job.release_nodes(keep_select=e.job.Resource_List["site"])
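# release_nodes(keep_select=...) prunes the job's assignment back to the
# select spec saved in Resource_List["site"] by the queuejob hook, keeping
# only healthy vnodes; the hook treats a None return as a failed prune.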
if pj != None:
pbs.logjobmsg(e.job.id, "launch: job.exec_vnode=%s" % (pj.exec_vnode,))
pbs.logjobmsg(e.job.id, "launch: job.exec_host=%s" % (pj.exec_host,))
pbs.logjobmsg(e.job.id,
"launch: job.schedselect=%s" % (pj.schedselect,))
else:
e.job.Hold_Types = pbs.hold_types("s")
e.job.rerun()
e.reject("unsuccessful at LAUNCH")
"""
self.launch_hook_body2 = """
import pbs
e=pbs.event()
if 'PBS_NODEFILE' not in e.env:
e.accept()
pbs.logmsg(pbs.LOG_DEBUG, "Executing launch")
for vn in e.vnode_list:
v = e.vnode_list[vn]
pbs.logjobmsg(e.job.id, "launch: found vnode_list[" + v.name + "]")
for vn in e.vnode_list_fail:
v = e.vnode_list_fail[vn]
pbs.logjobmsg(e.job.id, "launch: found vnode_list_fail[" + v.name + "]")
if e.job.in_ms_mom():
new_sel = "ncpus=3:mem=2gb+ncpus=3:mem=2gb+ncpus=1:mem=1gb"
pj = e.job.release_nodes(keep_select=new_sel)
if pj != None:
pbs.logjobmsg(e.job.id, "launch: job.exec_vnode=%s" % (pj.exec_vnode,))
pbs.logjobmsg(e.job.id, "launch: job.exec_host=%s" % (pj.exec_host,))
pbs.logjobmsg(e.job.id,
"launch: job.schedselect=%s" % (pj.schedselect,))
else:
e.job.Hold_Types = pbs.hold_types("s")
e.job.rerun()
e.reject("unsuccessful at LAUNCH")
"""
def tearDown(self):
self.momA.signal("-CONT")
self.momB.signal("-CONT")
self.momC.signal("-CONT")
self.momD.signal("-CONT")
self.momE.signal("-CONT")
self.momA.unset_mom_config('$sister_join_job_alarm', False)
self.momA.unset_mom_config('$job_launch_delay', False)
a = {'state': (DECR, 'offline')}
self.server.manager(MGR_CMD_SET, NODE, a, self.momA.shortname)
self.server.manager(MGR_CMD_SET, NODE, a, self.momB.shortname)
self.server.manager(MGR_CMD_SET, NODE, a, self.momC.shortname)
self.server.manager(MGR_CMD_SET, NODE, a, self.momD.shortname)
self.server.manager(MGR_CMD_SET, NODE, a, self.momE.shortname)
TestFunctional.tearDown(self)
# Delete managers and operators if added
attrib = ['operators', 'managers']
self.server.manager(MGR_CMD_UNSET, SERVER, attrib, expect=True)
@timeout(400)
def test_t1(self):
"""
Test tolerating 2 node failures at job start after adding
extra nodes to the job, pruning the
job's assigned resources back to the original
select spec, and offlining the failed vnodes.
1. Have a job that has been submitted with a select
spec of 2 super-chunks say (A) and (B), and 1 chunk
of (C), along with place spec of "scatter",
resulting in the following assignment:
exec_vnode = (A)+(B)+(C)
and -Wtolerate_node_failures=job_start
2. Have a queuejob hook that adds 1 extra node to each
chunk (except the MS (first) chunk), resulting in the
assignment:
exec_vnode = (A)+(B)+(D)+(C)+(E)
where D mirrors super-chunk B specs while E mirrors
chunk C.
3. Have an execjob_begin hook that fails (causes rejection)
when executed by mom managing vnodes in (B).
4. Have an execjob_prologue hook that fails (causes rejection)
when executed by mom managing vnodes in (C).
5. Have an execjob_launch hook that offlines the failed
vnodes in (B) and (C), and prunes the job's exec_vnode
assignment back to the original 3-node select
spec, choosing only healthy nodes.
6. Result:
a. This results in the following reassignment of chunks:
exec_vnode = (A)+(D)+(E)
since (B) and (C) contain vnodes from failed moms.
b. vnodes in (B) and (C) are now showing a state of
"offline".
c. The accounting log start record 'S' will reflect the
select request where additional chunks were added, while
the secondary start record 's' will reflect the assigned
resources after pruning the original select request via
the pbs.release_nodes(keep_select=...) call
inside execjob_launch hook.
"""
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_begin hook
hook_event = "execjob_begin"
hook_name = "begin"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.begin_hook_body)
# instantiate execjob_prologue hook
hook_event = "execjob_prologue"
hook_name = "prolo"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.prolo_hook_body)
# instantiate execjob_launch hook
hook_body = """
import pbs
e=pbs.event()
if 'PBS_NODEFILE' not in e.env:
e.accept()
pbs.logmsg(pbs.LOG_DEBUG, "Executing launch")
for vn in e.vnode_list:
v = e.vnode_list[vn]
pbs.logjobmsg(e.job.id, "launch: found vnode_list[" + v.name + "]")
for vn in e.vnode_list_fail:
v = e.vnode_list_fail[vn]
pbs.logjobmsg(e.job.id, "launch:offline vnode_list_fail[" + v.name + "]")
v.state = pbs.ND_OFFLINE
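# Unlike self.launch_hook_body, this hook also offlines each failed vnode
# (pbs.ND_OFFLINE); the test later checks these vnodes show state 'offline'.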
if e.job.in_ms_mom():
pj = e.job.release_nodes(keep_select=e.job.Resource_List["site"])
if pj != None:
pbs.logjobmsg(e.job.id, "launch: job.exec_vnode=%s" % (pj.exec_vnode,))
pbs.logjobmsg(e.job.id, "launch: job.exec_host=%s" % (pj.exec_host,))
pbs.logjobmsg(e.job.id,
"launch: job.schedselect=%s" % (pj.schedselect,))
else:
e.job.Hold_Types = pbs.hold_types("s")
e.job.rerun()
e.reject("unsuccessful at LAUNCH")
"""
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, hook_body)
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
jid = self.create_and_submit_job('job1')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job1_iselect,
'Resource_List.site': self.job1_oselect,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_ischedselect},
id=jid, attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# Job eventually launches, reflecting the values pruned back
# to the original select spec.
# max_attempts=60 is used since it can take up to 60 seconds
# for the primary mom to wait for the sisters to join
# (default $sister_join_job_alarm of 30 seconds) and for the
# sisters' execjob_prologue hooks to finish (default
# $job_launch_delay of 30 seconds).
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '6gb',
'Resource_List.ncpus': 8,
'Resource_List.nodect': 3,
'Resource_List.select': self.job1_select,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_schedselect,
'exec_host': self.job1_exec_host,
'exec_vnode': self.job1_exec_vnode},
id=jid, interval=1, attrop=PTL_AND, max_attempts=60)
thisjob = self.server.status(JOB, id=jid)
if thisjob:
job_output_file = thisjob[0]['Output_Path'].split(':')[1]
# Check various vnode status.
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status(
[self.nAv0, self.nAv1, self.nE, self.nEv0],
'job-busy', jobs_assn1, 1, '1048576kb')
self.match_vnode_status([self.nAv2],
'job-busy', jobs_assn1, 1, '0kb')
jobs_assn3 = "%s/0, %s/1, %s/2" % (jid, jid, jid)
self.match_vnode_status([self.nD], 'free', jobs_assn3,
3, '2097152kb')
self.match_vnode_status([self.nA, self.nAv3, self.nBv2, self.nBv3,
self.nEv1, self.nEv2, self.nEv3], 'free')
self.match_vnode_status([self.nB, self.nBv0, self.nBv1, self.nC],
'offline')
# Check server/queue counts.
self.server.expect(SERVER, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
id='workq', attrop=PTL_AND)
self.assertTrue(
self.pbs_nodefile_match_exec_host(jid, self.job1_exec_host))
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+could not JOIN_JOB" % (
jid, self.hostB), n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+as job " % (jid, self.hostB) +
"is tolerant of node failures",
regexp=True, n=10)
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+" % (jid, self.hostC) +
"could not IM_EXEC_PROLOGUE", n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+" % (jid, self.hostC) +
"as job is tolerant of node failures", n=10, regexp=True)
# Check vnode_list[] parameter in execjob_prologue hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;prolo: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_prologue hook
vnode_list_fail = [self.nB, self.nBv0, self.nBv1]
for vn in vnode_list_fail:
self.momA.log_match("Job;%s;prolo: found vnode_list_fail[%s]" % (
jid, vn), n=10)
# Check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_launch hook
vnode_list_fail = [self.nB, self.nBv0, self.nBv1, self.nC]
for vn in vnode_list_fail:
self.momA.log_match(
"Job;%s;launch:offline vnode_list_fail[%s]" % (jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job1_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job1_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job1_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job1_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job1_iexec_host_esc,
self.job1_iexec_vnode_esc, "10gb", 13, 5,
self.job1_place,
self.job1_isel_esc)
self.match_accounting_log('s', jid, self.job1_exec_host_esc,
self.job1_exec_vnode_esc,
"6gb", 8, 3,
self.job1_place,
self.job1_sel_esc)
self.momA.log_match("Job;%s;task.+started, hostname" % (jid,),
n=10, max_attempts=60, interval=2, regexp=True)
self.momA.log_match("Job;%s;copy file request received" % (jid,),
n=10, max_attempts=10, interval=2)
# validate output
expected_out = """/var/spool/pbs/aux/%s
%s
%s
%s
FIB TESTS
pbsdsh -n 1 fib 37
%d
pbsdsh -n 2 fib 37
%d
fib 37
%d
HOSTNAME TESTS
pbsdsh -n 0 hostname
%s
pbsdsh -n 1 hostname
%s
pbsdsh -n 2 hostname
%s
PBS_NODEFILE tests
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
""" % (jid, self.momA.hostname, self.momD.hostname, self.momE.hostname,
self.fib37_value, self.fib37_value, self.fib37_value,
self.momA.shortname, self.momD.shortname, self.momE.shortname,
self.momA.hostname, self.momA.hostname, self.momA.shortname,
self.momD.hostname, self.momD.hostname, self.momD.shortname,
self.momE.hostname, self.momE.hostname, self.momE.shortname)
self.logger.info("expected out=%s" % (expected_out,))
job_out = ""
with open(job_output_file, 'r') as fd:
job_out = fd.read()
self.logger.info("job_out=%s" % (job_out,))
self.assertEquals(job_out, expected_out)
@timeout(400)
def test_t2(self):
"""
Test tolerating 2 node failures at job start after adding
extra nodes to the job, pruning the
job's assigned resources back to the original
select spec, without offlining the failed vnodes, and
specifying mom config file options '$sister_join_job_alarm' and
'$job_launch_delay'.
1. Set $sister_join_job_alarm and $job_launch_delay values
in mom's config file.
2. Submit a job that has been submitted with a select
spec of 2 super-chunks say (A) and (B), and 1 chunk
of (C), along with place spec of "scatter",
resulting in the following assignment:
exec_vnode = (A)+(B)+(C)
and -Wtolerate_node_failures=job_start
3. Have a queuejob hook that adds 1 extra node to each
chunk (except the MS (first) chunk), resulting in the
assignment:
exec_vnode = (A)+(B)+(D)+(C)+(E)
where D mirrors super-chunk B specs while E mirrors
chunk C.
4. Prior to submitting the job, suspend mom B. When the job runs,
momB won't be able to join the job, so it won't be considered
a "healthy" mom.
5. Have an execjob_begin hook that doesn't fail.
6. Have an execjob_prologue hook that fails (causes rejection)
when executed by mom managing vnodes in (C).
7. Have an execjob_launch hook that prunes the
job's exec_vnode assignment back to the original
3-node select spec, choosing only healthy nodes.
8. Result:
a. This results in the following reassignment of chunks:
exec_vnode = (A)+(D)+(E)
since (B) and (C) contain vnodes from failed moms.
b. vnodes in (B) and (C) are now showing a state of "free".
c. Mom's log file will show the explicit values set for
$sister_join_job_alarm and $job_launch_delay.
d. The accounting log start record 'S' will reflect the
select request where additional chunks were added, while
the secondary start record 's' will reflect the assigned
resources after pruning the original select request via
the pbs.release_nodes(keep_select=...) call
inside execjob_launch hook.
"""
# set mom config options:
sis_join_alarm = 45
c = {'$sister_join_job_alarm': sis_join_alarm}
self.momA.add_config(c)
job_launch_delay = 40
c = {'$job_launch_delay': job_launch_delay}
self.momA.add_config(c)
self.momA.signal("-HUP")
self.momA.log_match(
"sister_join_job_alarm;%d" % (sis_join_alarm,), max_attempts=5,
interval=5)
self.momA.log_match(
"job_launch_delay;%d" % (job_launch_delay,),
max_attempts=5, interval=5)
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_begin hook
hook_event = "execjob_begin"
hook_name = "begin"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.begin_hook_body2)
# instantiate execjob_prologue hook
hook_event = "execjob_prologue"
hook_name = "prolo"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.prolo_hook_body)
# instantiate execjob_launch hook
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.launch_hook_body)
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# temporarily suspend momB, simulating a failed mom host.
self.momB.signal("-STOP")
jid = self.create_and_submit_job('job1')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job1_iselect,
'Resource_List.site': self.job1_oselect,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_ischedselect},
id=jid, attrop=PTL_AND)
# Set time to start scanning logs
stime = int(time.time())
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# Job eventually launches, reflecting the values pruned back
# to the original select spec.
# A large max_attempts is used since the primary mom first waits
# for the sisters to join (configured $sister_join_job_alarm of
# 45 seconds) and then for the sisters' execjob_prologue hooks to
# finish (configured $job_launch_delay of 40 seconds).
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '6gb',
'Resource_List.ncpus': 8,
'Resource_List.nodect': 3,
'Resource_List.select': self.job1_select,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_schedselect,
'exec_host': self.job1_exec_host,
'exec_vnode': self.job1_exec_vnode},
id=jid, interval=1, attrop=PTL_AND,
max_attempts=100)
thisjob = self.server.status(JOB, id=jid)
if thisjob:
job_output_file = thisjob[0]['Output_Path'].split(':')[1]
# Verify the logs and make sure sister_join_job_alarm is honored
logs = self.mom.log_match(
"Executing begin",
allmatch=True, starttime=stime, max_attempts=8)
log1 = logs[0][1]
logs = self.mom.log_match(
"Executing prolo",
allmatch=True, starttime=stime, max_attempts=8)
log2 = logs[0][1]
pattern = '%m/%d/%Y %H:%M:%S'
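# Mom log entries look like '<MM/DD/YYYY HH:MM:SS>;<message>', so the
# timestamp is the first ';'-separated field.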
tmp = log1.split(';')
# Convert the time into epoch time
time1 = int(time.mktime(time.strptime(tmp[0], pattern)))
tmp = log2.split(';')
time2 = int(time.mktime(time.strptime(tmp[0], pattern)))
diff = time2 - time1
self.logger.info(
"Time diff between begin hook and prologue hook is " +
str(diff) + " seconds")
# Leave a little wiggle room for slow systems
self.assertTrue((diff >= sis_join_alarm) and
diff <= (sis_join_alarm + 5))
self.mom.log_match(
"sister_join_job_alarm wait time %d secs exceeded" % (
sis_join_alarm,), starttime=stime, max_attempts=8)
# Verify the logs and make sure job_launch_delay is honored
logs = self.mom.log_match(
"Executing prolo",
allmatch=True, starttime=stime, max_attempts=8)
log1 = logs[0][1]
logs = self.mom.log_match(
"Executing launch",
allmatch=True, starttime=stime, max_attempts=8)
log2 = logs[0][1]
pattern = '%m/%d/%Y %H:%M:%S'
tmp = log1.split(';')
# Convert the time into epoch time
time1 = int(time.mktime(time.strptime(tmp[0], pattern)))
tmp = log2.split(';')
time2 = int(time.mktime(time.strptime(tmp[0], pattern)))
diff = time2 - time1
self.logger.info("Time diff between prolo hook and launch hook is " +
str(diff) + " seconds")
# Leave a little wiggle room for slow systems
self.assertTrue((diff >= job_launch_delay) and
diff <= (job_launch_delay + 3))
self.momA.log_match(
"not all prologue hooks to sister moms completed, " +
"but job will proceed to execute", n=10)
# Check various vnode status.
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status(
[self.nAv0, self.nAv1, self.nE, self.nEv0],
'job-busy', jobs_assn1, 1, '1048576kb')
self.match_vnode_status([self.nAv2],
'job-busy', jobs_assn1, 1, '0kb')
jobs_assn3 = "%s/0, %s/1, %s/2" % (jid, jid, jid)
self.match_vnode_status([self.nD], 'free', jobs_assn3,
3, '2097152kb')
self.match_vnode_status([self.nA, self.nAv3, self.nB, self.nBv0,
self.nBv1, self.nBv2, self.nBv3, self.nC,
self.nEv1, self.nEv2, self.nEv3], 'free')
self.assertTrue(
self.pbs_nodefile_match_exec_host(jid, self.job1_exec_host))
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+" % (jid, self.hostC) +
"could not IM_EXEC_PROLOGUE", n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+" % (jid, self.hostC) +
"as job is tolerant of node failures", n=10, regexp=True)
# Check vnode_list[] parameter in execjob_prologue hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;prolo: found vnode_list[%s]" % (
jid, vn), n=10)
# check server/queue counts
self.server.expect(SERVER, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
id='workq', attrop=PTL_AND)
# Check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_launch hook
vnode_list_fail = [self.nC]
for vn in vnode_list_fail:
self.momA.log_match("Job;%s;launch: found vnode_list_fail[%s]" % (
jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job1_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job1_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job1_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job1_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job1_iexec_host_esc,
self.job1_iexec_vnode_esc, "10gb", 13, 5,
self.job1_place,
self.job1_isel_esc)
self.match_accounting_log('s', jid, self.job1_exec_host_esc,
self.job1_exec_vnode_esc,
"6gb", 8, 3,
self.job1_place,
self.job1_sel_esc)
self.momA.log_match("Job;%s;task.+started, hostname" % (jid,),
n=10, max_attempts=60, interval=2, regexp=True)
self.momA.log_match("Job;%s;copy file request received" % (jid,),
n=10, max_attempts=10, interval=2)
# validate output
expected_out = """/var/spool/pbs/aux/%s
%s
%s
%s
FIB TESTS
pbsdsh -n 1 fib 37
%d
pbsdsh -n 2 fib 37
%d
fib 37
%d
HOSTNAME TESTS
pbsdsh -n 0 hostname
%s
pbsdsh -n 1 hostname
%s
pbsdsh -n 2 hostname
%s
PBS_NODEFILE tests
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
""" % (jid, self.momA.hostname, self.momD.hostname, self.momE.hostname,
self.fib37_value, self.fib37_value, self.fib37_value,
self.momA.shortname, self.momD.shortname, self.momE.shortname,
self.momA.hostname, self.momA.hostname, self.momA.shortname,
self.momD.hostname, self.momD.hostname, self.momD.shortname,
self.momE.hostname, self.momE.hostname, self.momE.shortname)
self.logger.info("expected out=%s" % (expected_out,))
job_out = ""
with open(job_output_file, 'r') as fd:
job_out = fd.read()
self.logger.info("job_out=%s" % (job_out,))
self.assertEqual(job_out, expected_out)
@timeout(400)
def test_t3(self):
"""
Test: tolerating job_start 2 node failures after adding
extra nodes to the job, pruning
job's assigned resources to match up to the original
select spec, without offlining the failed vnodes, and
with 2 execjob_prologue hooks, with prologue hook1
having alarm1 and prologue hook2 having alarm2.
This also tests the default value of sister_join_job_alarm.
1. Submit a job that has been submitted with a select
spec of 2 super-chunks say (A) and (B), and 1 chunk
of (C), along with place spec of "scatter",
resulting in the following assignment:
exec_vnode = (A)+(B)+(C)
and -Wtolerate_node_failures=job_start
2. Have a queuejob hook that adds 1 extra node to each
chunk (except the MS (first) chunk), resulting in the
assignment:
exec_vnode = (A)+(B)+(D)+(C)+(E)
where D mirrors super-chunk B specs while E mirrors
chunk C.
3. Prior to submitting a job, suspend mom B. When job runs,
momB won't be able to join the job, so it won't be considered
as a "healthy" mom.
4. Have an execjob_prologue hook that doesn't fail any mom host
with alarm=alarm1, order=1.
5. Have an execjob_prologue hook2 with alarm=alarm2, order=2,
that fails (causes rejection) when executed by mom managing
vnodes in (C).
6. Have an execjob_launch hook that prunes back the
job's exec_vnode assignment back to satisfying the original
3-node select spec, choosing only healthy nodes.
7. Result:
a. This results in the following reassignment of chunks:
exec_vnode = (A)+(D)+(E)
since (B) and (C) contain vnodes from failed moms.
b. vnodes in (B) and (C) are now showing a state of "free".
c. Mom's log file shows that the wait time between the
execution of execjob_prologue hook1 and the execution of
the execjob_launch hook is no more than alarm1+alarm2.
d. The accounting log start record 'S' will reflect the
select request where additional chunks were added, while
the secondary start record 's' will reflect the assigned
resources after pruning the original select request via
the pbs.release_nodes(keep_select=...) call
inside execjob_launch hook.
"""
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_begin hook
hook_event = "execjob_begin"
hook_name = "begin"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.begin_hook_body2)
# instantiate execjob_prologue hook #1
hook_body = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "Executing prolo1")
localnode=pbs.get_local_nodename()
"""
hook_event = "execjob_prologue"
hook_name = "prolo1"
alarm1 = 17
a = {'event': hook_event, 'enabled': 'true', 'order': 1,
'alarm': alarm1}
self.server.create_import_hook(hook_name, a, hook_body)
# instantiate execjob_prologue hook #2
hook_body = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "Executing prolo2")
for vn in e.vnode_list:
v = e.vnode_list[vn]
pbs.logjobmsg(e.job.id, "prolo2: found vnode_list[" + v.name + "]")
for vn in e.vnode_list_fail:
v = e.vnode_list_fail[vn]
pbs.logjobmsg(e.job.id, "prolo2: found vnode_list_fail[" + v.name + "]")
localnode=pbs.get_local_nodename()
if not e.job.in_ms_mom() and (localnode == '%s'):
x
""" % (self.nC,)
hook_event = "execjob_prologue"
hook_name = "prolo2"
alarm2 = 16
a = {'event': hook_event, 'enabled': 'true', 'order': 2,
'alarm': alarm2}
self.server.create_import_hook(hook_name, a, hook_body)
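# With two prologue hooks installed, the launch delay observed by the
# primary mom is expected to be the sum of their alarms
# (alarm1 + alarm2), which the timing check further below verifies.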
# instantiate execjob_launch hook
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.launch_hook_body)
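# The launch hook body (self.launch_hook_body, defined earlier in this
# class) is assumed to prune the padded assignment with a call along
# the lines of:
#     pj = pbs.event().job
#     pj.release_nodes(keep_select=pj.Resource_List["site"])
# i.e. keep only enough healthy vnodes to satisfy the original select
# spec (saved in Resource_List.site by the queuejob hook); the
# "pruned from/to exec_vnode" log matches below verify the result.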
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# temporarily suspend momB, simulating a failed mom host.
self.momB.signal("-STOP")
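# A STOPped momB cannot acknowledge the join request, so it never
# joins the job and is not treated as a healthy mom.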
jid = self.create_and_submit_job('job1')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job1_iselect,
'Resource_List.site': self.job1_oselect,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_ischedselect},
id=jid, attrop=PTL_AND)
# Set time to start scanning logs
stime = int(time.time())
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# Job eventually launches, reflecting the values pruned back
# to the original select spec.
# max_attempts is set high because it can take up to 60 seconds
# for the primary mom to wait for the sisters to join
# (default $sister_join_job_alarm of 30 seconds) and then for the
# sisters' execjob_prologue hooks to finish
# (default $job_launch_delay of 30 seconds).
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '6gb',
'Resource_List.ncpus': 8,
'Resource_List.nodect': 3,
'Resource_List.select': self.job1_select,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_schedselect,
'exec_host': self.job1_exec_host,
'exec_vnode': self.job1_exec_vnode},
id=jid, interval=1, attrop=PTL_AND,
max_attempts=100)
thisjob = self.server.status(JOB, id=jid)
if thisjob:
job_output_file = thisjob[0]['Output_Path'].split(':')[1]
# Verify the logs and make sure sister_join_job_alarm is honored
logs = self.mom.log_match(
"Executing begin",
allmatch=True, starttime=stime, max_attempts=8)
log1 = logs[0][1]
logs = self.mom.log_match(
"Executing prolo1",
allmatch=True, starttime=stime, max_attempts=8)
log2 = logs[0][1]
pattern = '%m/%d/%Y %H:%M:%S'
tmp = log1.split(';')
# Convert the time into epoch time
time1 = int(time.mktime(time.strptime(tmp[0], pattern)))
tmp = log2.split(';')
time2 = int(time.mktime(time.strptime(tmp[0], pattern)))
diff = time2 - time1
self.logger.info(
"Time diff between begin hook and prologue hook is " +
str(diff) + " seconds")
# Leave a little wiggle room for slow systems
# test default sister_join_job_alarm value
sis_join_alarm = 30
self.assertTrue((diff >= sis_join_alarm) and
diff <= (sis_join_alarm + 5))
self.mom.log_match(
"sister_join_job_alarm wait time %d secs exceeded" % (
sis_join_alarm,), starttime=stime, max_attempts=8)
# Verify the logs and make sure job_launch_delay is honored
logs = self.mom.log_match(
"Executing prolo1",
allmatch=True, starttime=stime, max_attempts=8)
log1 = logs[0][1]
logs = self.mom.log_match(
"Executing launch",
allmatch=True, starttime=stime, max_attempts=8)
log2 = logs[0][1]
pattern = '%m/%d/%Y %H:%M:%S'
tmp = log1.split(';')
# Convert the time into epoch time
time1 = int(time.mktime(time.strptime(tmp[0], pattern)))
tmp = log2.split(';')
time2 = int(time.mktime(time.strptime(tmp[0], pattern)))
diff = time2 - time1
self.logger.info(
"Time diff between prolo1 hook and launch hook is " +
str(diff) + " seconds")
# Leave a little wiggle room for slow systems
job_launch_delay = alarm1 + alarm2
self.assertTrue((diff >= job_launch_delay) and
diff <= (job_launch_delay + 3))
# Check various vnode status.
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status(
[self.nAv0, self.nAv1, self.nE, self.nEv0],
'job-busy', jobs_assn1, 1, '1048576kb')
self.match_vnode_status([self.nAv2],
'job-busy', jobs_assn1, 1, '0kb')
jobs_assn3 = "%s/0, %s/1, %s/2" % (jid, jid, jid)
self.match_vnode_status([self.nD], 'free', jobs_assn3,
3, '2097152kb')
self.match_vnode_status([self.nA, self.nAv3, self.nB, self.nBv0,
self.nBv1, self.nBv2, self.nBv3, self.nC,
self.nEv1, self.nEv2, self.nEv3], 'free')
# check server/queue counts
self.server.expect(SERVER, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
id='workq', attrop=PTL_AND)
self.assertTrue(
self.pbs_nodefile_match_exec_host(jid, self.job1_exec_host))
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+" % (jid, self.hostC) +
"could not IM_EXEC_PROLOGUE", n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+" % (jid, self.hostC) +
"as job is tolerant of node failures", n=10, regexp=True)
self.momA.log_match(
"not all prologue hooks to sister moms completed, " +
"but job will proceed to execute", n=10)
# Check vnode_list[] parameter in execjob_prologue hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;prolo2: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_launch hook
vnode_list_fail = [self.nC]
for vn in vnode_list_fail:
self.momA.log_match(
"Job;%s;launch: found vnode_list_fail[%s]" % (jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job1_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job1_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job1_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job1_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job1_iexec_host_esc,
self.job1_iexec_vnode_esc, "10gb", 13, 5,
self.job1_place,
self.job1_isel_esc)
self.match_accounting_log('s', jid, self.job1_exec_host_esc,
self.job1_exec_vnode_esc,
"6gb", 8, 3,
self.job1_place,
self.job1_sel_esc)
self.momA.log_match("Job;%s;task.+started, hostname" % (jid,),
n=10, max_attempts=60, interval=2, regexp=True)
self.momA.log_match("Job;%s;copy file request received" % (jid,),
n=10, max_attempts=10, interval=2)
# validate output
expected_out = """/var/spool/pbs/aux/%s
%s
%s
%s
FIB TESTS
pbsdsh -n 1 fib 37
%d
pbsdsh -n 2 fib 37
%d
fib 37
%d
HOSTNAME TESTS
pbsdsh -n 0 hostname
%s
pbsdsh -n 1 hostname
%s
pbsdsh -n 2 hostname
%s
PBS_NODEFILE tests
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
""" % (jid, self.momA.hostname, self.momD.hostname, self.momE.hostname,
self.fib37_value, self.fib37_value, self.fib37_value,
self.momA.shortname, self.momD.shortname, self.momE.shortname,
self.momA.hostname, self.momA.hostname, self.momA.shortname,
self.momD.hostname, self.momD.hostname, self.momD.shortname,
self.momE.hostname, self.momE.hostname, self.momE.shortname)
self.logger.info("expected out=%s" % (expected_out,))
job_out = ""
with open(job_output_file, 'r') as fd:
job_out = fd.read()
self.logger.info("job_out=%s" % (job_out,))
self.assertEqual(job_out, expected_out)
@timeout(400)
def test_t4(self):
"""
Test: tolerating job_start 1 node failure that is used
to satisfy a multi-chunk request, after adding
extra nodes to the job, pruning
job's assigned resources to match up to the original
select spec.
1. Submit a job that has been submitted with a select
spec of 2 super-chunks say (A) and (B), and 1 chunk
of (C), along with place spec of "scatter",
resulting in the following assignment:
exec_vnode = (A)+(B)+(C)
and -Wtolerate_node_failures=job_start
2. Have a queuejob hook that adds 1 extra node to each
chunk (except the MS (first) chunk), resulting in the
assignment:
exec_vnode = (A)+(B)+(D)+(C)+(E)
where D mirrors super-chunk B specs while E mirrors
chunk C.
3. Have an execjob_begin hook that fails (causes rejection)
when executed by mom managing vnodes in (B).
4. Then create an execjob_launch hook that
prunes back the job's exec_vnode assignment back to
satisfying the original 3-node select spec,
choosing only healthy nodes.
5. Result:
a. This results in the following reassignment of chunks:
exec_vnode = (A)+(D)+(C)
since (B) contains vnodes from a failed mom.
b. The accounting log start record 'S' will reflect the
select request where additional chunks were added, while
the secondary start record 's' will reflect the assigned
resources after pruning the original select request via
the pbs.release_nodes(keep_select=...) call
inside execjob_launch hook.
"""
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_begin hook
hook_event = "execjob_begin"
hook_name = "begin"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.begin_hook_body)
# instantiate execjob_launch hook
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.launch_hook_body)
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
jid = self.create_and_submit_job('job1')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job1_iselect,
'Resource_List.site': self.job1_oselect,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_ischedselect},
id=jid, attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# Job eventually launches, reflecting the values pruned back
# to the original select spec.
# max_attempts is set high because it can take up to 60 seconds
# for the primary mom to wait for the sisters to join
# (default $sister_join_job_alarm of 30 seconds) and then for the
# sisters' execjob_prologue hooks to finish
# (default $job_launch_delay of 30 seconds).
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '6gb',
'Resource_List.ncpus': 8,
'Resource_List.nodect': 3,
'Resource_List.select': self.job1v2_select,
'Resource_List.place': self.job1_place,
'schedselect': self.job1v2_schedselect,
'exec_host': self.job1v2_exec_host,
'exec_vnode': self.job1v2_exec_vnode},
id=jid, interval=1, attrop=PTL_AND, max_attempts=70)
thisjob = self.server.status(JOB, id=jid)
if thisjob:
job_output_file = thisjob[0]['Output_Path'].split(':')[1]
# Check various vnode status.
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status([self.nAv0, self.nAv1],
'job-busy', jobs_assn1, 1, '1048576kb')
self.match_vnode_status([self.nAv2],
'job-busy', jobs_assn1, 1, '0kb')
jobs_assn2 = "%s/0, %s/1" % (jid, jid)
self.match_vnode_status([self.nC], 'job-busy', jobs_assn2,
2, '2097152kb')
jobs_assn3 = "%s/0, %s/1, %s/2" % (jid, jid, jid)
self.match_vnode_status([self.nD], 'free', jobs_assn3,
3, '2097152kb')
self.match_vnode_status([self.nA, self.nAv3, self.nB, self.nBv0,
self.nBv1, self.nBv2, self.nBv3, self.nE,
self.nEv0, self.nEv1, self.nEv2,
self.nEv3], 'free')
# check server/queue counts
self.server.expect(SERVER, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
id='workq', attrop=PTL_AND)
self.assertTrue(
self.pbs_nodefile_match_exec_host(jid, self.job1v2_exec_host))
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+could not JOIN_JOB" % (
jid, self.hostB), n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+as job " % (jid, self.hostB) +
"is tolerant of node failures",
regexp=True, n=10)
# Check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_launch hook
vnode_list_fail = [self.nB, self.nBv0, self.nBv1]
for vn in vnode_list_fail:
self.momA.log_match("Job;%s;launch: found vnode_list_fail[%s]" % (
jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job1v2_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job1v2_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job1_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job1v2_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job1_iexec_host_esc,
self.job1_iexec_vnode_esc, "10gb", 13, 5,
self.job1_place,
self.job1_isel_esc)
self.match_accounting_log('s', jid, self.job1v2_exec_host_esc,
self.job1v2_exec_vnode_esc,
"6gb", 8, 3,
self.job1_place,
self.job1v2_sel_esc)
self.momA.log_match("Job;%s;task.+started, hostname" % (jid,),
n=10, max_attempts=60, interval=2, regexp=True)
self.momA.log_match("Job;%s;copy file request received" % (jid,),
n=10, max_attempts=10, interval=2)
# validate output
expected_out = """/var/spool/pbs/aux/%s
%s
%s
%s
FIB TESTS
pbsdsh -n 1 fib 37
%d
pbsdsh -n 2 fib 37
%d
fib 37
%d
HOSTNAME TESTS
pbsdsh -n 0 hostname
%s
pbsdsh -n 1 hostname
%s
pbsdsh -n 2 hostname
%s
PBS_NODEFILE tests
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
""" % (jid, self.momA.hostname, self.momD.hostname, self.momC.hostname,
self.fib37_value, self.fib37_value, self.fib37_value,
self.momA.shortname, self.momD.shortname, self.momC.shortname,
self.momA.hostname, self.momA.hostname, self.momA.shortname,
self.momD.hostname, self.momD.hostname, self.momD.shortname,
self.momC.hostname, self.momC.hostname, self.momC.shortname)
self.logger.info("expected out=%s" % (expected_out,))
job_out = ""
with open(job_output_file, 'r') as fd:
job_out = fd.read()
self.logger.info("job_out=%s" % (job_out,))
self.assertEqual(job_out, expected_out)
@timeout(400)
def test_t5(self):
"""
Test: tolerating job_start 1 node failure used in a regular
chunk after adding extra nodes to the job, pruning
job's assigned resources to match up to the original
select spec.
1. Submit a job that has been submitted with a select
spec of 2 super-chunks say (A) and (B), and 1 chunk
of (C), along with place spec of "scatter",
resulting in the following assignment:
exec_vnode = (A)+(B)+(C)
and -Wtolerate_node_failures=job_start
2. Have a queuejob hook that adds 1 extra node to each
chunk (except the MS (first) chunk), resulting in the
assignment:
exec_vnode = (A)+(B)+(D)+(C)+(E)
where D mirrors super-chunk B specs while E mirrors
chunk C.
3. Have an execjob_prologue hook that fails (causes
rejection) when executed by mom managing vnodes in (C).
4. Then create an execjob_launch hook that
prunes back the job's exec_vnode assignment back to
satisfying the original 3-node select spec,
choosing only healthy nodes.
5. Result:
a. This results in the following reassignment of chunks:
exec_vnode = (A)+(B)+(E)
since (C) contains vnodes from a failed mom.
b. The accounting log start record 'S' will reflect the
select request where additional chunks were added, while
the secondary start record 's' will reflect the assigned
resources after pruning the original select request via
the pbs.release_nodes(keep_select=...) call
inside execjob_launch hook.
"""
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_prologue hook
hook_event = "execjob_prologue"
hook_name = "prolo"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.prolo_hook_body2)
# instantiate execjob_launch hook
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.launch_hook_body)
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
jid = self.create_and_submit_job('job1')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job1_iselect,
'Resource_List.site': self.job1_oselect,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_ischedselect},
id=jid, attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# Job eventually launches, reflecting the values pruned back
# to the original select spec.
# max_attempts is set high because it can take up to 60 seconds
# for the primary mom to wait for the sisters to join
# (default $sister_join_job_alarm of 30 seconds) and then for the
# sisters' execjob_prologue hooks to finish
# (default $job_launch_delay of 30 seconds).
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '6gb',
'Resource_List.ncpus': 8,
'Resource_List.nodect': 3,
'Resource_List.select': self.job1v3_select,
'Resource_List.place': self.job1_place,
'schedselect': self.job1v3_schedselect,
'exec_host': self.job1v3_exec_host,
'exec_vnode': self.job1v3_exec_vnode},
id=jid, interval=1, attrop=PTL_AND, max_attempts=70)
thisjob = self.server.status(JOB, id=jid)
if thisjob:
job_output_file = thisjob[0]['Output_Path'].split(':')[1]
# Check various vnode status.
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status([self.nAv0, self.nAv1, self.nB, self.nBv0,
self.nE, self.nEv0], 'job-busy', jobs_assn1,
1, '1048576kb')
self.match_vnode_status([self.nAv2, self.nBv1],
'job-busy', jobs_assn1, 1, '0kb')
self.match_vnode_status([self.nA, self.nAv3, self.nBv2, self.nBv3,
self.nC, self.nD, self.nEv1, self.nEv2,
self.nEv3], 'free')
# check server/queue counts
self.server.expect(SERVER, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
id='workq', attrop=PTL_AND)
self.assertTrue(
self.pbs_nodefile_match_exec_host(jid, self.job1v3_exec_host))
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+" % (jid, self.hostC) +
"could not IM_EXEC_PROLOGUE", n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+" % (jid, self.hostC) +
"as job is tolerant of node failures", n=10, regexp=True)
# Check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_launch hook
vnode_list_fail = [self.nC]
for vn in vnode_list_fail:
self.momA.log_match(
"Job;%s;launch: found vnode_list_fail[%s]" % (jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job1v3_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job1v3_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job1_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job1v3_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job1_iexec_host_esc,
self.job1_iexec_vnode_esc, "10gb", 13, 5,
self.job1_place,
self.job1_isel_esc)
self.match_accounting_log('s', jid, self.job1v3_exec_host_esc,
self.job1v3_exec_vnode_esc,
"6gb", 8, 3,
self.job1_place,
self.job1v3_sel_esc)
self.momA.log_match("Job;%s;task.+started, hostname" % (jid,),
n=10, max_attempts=60, interval=2, regexp=True)
self.momA.log_match("Job;%s;copy file request received" % (jid,),
n=10, max_attempts=10, interval=2)
# validate output
expected_out = """/var/spool/pbs/aux/%s
%s
%s
%s
FIB TESTS
pbsdsh -n 1 fib 37
%d
pbsdsh -n 2 fib 37
%d
fib 37
%d
HOSTNAME TESTS
pbsdsh -n 0 hostname
%s
pbsdsh -n 1 hostname
%s
pbsdsh -n 2 hostname
%s
PBS_NODEFILE tests
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
""" % (jid, self.momA.hostname, self.momB.hostname, self.momE.hostname,
self.fib37_value, self.fib37_value, self.fib37_value,
self.momA.shortname, self.momB.shortname, self.momE.shortname,
self.momA.hostname, self.momA.hostname, self.momA.shortname,
self.momB.hostname, self.momB.hostname, self.momB.shortname,
self.momE.hostname, self.momE.hostname, self.momE.shortname)
self.logger.info("expected out=%s" % (expected_out,))
job_out = ""
with open(job_output_file, 'r') as fd:
job_out = fd.read()
self.logger.info("job_out=%s" % (job_out,))
self.assertEqual(job_out, expected_out)
def test_t6(self):
"""
Test: tolerating job_start of 2 node failures used to
satisfy the smaller chunks, after adding extra nodes
to the job, pruning job's assigned resources to match up
to the original select spec.
1. Submit a job that has been submitted with a select
spec of 2 super-chunks say (A) and (B), and 1 chunk
of (C), along with place spec of "scatter",
resulting in the following assignment:
exec_vnode = (A)+(B)+(C)
and -Wtolerate_node_failures=job_start
2. Have a queuejob hook that adds 1 extra node to each
chunk (except the MS (first) chunk), resulting in the
assignment:
exec_vnode = (A)+(B)+(D)+(C)+(E)
where D mirrors super-chunk B specs while E mirrors
chunk C. (C) and (E) are of smaller chunks than (B)
and (D). For example:
(D) = "(nadal:ncpus=3:mem=2097152kb)"
(C) = "(lendl:ncpus=2:mem=2097152kb)"
3. Have an execjob_begin hook that fails (causes
rejection) when executed by mom managing vnodes in (C).
4. Have an execjob_prologue hook that fails (causes
rejection) when executed by mom managing vnodes in (E).
5. Then create an execjob_launch hook that
prunes back the job's exec_vnode assignment back to
satisfying the original 3-node select spec,
choosing only healthy nodes.
6. Result:
a. This results in the following reassignment of chunks:
exec_vnode = (A)+(B)+(D)
since (C) and (E) contain vnodes from failed moms.
Note that from (D), only enough resources are allocated
to satisfy the smaller third requested chunk. If (D)
originally has "(nadal:ncpus=3:mem=2097152kb)", the
reassignment would only be
"(nadal:ncpus=2:mem=2097152kb)".
b. The accounting log start record 'S' will reflect the
select request where additional chunks were added, while
the secondary start record 's' will reflect the assigned
resources after pruning the original select request via
the pbs.release_nodes(keep_select=...) call
inside execjob_launch hook.
"""
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_begin hook
hook_event = "execjob_begin"
hook_name = "begin"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.begin_hook_body3)
# instantiate execjob_prologue hook
hook_event = "execjob_prologue"
hook_name = "prolo"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.prolo_hook_body2)
# instantiate execjob_launch hook
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.launch_hook_body)
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
jid = self.create_and_submit_job('job1')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job1_iselect,
'Resource_List.site': self.job1_oselect,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_ischedselect},
id=jid, attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# Job eventually launches, reflecting the values pruned back
# to the original select spec.
# max_attempts is set high because it can take up to 60 seconds
# for the primary mom to wait for the sisters to join
# (default $sister_join_job_alarm of 30 seconds) and then for the
# sisters' execjob_prologue hooks to finish
# (default $job_launch_delay of 30 seconds).
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '6gb',
'Resource_List.ncpus': 8,
'Resource_List.nodect': 3,
'Resource_List.select': self.job1v4_select,
'Resource_List.place': self.job1_place,
'schedselect': self.job1v4_schedselect,
'exec_host': self.job1v4_exec_host,
'exec_vnode': self.job1v4_exec_vnode},
id=jid, interval=1, attrop=PTL_AND, max_attempts=70)
thisjob = self.server.status(JOB, id=jid)
if thisjob:
job_output_file = thisjob[0]['Output_Path'].split(':')[1]
# Check various vnode status.
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status([self.nAv0, self.nAv1, self.nB, self.nBv0],
'job-busy', jobs_assn1, 1, '1048576kb')
jobs_assn2 = "%s/0, %s/1" % (jid, jid)
self.match_vnode_status([self.nD], 'free', jobs_assn2,
2, '2097152kb')
self.match_vnode_status([self.nAv2, self.nBv1],
'job-busy', jobs_assn1, 1, '0kb')
self.match_vnode_status([self.nA, self.nAv3, self.nBv2, self.nBv3,
self.nC, self.nD, self.nEv1, self.nEv2,
self.nEv3], 'free')
# check server/queue counts
self.server.expect(SERVER, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
id='workq', attrop=PTL_AND)
self.assertTrue(
self.pbs_nodefile_match_exec_host(jid, self.job1v4_exec_host))
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+could not JOIN_JOB" % (
jid, self.hostE), n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+as job " % (jid, self.hostE) +
"is tolerant of node failures",
regexp=True, n=10)
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+" % (jid, self.hostC) +
"could not IM_EXEC_PROLOGUE", n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+" % (jid, self.hostC) +
"as job is tolerant of node failures", n=10, regexp=True)
# Check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_launch hook
vnode_list_fail = [self.nC, self.nE]
for vn in vnode_list_fail:
self.momA.log_match("Job;%s;launch: found vnode_list_fail[%s]" % (
jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job1v4_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job1v4_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job1_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job1v4_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job1_iexec_host_esc,
self.job1_iexec_vnode_esc, "10gb", 13, 5,
self.job1_place,
self.job1_isel_esc)
self.match_accounting_log('s', jid, self.job1v4_exec_host_esc,
self.job1v4_exec_vnode_esc,
"6gb", 8, 3,
self.job1_place,
self.job1v4_sel_esc)
self.momA.log_match("Job;%s;task.+started, hostname" % (jid,),
n=10, max_attempts=60, interval=2, regexp=True)
self.momA.log_match("Job;%s;copy file request received" % (jid,),
n=10, max_attempts=10, interval=2)
# validate output
expected_out = """/var/spool/pbs/aux/%s
%s
%s
%s
FIB TESTS
pbsdsh -n 1 fib 37
%d
pbsdsh -n 2 fib 37
%d
fib 37
%d
HOSTNAME TESTS
pbsdsh -n 0 hostname
%s
pbsdsh -n 1 hostname
%s
pbsdsh -n 2 hostname
%s
PBS_NODEFILE tests
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
""" % (jid, self.momA.hostname, self.momB.hostname, self.momD.hostname,
self.fib37_value, self.fib37_value, self.fib37_value,
self.momA.shortname, self.momB.shortname, self.momD.shortname,
self.momA.hostname, self.momA.hostname, self.momA.shortname,
self.momB.hostname, self.momB.hostname, self.momB.shortname,
self.momD.hostname, self.momD.hostname, self.momD.shortname)
self.logger.info("expected out=%s" % (expected_out,))
job_out = ""
with open(job_output_file, 'r') as fd:
job_out = fd.read()
self.logger.info("job_out=%s" % (job_out,))
self.assertEqual(job_out, expected_out)
def test_t7(self):
"""
Test: tolerating job_start of 2 node failures used to
satisfy the larger chunks, after adding extra nodes
to the job. Pruning job's assigned resources to match up
to the original select spec would fail, as the
unsatisfied chunk requests cannot be handled by the
remaining, smaller-sized nodes. The failure to prune the
job is followed by a pbs.event().rerun() action and a
job hold. This test also covers setting
tolerate_node_failures=none.
"""
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_begin hook
hook_event = "execjob_begin"
hook_name = "begin"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.begin_hook_body)
# instantiate execjob_prologue hook
hook_body = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "Executing prologue")
localnode=pbs.get_local_nodename()
if not e.job.in_ms_mom() and (localnode == '%s'):
x
""" % (self.nD,)
hook_event = "execjob_prologue"
hook_name = "prolo"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, hook_body)
# instantiate execjob_launch hook
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.launch_hook_body)
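# The launch hook (self.launch_hook_body, defined earlier in this
# class) is assumed to handle a failed prune roughly like:
#     pj = pbs.event().job
#     if pj.release_nodes(keep_select=...) is None:
#         pj.rerun()
#         pbs.event().reject("unsuccessful at LAUNCH")
# (a sketch only), which is why the job ends up requeued and held
# ('H') and the mom log shows the "unsuccessful at LAUNCH" message
# matched further below.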
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
jid = self.create_and_submit_job('job1')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job1_iselect,
'Resource_List.site': self.job1_oselect,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_ischedselect},
id=jid, attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# The primary mom still waits for the sisters to join
# (default $sister_join_job_alarm of 30 seconds) and for the
# sisters' execjob_prologue hooks to finish
# (default $job_launch_delay of 30 seconds), but the launch hook
# cannot prune the job back to the original select spec here, so
# the job does not launch; it is rejected and ends up held.
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+could not JOIN_JOB" % (
jid, self.hostB), n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+as job " % (jid, self.hostB) +
"is tolerant of node failures",
regexp=True, n=10)
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+" % (jid, self.hostD) +
"could not IM_EXEC_PROLOGUE", n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+" % (jid, self.hostD) +
"as job is tolerant of node failures", n=10, regexp=True)
self.momA.log_match("Job;%s;could not satisfy select chunk" % (jid,),
n=10)
self.momA.log_match("Job;%s;NEED chunks for keep_select" % (jid,),
n=10)
self.momA.log_match(
"Job;%s;HAVE chunks from job's exec_vnode" % (jid,), n=10)
self.momA.log_match("execjob_launch request rejected by 'launch'",
n=10)
errmsg = "unsuccessful at LAUNCH"
self.momA.log_match("Job;%s;%s" % (jid, errmsg,), n=10)
self.server.expect(JOB, {'job_state': 'H'},
id=jid, interval=1, max_attempts=70)
# turn off queuejob
self.server.manager(MGR_CMD_SET, HOOK, {'enabled': 'false'}, 'qjob')
# modify the job so that it no longer tolerates node failures
a = {ATTR_tolerate_node_failures: "none"}
self.server.alterjob(jobid=jid, attrib=a)
# release hold on job
self.server.rlsjob(jobid=jid, holdtype='s')
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+could not JOIN_JOB" % (
jid), n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+as job " % (jid, self.hostE) +
"is tolerant of node failures",
regexp=True, n=10, existence=False, max_attempts=10)
self.server.expect(JOB, {'job_state': 'H'},
id=jid, interval=1, max_attempts=15)
# turn off begin hook, leaving prologue hook in place
self.server.manager(MGR_CMD_SET, HOOK, {'enabled': 'false'}, 'begin')
# release hold on job
self.server.rlsjob(jobid=jid, holdtype='s')
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
self.momA.log_match(
"Job;%s;job_start_error.+could not IM_EXEC_PROLOGUE" % (jid,),
n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+" % (jid, self.hostC) +
"as job is tolerant of node failures", n=10, regexp=True,
existence=False, max_attempts=10)
self.server.expect(JOB, {'job_state': 'H'},
id=jid, interval=1, max_attempts=15)
# turn off prologue hook, so only launch hook remains.
self.server.manager(MGR_CMD_SET, HOOK, {'enabled': 'false'}, 'prolo')
# release hold on job
self.server.rlsjob(jobid=jid, holdtype='s')
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'none',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job1_iselect,
'Resource_List.site': self.job1_oselect,
'Resource_List.place': self.job1_place,
'exec_host': self.job1_iexec_host,
'exec_vnode': self.job1_iexec_vnode,
'schedselect': self.job1_ischedselect},
id=jid, attrop=PTL_AND)
# tolerate_node_failures=none and launch hook calls release_nodes()
emsg = "no nodes released as job does not tolerate node failures"
self.momA.log_match("%s: %s" % (jid, emsg), n=30)
def test_t8(self):
"""
Test tolerating node failures at job startup with no
failed moms.
1. Submit a job that has been submitted with a select
spec of 2 super-chunks say (A) and (B), and 1 chunk
of (C), along with place spec of "scatter",
resulting in the following assignment:
exec_vnode = (A)+(B)+(C)
and -Wtolerate_node_failures=job_start
2. Have a queuejob hook that adds 1 extra node to each
chunk (except the MS (first) chunk), resulting in the
assignment:
exec_vnode = (A)+(B)+(D)+(C)+(E)
where D mirrors super-chunk B specs while E mirrors
chunk C.
3. Have execjob_begin and execjob_prologue hooks that don't
fail any of the sister moms.
4. Then create an execjob_launch that prunes back the job's
exec_vnode assignment back to satisfying the original 3-node
select spec, choosing only healthy nodes.
5. Result:
a. This results in the following reassignment of chunks:
exec_vnode = (A)+(B)+(C)
b. The accounting log start record 'S' will reflect the
select request where additional chunks were added, while
the secondary start record 's' will reflect the assigned
resources after pruning the original select request via
the pbs.release_nodes(keep_select=...) call
inside execjob_launch hook.
"""
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_begin hook
hook_event = "execjob_begin"
hook_name = "begin"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.begin_hook_body2)
# instantiate execjob_prologue hook
hook_event = "execjob_prologue"
hook_name = "prolo"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.prolo_hook_body3)
# instantiate execjob_launch hook
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.launch_hook_body)
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
jid = self.create_and_submit_job('job1')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job1_iselect,
'Resource_List.site': self.job1_oselect,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_ischedselect},
id=jid, attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# Job eventually launches, reflecting the values pruned back
# to the original select spec.
# max_attempts is set high because it can take up to 60 seconds
# for the primary mom to wait for the sisters to join
# (default $sister_join_job_alarm of 30 seconds) and then for the
# sisters' execjob_prologue hooks to finish
# (default $job_launch_delay of 30 seconds).
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '6gb',
'Resource_List.ncpus': 8,
'Resource_List.nodect': 3,
'Resource_List.select': self.job1v5_select,
'Resource_List.place': self.job1_place,
'schedselect': self.job1v5_schedselect,
'exec_host': self.job1v5_exec_host,
'exec_vnode': self.job1v5_exec_vnode},
id=jid, interval=1, attrop=PTL_AND, max_attempts=60)
thisjob = self.server.status(JOB, id=jid)
if thisjob:
job_output_file = thisjob[0]['Output_Path'].split(':')[1]
# Check various vnode status.
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status(
[self.nAv0, self.nAv1, self.nB, self.nBv0],
'job-busy', jobs_assn1, 1, '1048576kb')
self.match_vnode_status([self.nAv2, self.nBv1],
'job-busy', jobs_assn1, 1, '0kb')
jobs_assn2 = "%s/0, %s/1" % (jid, jid)
self.match_vnode_status([self.nC], 'job-busy', jobs_assn2,
2, '2097152kb')
self.match_vnode_status([self.nA, self.nAv3, self.nBv2, self.nBv3,
self.nE, self.nEv0, self.nEv1, self.nEv2,
self.nEv3], 'free')
# Check server/queue counts.
self.server.expect(SERVER, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
id='workq', attrop=PTL_AND)
self.assertTrue(
self.pbs_nodefile_match_exec_host(jid, self.job1v5_exec_host))
# Check vnode_list[] parameter in execjob_prologue hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;prolo: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job1v5_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job1v5_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job1_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job1v5_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job1_iexec_host_esc,
self.job1_iexec_vnode_esc, "10gb", 13, 5,
self.job1_place,
self.job1_isel_esc)
self.match_accounting_log('s', jid, self.job1v5_exec_host_esc,
self.job1v5_exec_vnode_esc,
"6gb", 8, 3,
self.job1_place,
self.job1v5_sel_esc)
self.momA.log_match("Job;%s;task.+started, hostname" % (jid,),
n=10, max_attempts=60, interval=2, regexp=True)
self.momA.log_match("Job;%s;copy file request received" % (jid,),
n=10, max_attempts=10, interval=2)
# validate output
expected_out = """/var/spool/pbs/aux/%s
%s
%s
%s
FIB TESTS
pbsdsh -n 1 fib 37
%d
pbsdsh -n 2 fib 37
%d
fib 37
%d
HOSTNAME TESTS
pbsdsh -n 0 hostname
%s
pbsdsh -n 1 hostname
%s
pbsdsh -n 2 hostname
%s
PBS_NODEFILE tests
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
""" % (jid, self.momA.hostname, self.momB.hostname, self.momC.hostname,
self.fib37_value, self.fib37_value, self.fib37_value,
self.momA.shortname, self.momB.shortname, self.momC.shortname,
self.momA.hostname, self.momA.hostname, self.momA.shortname,
self.momB.hostname, self.momB.hostname, self.momB.shortname,
self.momC.hostname, self.momC.hostname, self.momC.shortname)
self.logger.info("expected out=%s" % (expected_out,))
job_out = ""
with open(job_output_file, 'r') as fd:
job_out = fd.read()
self.logger.info("job_out=%s" % (job_out,))
self.assertEqual(job_out, expected_out)
@timeout(400)
def test_t9(self):
"""
Test tolerating 'all' node failures at job startup and
within the life of the job.
1. Submit a job that has been submitted with a select
spec of 2 super-chunks say (A) and (B), and 1 chunk
of (C), along with place spec of "scatter",
resulting in the following assignment:
exec_vnode = (A)+(B)+(C)
and -Wtolerate_node_failures=all
2. Have a queuejob hook that adds 1 extra node to each
chunk (except the MS (first) chunk), resulting in the
assignment:
exec_vnode = (A)+(B)+(D)+(C)+(E)
where D mirrors super-chunk B specs while E mirrors
chunk C.
3. Have an execjob_begin hook that fails (causes rejection)
when executed by mom managing vnodes in (B).
4. Have an execjob_prologue hook that fails (causes rejection)
when executed by mom managing vnodes in (C).
5. Then create an execjob_launch that prunes back the job's
exec_vnode assignment back to satisfying the original 3-node
select spec, choosing only healthy nodes.
6. Now kill -KILL mom host hostD.
7. Result:
a. This results in the following reassignment of chunks:
exec_vnode = (A)+(D)+(E)
since (B) and (C) contain vnodes from failed moms.
b. The job continues to run even after nodeD goes down, with
only an indication in mom_logs of the message:
im_eof, Premature end of message from addr n stream 4
"""
# set this low so mom does not linger before reacting to a downed node
c = {'$max_poll_downtime': 10}
self.momA.add_config(c)
# instantiate queuejob hook; tolerate_node_failures is set to 'all'
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body2)
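# Unlike the job_start-only tests above, qjob_hook_body2 sets
# tolerate_node_failures to 'all', so node failures are tolerated
# even after the job has started.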
# instantiate execjob_begin hook
hook_event = "execjob_begin"
hook_name = "begin"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.begin_hook_body)
# instantiate execjob_prologue hook
hook_event = "execjob_prologue"
hook_name = "prolo"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.prolo_hook_body)
# instantiate execjob_launch hook
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.launch_hook_body)
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
jid = self.create_and_submit_job('job1_2')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'all',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job1_iselect,
'Resource_List.site': self.job1_oselect,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_ischedselect},
id=jid, attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# Job eventually launches, reflecting the values pruned back
# to the original select spec.
# max_attempts is set high because it can take up to 60 seconds
# for the primary mom to wait for the sisters to join
# (default $sister_join_job_alarm of 30 seconds) and then for the
# sisters' execjob_prologue hooks to finish
# (default $job_launch_delay of 30 seconds).
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'all',
'Resource_List.mem': '6gb',
'Resource_List.ncpus': 8,
'Resource_List.nodect': 3,
'Resource_List.select': self.job1_select,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_schedselect,
'exec_host': self.job1_exec_host,
'exec_vnode': self.job1_exec_vnode},
id=jid, interval=1, attrop=PTL_AND, max_attempts=60)
thisjob = self.server.status(JOB, id=jid)
if thisjob:
job_output_file = thisjob[0]['Output_Path'].split(':')[1]
# Check various vnode status.
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status(
[self.nAv0, self.nAv1, self.nE, self.nEv0],
'job-busy', jobs_assn1, 1, '1048576kb')
self.match_vnode_status([self.nAv2],
'job-busy', jobs_assn1, 1, '0kb')
jobs_assn3 = "%s/0, %s/1, %s/2" % (jid, jid, jid)
self.match_vnode_status([self.nD], 'free', jobs_assn3,
3, '2097152kb')
self.match_vnode_status([self.nA, self.nAv3, self.nB, self.nBv0,
self.nBv1, self.nBv2, self.nBv3, self.nC,
self.nEv1, self.nEv2, self.nEv3], 'free')
# Check server/queue counts.
self.server.expect(SERVER, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
id='workq', attrop=PTL_AND)
self.assertTrue(
self.pbs_nodefile_match_exec_host(jid, self.job1_exec_host))
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+could not JOIN_JOB" % (
jid, self.hostB), n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+as job " % (jid, self.hostB) +
"is tolerant of node failures",
regexp=True, n=10)
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+" % (jid, self.hostC) +
"could not IM_EXEC_PROLOGUE", n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+" % (jid, self.hostC) +
"as job is tolerant of node failures", n=10, regexp=True)
# Check vnode_list[] parameter in execjob_prologue hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;prolo: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_prologue hook
vnode_list_fail = [self.nB, self.nBv0, self.nBv1]
for vn in vnode_list_fail:
self.momA.log_match("Job;%s;prolo: found vnode_list_fail[%s]" % (
jid, vn), n=10)
# Check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_launch hook
vnode_list_fail = [self.nB, self.nBv0, self.nBv1, self.nC]
for vn in vnode_list_fail:
self.momA.log_match(
"Job;%s;launch: found vnode_list_fail[%s]" % (jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job1_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job1_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job1_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job1_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job1_iexec_host_esc,
self.job1_iexec_vnode_esc, "10gb", 13, 5,
self.job1_place,
self.job1_isel_esc)
self.match_accounting_log('s', jid, self.job1_exec_host_esc,
self.job1_exec_vnode_esc,
"6gb", 8, 3,
self.job1_place,
self.job1_sel_esc)
# kill momD, simulating a failed mom host.
self.momD.signal("-KILL")
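# Killing momD severs its stream to the primary mom; because the job
# tolerates 'all' node failures, it keeps running on the remaining
# nodes and mom only logs the im_eof message matched below.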
self.momA.log_match("im_eof, Premature end of message.+on stream 4",
n=10, max_attempts=30, interval=2, regexp=True)
self.momA.log_match("Job;%s;task.+started, hostname" % (jid,),
n=10, max_attempts=60, interval=2, regexp=True)
self.momA.log_match("Job;%s;copy file request received" % (jid,),
n=10, max_attempts=10, interval=2)
# validate output
expected_out = """/var/spool/pbs/aux/%s
%s
%s
%s
FIB TESTS
pbsdsh -n 2 fib 37
%d
fib 37
%d
HOSTNAME TESTS
pbsdsh -n 0 hostname
%s
pbsdsh -n 2 hostname
%s
""" % (jid, self.momA.hostname, self.momD.hostname, self.momE.hostname,
self.fib37_value, self.fib37_value, self.momA.shortname,
self.momE.shortname)
self.logger.info("expected out=%s" % (expected_out,))
job_out = ""
with open(job_output_file, 'r') as fd:
job_out = fd.read()
self.logger.info("job_out=%s" % (job_out,))
self.assertEqual(job_out, expected_out)
self.momD.start()
def test_t10(self):
"""
Test tolerating node failures at job startup but also
cause a failure on one of the nodes after the job has
started.
1. Submit a job that has been submitted with a select
spec of 2 super-chunks say (A) and (B), and 1 chunk
of (C), along with place spec of "scatter",
resulting in the following assignment:
exec_vnode = (A)+(B)+(C)
and -Wtolerate_node_failures=all
2. Have a queuejob hook that adds 1 extra node to each
chunk (except the MS (first) chunk), resulting in the
assignment:
exec_vnode = (A)+(B)+(D)+(C)+(E)
where D mirrors super-chunk B specs while E mirrors
chunk C.
3. Have an execjob_begin hook that fails (causes rejection)
when executed by mom managing vnodes in (B).
4. Have an execjob_prologue hook that fails (causes rejection)
when executed by mom managing vnodes in (C).
5. Then create an execjob_launch that prunes back the job's
exec_vnode assignment back to satisfying the original 3-node
select spec, choosing only healthy nodes.
6. Now kill -KILL mom host hostD.
7. Result:
a. This results in the following reassignment of chunks:
exec_vnode = (A)+(D)+(E)
since (B) and (C) contain vnodes from failed moms.
b. The job eventually aborts after nodeD goes down, with
mom_logs showing the messages:
"im_eof, lost communication with "
"node EOF 1 ()"
"kill_job"
"""
# set $max_poll_downtime low so the job gets killed promptly after a
# node goes down, instead of lingering.
c = {'$max_poll_downtime': 10}
self.momA.add_config(c)
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_begin hook
hook_event = "execjob_begin"
hook_name = "begin"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.begin_hook_body)
# instantiate execjob_prologue hook
hook_event = "execjob_prologue"
hook_name = "prolo"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.prolo_hook_body)
# instantiate execjob_launch hook
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.launch_hook_body)
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
jid = self.create_and_submit_job('job1_3')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job1_iselect,
'Resource_List.site': self.job1_oselect,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_ischedselect},
id=jid, attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# Job eventually launches, reflecting the values pruned back
# to the original select spec.
# max_attempts=60 is used since it can take up to 60 seconds for the
# primary mom to wait for the sisters to join
# (default $sister_join_job_alarm of 30 seconds) and to wait for the
# sisters' execjob_prologue hooks to finish (default $job_launch_delay
# value of 30 seconds)
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '6gb',
'Resource_List.ncpus': 8,
'Resource_List.nodect': 3,
'Resource_List.select': self.job1_select,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_schedselect,
'exec_host': self.job1_exec_host,
'exec_vnode': self.job1_exec_vnode},
id=jid, interval=1, attrop=PTL_AND, max_attempts=60)
# Check various vnode status.
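# The jobs_assn* strings below are the entries expected in a vnode's
# 'jobs' attribute, of the form "<job id>/<task index>".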
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status(
[self.nAv0, self.nAv1, self.nE, self.nEv0],
'job-busy', jobs_assn1, 1, '1048576kb')
self.match_vnode_status([self.nAv2],
'job-busy', jobs_assn1, 1, '0kb')
jobs_assn3 = "%s/0, %s/1, %s/2" % (jid, jid, jid)
self.match_vnode_status([self.nD], 'free', jobs_assn3,
3, '2097152kb')
self.match_vnode_status([self.nA, self.nAv3, self.nB, self.nBv0,
self.nBv1, self.nBv2, self.nBv3, self.nC,
self.nEv1, self.nEv2, self.nEv3], 'free')
# Check server/queue counts.
self.server.expect(SERVER, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
id='workq', attrop=PTL_AND)
self.assertTrue(
self.pbs_nodefile_match_exec_host(jid, self.job1_exec_host))
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+could not JOIN_JOB" % (
jid, self.hostB), n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+as job " % (jid, self.hostB) +
"is tolerant of node failures",
regexp=True, n=10)
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+" % (jid, self.hostC) +
"could not IM_EXEC_PROLOGUE", n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+" % (jid, self.hostC) +
"as job is tolerant of node failures", n=10, regexp=True)
# Check vnode_list[] parameter in execjob_prologue hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;prolo: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_prologue hook
vnode_list_fail = [self.nB, self.nBv0, self.nBv1]
for vn in vnode_list_fail:
self.momA.log_match("Job;%s;prolo: found vnode_list_fail[%s]" % (
jid, vn), n=10)
# Check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_launch hook
vnode_list_fail = [self.nB, self.nBv0, self.nBv1, self.nC]
for vn in vnode_list_fail:
self.momA.log_match("Job;%s;launch: found vnode_list_fail[%s]" % (
jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job1_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job1_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job1_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job1_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job1_iexec_host_esc,
self.job1_iexec_vnode_esc, "10gb", 13, 5,
self.job1_place,
self.job1_isel_esc)
self.match_accounting_log('s', jid, self.job1_exec_host_esc,
self.job1_exec_vnode_esc,
"6gb", 8, 3,
self.job1_place,
self.job1_sel_esc)
# temporarily kill momD, simulating a failed mom host.
self.momD.signal("-KILL")
self.momA.log_match(
"Job;%s;im_eof, lost communication with %s.+killing job now" % (
jid, self.nD), n=10, max_attempts=30, interval=2, regexp=True)
self.momA.log_match("Job;%s;kill_job" % (jid,),
n=10, max_attempts=60, interval=2)
self.momD.start()
def test_t11(self):
"""
Test: tolerating node failures at job startup with
a job having an ncpus=0 assignment. This ensures
the hooks have the info for the ncpus=0 chunks
in pbs.event().vnode_list[].
"""
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_begin hook
hook_event = "execjob_begin"
hook_name = "begin"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.begin_hook_body)
# instantiate execjob_prologue hook
hook_event = "execjob_prologue"
hook_name = "prolo"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.prolo_hook_body)
# instantiate execjob_launch hook
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.launch_hook_body)
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
jid = self.create_and_submit_job('job2')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 9,
'Resource_List.nodect': 5,
'Resource_List.select': self.job2_iselect,
'Resource_List.site': self.job2_oselect,
'Resource_List.place': self.job2_place,
'schedselect': self.job2_ischedselect},
max_attempts=10, id=jid, attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# Job eventually launches, reflecting the values pruned back
# to the original select spec.
# max_attempts=60 is used since it can take up to 60 seconds for the
# primary mom to wait for the sisters to join
# (default $sister_join_job_alarm of 30 seconds) and to wait for the
# sisters' execjob_prologue hooks to finish (default $job_launch_delay
# value of 30 seconds)
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '6gb',
'Resource_List.ncpus': 6,
'Resource_List.nodect': 3,
'Resource_List.select': self.job2_select,
'Resource_List.place': self.job2_place,
'schedselect': self.job2_schedselect,
'exec_host': self.job2_exec_host,
'exec_vnode': self.job2_exec_vnode},
id=jid, interval=1, attrop=PTL_AND, max_attempts=60)
# Check various vnode status.
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status([self.nAv0, self.nAv1],
'job-busy', jobs_assn1, 1, '1048576kb')
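# The vnodes backing the ncpus=0 chunks stay 'free' (no cpus consumed)
# but still list the job and the assigned memory.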
self.match_vnode_status([self.nE, self.nEv0],
'free', jobs_assn1, 0, '1048576kb')
self.match_vnode_status([self.nAv2],
'job-busy', jobs_assn1, 1, '0kb')
jobs_assn3 = "%s/0, %s/1, %s/2" % (jid, jid, jid)
self.match_vnode_status([self.nD], 'free', jobs_assn3,
3, '2097152kb')
self.match_vnode_status([self.nA, self.nAv3, self.nB, self.nBv0,
self.nBv1, self.nBv2, self.nBv3, self.nC,
self.nEv1, self.nEv2, self.nEv3], 'free')
# Check server/queue counts.
self.server.expect(SERVER, {'resources_assigned.ncpus': 6,
'resources_assigned.mem': '6291456kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 6,
'resources_assigned.mem': '6291456kb'},
id='workq', attrop=PTL_AND)
self.assertTrue(
self.pbs_nodefile_match_exec_host(jid, self.job2_exec_host_nfile))
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+could not JOIN_JOB" % (
jid, self.hostB), n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+as job " % (jid, self.hostB) +
"is tolerant of node failures",
regexp=True, n=10)
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+" % (jid, self.hostC) +
"could not IM_EXEC_PROLOGUE", n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+" % (jid, self.hostC) +
"as job is tolerant of node failures", n=10, regexp=True)
# Check vnode_list[] parameter in execjob_prologue hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;prolo: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_prologue hook
vnode_list_fail = [self.nB, self.nBv0, self.nBv1]
for vn in vnode_list_fail:
self.momA.log_match("Job;%s;prolo: found vnode_list_fail[%s]" % (
jid, vn), n=10)
# Check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_launch hook
vnode_list_fail = [self.nB, self.nBv0, self.nBv1, self.nC]
for vn in vnode_list_fail:
self.momA.log_match(
"Job;%s;launch: found vnode_list_fail[%s]" % (jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job2_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job2_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job2_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job2_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job2_iexec_host_esc,
self.job2_iexec_vnode_esc, "10gb", 9, 5,
self.job2_place,
self.job2_isel_esc)
self.match_accounting_log('s', jid, self.job2_exec_host_esc,
self.job2_exec_vnode_esc,
"6gb", 6, 3,
self.job2_place,
self.job2_sel_esc)
def test_t12(self):
"""
Test: tolerating node failures at job startup with
extra resources requested, such as mpiprocs and
ompthreads, which affect the content of $PBS_NODEFILE.
"""
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_begin hook
hook_event = "execjob_begin"
hook_name = "begin"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.begin_hook_body)
# instantiate execjob_prologue hook
hook_event = "execjob_prologue"
hook_name = "prolo"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.prolo_hook_body)
# instantiate execjob_launch hook
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.launch_hook_body)
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
jid = self.create_and_submit_job('job3')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job3_iselect,
'Resource_List.site': self.job3_oselect,
'Resource_List.place': self.job3_place,
'schedselect': self.job3_ischedselect},
max_attempts=10, id=jid, attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# Job eventually launches, reflecting the values pruned back
# to the original select spec.
# max_attempts=60 is used since it can take up to 60 seconds for the
# primary mom to wait for the sisters to join
# (default $sister_join_job_alarm of 30 seconds) and to wait for the
# sisters' execjob_prologue hooks to finish (default $job_launch_delay
# value of 30 seconds)
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '6gb',
'Resource_List.ncpus': 8,
'Resource_List.nodect': 3,
'Resource_List.select': self.job3_select,
'Resource_List.place': self.job3_place,
'schedselect': self.job3_schedselect,
'exec_host': self.job3_exec_host,
'exec_vnode': self.job3_exec_vnode},
id=jid, interval=1, attrop=PTL_AND, max_attempts=60)
# Check various vnode status.
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status([self.nAv0, self.nAv1],
'job-busy', jobs_assn1, 1, '1048576kb')
self.match_vnode_status([self.nE, self.nEv0],
'job-busy', jobs_assn1, 1, '1048576kb')
self.match_vnode_status([self.nAv2],
'job-busy', jobs_assn1, 1, '0kb')
jobs_assn3 = "%s/0, %s/1, %s/2" % (jid, jid, jid)
self.match_vnode_status([self.nD], 'free', jobs_assn3,
3, '2097152kb')
self.match_vnode_status([self.nA, self.nAv3, self.nB, self.nBv0,
self.nBv1, self.nBv2, self.nBv3, self.nC,
self.nEv1, self.nEv2, self.nEv3], 'free')
# Check server/queue counts.
self.server.expect(SERVER, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
id='workq', attrop=PTL_AND)
self.assertTrue(
self.pbs_nodefile_match_exec_host(jid, self.job3_exec_host,
self.job3_schedselect))
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+could not JOIN_JOB" % (
jid, self.hostB), n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+as job " % (jid, self.hostB) +
"is tolerant of node failures",
regexp=True, n=10)
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+" % (jid, self.hostC) +
"could not IM_EXEC_PROLOGUE", n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+" % (jid, self.hostC) +
"as job is tolerant of node failures", n=10, regexp=True)
# Check vnode_list[] parameter in execjob_prologue hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;prolo: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_prologue hook
vnode_list_fail = [self.nB, self.nBv0, self.nBv1]
for vn in vnode_list_fail:
self.momA.log_match("Job;%s;prolo: found vnode_list_fail[%s]" % (
jid, vn), n=10)
# Check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_launch hook
vnode_list_fail = [self.nB, self.nBv0, self.nBv1, self.nC]
for vn in vnode_list_fail:
self.momA.log_match("Job;%s;launch: found vnode_list_fail[%s]" % (
jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job3_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job3_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job3_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job3_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job3_iexec_host_esc,
self.job3_iexec_vnode_esc, "10gb", 13, 5,
self.job3_place,
self.job3_isel_esc)
self.match_accounting_log('s', jid, self.job3_exec_host_esc,
self.job3_exec_vnode_esc,
"6gb", 8, 3,
self.job3_place,
self.job3_sel_esc)
def test_t13(self):
"""
Test: pbs.event().job.select.increment_chunks() method.
"""
# instantiate queuejob hook
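# The hook below exercises pbs.select().increment_chunks() with an
# int, a numeric string, a percentage string, and a per-chunk dict;
# the resulting select strings are verified via the server log
# matches further down.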
hook_body = """
import pbs
e=pbs.event()
sel=pbs.select("ncpus=3:mem=1gb+1:ncpus=2:mem=2gb+2:ncpus=1:mem=3gb")
inp=2
isel=sel.increment_chunks(inp)
pbs.logmsg(pbs.LOG_DEBUG, "sel=%s" % (sel,))
pbs.logmsg(pbs.LOG_DEBUG, "sel.increment_chunks(%d)=%s" % (inp,isel))
inp="3"
isel=sel.increment_chunks(inp)
pbs.logmsg(pbs.LOG_DEBUG, "sel.increment_chunks(%s)=%s" % (inp,isel))
inp="23.5%"
isel=sel.increment_chunks(inp)
pbs.logmsg(pbs.LOG_DEBUG, "sel.increment_chunks(%s)=%s" % (inp,isel))
inp={0: 0, 1: 4, 2: "50%"}
isel=sel.increment_chunks(inp)
pbs.logmsg(pbs.LOG_DEBUG, "sel.increment_chunks(%s)=%s" % (inp,isel))
sel=pbs.select("5:ncpus=3:mem=1gb+1:ncpus=2:mem=2gb+2:ncpus=1:mem=3gb")
pbs.logmsg(pbs.LOG_DEBUG, "sel=%s" % (sel,))
inp={0: "50%", 1: "50%", 2: "50%"}
isel=sel.increment_chunks(inp)
pbs.logmsg(pbs.LOG_DEBUG, "sel.increment_chunks(%s)=%s" % (inp,isel))
"""
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, hook_body)
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
j1 = Job(TEST_USER)
j1.set_sleep_time(10)
self.server.submit(j1)
# Verify server_logs
self.server.log_match(
"sel=ncpus=3:mem=1gb+1:ncpus=2:mem=2gb+2:ncpus=1:mem=3gb", n=10)
self.server.log_match(
"sel.increment_chunks(2)=1:ncpus=3:mem=1gb+" +
"3:ncpus=2:mem=2gb+4:ncpus=1:mem=3gb", n=10)
self.server.log_match(
"sel.increment_chunks(3)=1:ncpus=3:mem=1gb+" +
"4:ncpus=2:mem=2gb+5:ncpus=1:mem=3gb", n=10)
self.server.log_match(
"sel.increment_chunks(23.5%)=1:ncpus=3:mem=1gb+" +
"2:ncpus=2:mem=2gb+3:ncpus=1:mem=3gb", n=10)
self.server.log_match(
"sel.increment_chunks({0: 0, 1: 4, 2: \'50%\'})=1:ncpus=3:" +
"mem=1gb+5:ncpus=2:mem=2gb+3:ncpus=1:mem=3gb", n=10)
self.server.log_match(
"sel=5:ncpus=3:mem=1gb+1:ncpus=2:mem=2gb+2:ncpus=1:mem=3gb",
n=10)
self.server.log_match(
"sel.increment_chunks({0: \'50%\', 1: \'50%\', 2: \'50%\'})=" +
"7:ncpus=3:mem=1gb+2:ncpus=2:mem=2gb+3:ncpus=1:mem=3gb", n=10)
def test_t14(self):
"""
Test: tolerating job_start with no node failures,
but pruning the job's assigned nodes to satisfy the original
select spec + 1 additional node.
Basically, given an original spec requiring
3 nodes, a queuejob hook has added 2 more nodes,
resulting in a new assignment:
exec_vnode=(A)+(B)+(C)+(D)+(E) where
(C) mirrors (B) and satisfies the second chunk, and (E)
mirrors (D) and satisfies the third chunk.
Pruning the assigned nodes down to 4 nodes then
results in:
exec_vnode=(A)+(B)+(D)+(e1)
where (E) is a super-chunk of the form (e1+e2) and only
the 'e1' part is needed.
"""
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_launch hook
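# The launch hook below prunes the job via
# job.release_nodes(keep_select=...), where keep_select is taken from
# Resource_List["site"] (which carries the original select spec, per
# the Resource_List.site checks above) plus one extra
# "ncpus=1:mem=1gb" chunk.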
hook_body = """
import pbs
e=pbs.event()
if 'PBS_NODEFILE' not in e.env:
e.accept()
pbs.logmsg(pbs.LOG_DEBUG, "Executing launch")
for vn in e.vnode_list:
v = e.vnode_list[vn]
pbs.logjobmsg(e.job.id, "launch: found vnode_list[" + v.name + "]")
for vn in e.vnode_list_fail:
v = e.vnode_list_fail[vn]
pbs.logjobmsg(e.job.id, "launch: found vnode_list_fail[" + v.name + "]")
if e.job.in_ms_mom():
new_jsel = e.job.Resource_List["site"] + "+ncpus=1:mem=1gb"
pj = e.job.release_nodes(keep_select=new_jsel)
pbs.logmsg(pbs.LOG_DEBUG, "release_nodes(keep_select=%s)" % (new_jsel,))
if pj != None:
pbs.logjobmsg(e.job.id, "launch: job.exec_vnode=%s" % (pj.exec_vnode,))
pbs.logjobmsg(e.job.id, "launch: job.exec_host=%s" % (pj.exec_host,))
pbs.logjobmsg(e.job.id,
"launch: job.schedselect=%s" % (pj.schedselect,))
else:
e.job.delete()
msg = "unsuccessful at LAUNCH"
e.reject("unsuccessful at LAUNCH")
"""
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, hook_body)
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
jid = self.create_and_submit_job('job1_4')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job1_iselect,
'Resource_List.site': self.job1_oselect,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_ischedselect},
id=jid, attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '7gb',
'Resource_List.ncpus': 9,
'Resource_List.nodect': 4,
'Resource_List.select': self.job1v6_select,
'Resource_List.place': self.job1_place,
'schedselect': self.job1v6_schedselect,
'exec_host': self.job1v6_exec_host,
'exec_vnode': self.job1v6_exec_vnode},
id=jid, interval=1, attrop=PTL_AND, max_attempts=60)
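# Output_Path has the form "<host>:<path>"; grab the path part so the
# job's stdout can later be read and compared against the expected
# output.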
thisjob = self.server.status(JOB, id=jid)
if thisjob:
job_output_file = thisjob[0]['Output_Path'].split(':')[1]
# Check various vnode status.
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status(
[self.nAv0, self.nAv1, self.nB, self.nBv0, self.nE],
'job-busy', jobs_assn1, 1, '1048576kb')
self.match_vnode_status([self.nAv2, self.nBv1],
'job-busy', jobs_assn1, 1, '0kb')
jobs_assn2 = "%s/0, %s/1" % (jid, jid)
self.match_vnode_status([self.nC], 'job-busy', jobs_assn2,
2, '2097152kb')
self.match_vnode_status([self.nA, self.nAv3, self.nBv2, self.nBv3,
self.nEv0, self.nEv1, self.nEv2,
self.nEv3], 'free')
# Check server/queue counts.
self.server.expect(SERVER, {'resources_assigned.ncpus': 9,
'resources_assigned.mem': '7340032kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 9,
'resources_assigned.mem': '7340032kb'},
id='workq', attrop=PTL_AND)
self.assertTrue(
self.pbs_nodefile_match_exec_host(jid, self.job1v6_exec_host))
# Check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job1v6_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job1v6_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job1_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job1v6_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job1_iexec_host_esc,
self.job1_iexec_vnode_esc, "10gb", 13, 5,
self.job1_place,
self.job1_isel_esc)
self.match_accounting_log('s', jid, self.job1v6_exec_host_esc,
self.job1v6_exec_vnode_esc,
"7gb", 9, 4,
self.job1_place,
self.job1v6_sel_esc)
self.momA.log_match("Job;%s;task.+started, hostname" % (jid,),
n=10, max_attempts=60, interval=2, regexp=True)
self.momA.log_match("Job;%s;copy file request received" % (jid,),
n=10, max_attempts=10, interval=2)
# validate output
expected_out = """/var/spool/pbs/aux/%s
%s
%s
%s
%s
FIB TESTS
pbsdsh -n 1 fib 37
%d
pbsdsh -n 2 fib 37
%d
pbsdsh -n 3 fib 37
%d
fib 37
%d
HOSTNAME TESTS
pbsdsh -n 0 hostname
%s
pbsdsh -n 1 hostname
%s
pbsdsh -n 2 hostname
%s
pbsdsh -n 3 hostname
%s
PBS_NODEFILE tests
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
""" % (jid, self.momA.hostname, self.momB.hostname, self.momC.hostname,
self.momE.hostname,
self.fib37_value, self.fib37_value, self.fib37_value,
self.fib37_value,
self.momA.shortname, self.momB.shortname, self.momC.shortname,
self.momE.shortname,
self.momA.hostname, self.momA.hostname, self.momA.shortname,
self.momB.hostname, self.momB.hostname, self.momB.shortname,
self.momC.hostname, self.momC.hostname, self.momC.shortname,
self.momE.hostname, self.momE.hostname, self.momE.shortname)
self.logger.info("expected out=%s" % (expected_out,))
job_out = ""
with open(job_output_file, 'r') as fd:
job_out = fd.read()
self.logger.info("job_out=%s" % (job_out,))
self.assertEqual(job_out, expected_out)
def test_t15(self):
"""
Test: tolerating job_start with no node failures,
but pruning the job's assigned nodes to satisfy the original
select spec minus 1 node, where one of the chunks is
unsatisfiable. This time, the pbs.event().job.delete()
action is taken when pruning the job fails.
"""
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_launch hook
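# The launch hook below asks release_nodes() to keep a select spec
# ("ncpus=3:mem=2gb+ncpus=5:mem=3gb") that cannot be satisfied by the
# assigned vnodes, so release_nodes() returns None, the job is
# deleted, and the hook rejects the event.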
hook_body = """
import pbs
e=pbs.event()
if 'PBS_NODEFILE' not in e.env:
e.accept()
pbs.logmsg(pbs.LOG_DEBUG, "Executing launch")
for vn in e.vnode_list:
v = e.vnode_list[vn]
pbs.logjobmsg(e.job.id, "launch: found vnode_list[" + v.name + "]")
for vn in e.vnode_list_fail:
v = e.vnode_list_fail[vn]
pbs.logjobmsg(e.job.id, "launch: found vnode_list_fail[" + v.name + "]")
if e.job.in_ms_mom():
new_jsel ="ncpus=3:mem=2gb+ncpus=5:mem=3gb"
pj = e.job.release_nodes(keep_select=new_jsel)
pbs.logmsg(pbs.LOG_DEBUG, "release_nodes(keep_select=%s)" % (new_jsel,))
if pj != None:
pbs.logjobmsg(e.job.id, "launch: job.exec_vnode=%s" % (pj.exec_vnode,))
pbs.logjobmsg(e.job.id, "launch: job.exec_host=%s" % (pj.exec_host,))
pbs.logjobmsg(e.job.id,
"launch: job.schedselect=%s" % (pj.schedselect,))
else:
e.job.delete()
msg = "unsuccessful at LAUNCH"
e.reject("unsuccessful at LAUNCH")
"""
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, hook_body)
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
jid = self.create_and_submit_job('job1_4')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job1_iselect,
'Resource_List.site': self.job1_oselect,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_ischedselect},
id=jid, attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
self.momA.log_match("Job;%s;could not satisfy select chunk" % (jid,),
n=10, max_attempts=60, interval=2)
self.momA.log_match("Job;%s;NEED chunks for keep_select" % (jid,),
n=10)
self.momA.log_match(
"Job;%s;HAVE chunks from job's exec_vnode" % (jid,), n=10)
self.momA.log_match("execjob_launch request rejected by 'launch'",
n=10)
errmsg = "unsuccessful at LAUNCH"
self.momA.log_match("Job;%s;%s" % (jid, errmsg,), n=10)
self.server.expect(JOB, 'queue', op=UNSET, id=jid)
def test_t16(self):
"""
Test: tolerating node failures at job startup with
a job submitted with -l place="scatter:excl".
Like jobs submitted with only "-l place=scatter",
except the assigned vnodes end up in a
"job-exclusive" state.
"""
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_begin hook
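# The begin hook below references the undefined name 'x' when running
# on the sister mom that manages vnode nB, causing the hook (and hence
# the join) to fail on that host.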
hook_body = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "Executing begin")
localnode=pbs.get_local_nodename()
if not e.job.in_ms_mom() and (localnode == '%s'):
x
""" % (self.nB,)
hook_event = "execjob_begin"
hook_name = "begin"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, hook_body)
# instantiate execjob_prologue hook
hook_event = "execjob_prologue"
hook_name = "prolo"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.prolo_hook_body)
# instantiate execjob_launch hook
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.launch_hook_body)
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
jid = self.create_and_submit_job('job4')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job4_iselect,
'Resource_List.site': self.job4_oselect,
'Resource_List.place': self.job4_place,
'schedselect': self.job4_ischedselect},
max_attempts=10, id=jid, attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# Job eventually launches, reflecting the values pruned back
# to the original select spec.
# max_attempts=60 is used since it can take up to 60 seconds for the
# primary mom to wait for the sisters to join
# (default $sister_join_job_alarm of 30 seconds) and to wait for the
# sisters' execjob_prologue hooks to finish (default $job_launch_delay
# value of 30 seconds)
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '6gb',
'Resource_List.ncpus': 8,
'Resource_List.nodect': 3,
'Resource_List.select': self.job4_select,
'Resource_List.place': self.job4_place,
'schedselect': self.job4_schedselect,
'exec_host': self.job4_exec_host,
'exec_vnode': self.job4_exec_vnode},
id=jid, interval=1, attrop=PTL_AND, max_attempts=60)
# Check various vnode status.
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status([self.nAv0, self.nAv1],
'job-exclusive', jobs_assn1, 1, '1048576kb')
self.match_vnode_status([self.nE, self.nEv0],
'job-exclusive', jobs_assn1, 1, '1048576kb')
self.match_vnode_status([self.nAv2],
'job-exclusive', jobs_assn1, 1, '0kb')
jobs_assn3 = "%s/0, %s/1, %s/2" % (jid, jid, jid)
self.match_vnode_status([self.nD], 'job-exclusive', jobs_assn3,
3, '2097152kb')
self.match_vnode_status([self.nA, self.nAv3, self.nB, self.nBv0,
self.nBv1, self.nBv2, self.nBv3, self.nC,
self.nEv1, self.nEv2, self.nEv3], 'free')
# Check server/queue counts.
self.server.expect(SERVER, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
id='workq', attrop=PTL_AND)
self.assertTrue(
self.pbs_nodefile_match_exec_host(jid, self.job4_exec_host))
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+could not JOIN_JOB" % (
jid, self.hostB), n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+as job " % (jid, self.hostB) +
"is tolerant of node failures",
regexp=True, n=10)
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+" % (jid, self.hostC) +
"could not IM_EXEC_PROLOGUE", n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+" % (jid, self.hostC) +
"as job is tolerant of node failures", n=10, regexp=True)
# Check vnode_list[] parameter in execjob_prologue hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;prolo: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_prologue hook
vnode_list_fail = [self.nB, self.nBv0, self.nBv1]
for vn in vnode_list_fail:
self.momA.log_match("Job;%s;prolo: found vnode_list_fail[%s]" % (
jid, vn), n=10)
# Check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_launch hook
vnode_list_fail = [self.nB, self.nBv0, self.nBv1, self.nC]
for vn in vnode_list_fail:
self.momA.log_match("Job;%s;launch: found vnode_list_fail[%s]" % (
jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job4_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job4_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job4_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job4_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job4_iexec_host_esc,
self.job4_iexec_vnode_esc, "10gb", 13, 5,
self.job4_place,
self.job4_isel_esc)
self.match_accounting_log('s', jid, self.job4_exec_host_esc,
self.job4_exec_vnode_esc,
"6gb", 8, 3,
self.job4_place,
self.job4_sel_esc)
def test_t17(self):
"""
Test: tolerating 1 node failure at job startup with
a job submitted with -l place="free".
Like jobs submitted with only "-l place=scatter"
except some vnodes from the same mom would get
allocated to satisfy different chunks.
This test breaks apart one of the multi-chunks of
the form (b1+b2+b3) so that upon reassignment,
(b1+b2) is used.
"""
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_begin hook
hook_event = "execjob_begin"
hook_name = "begin"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.begin_hook_body4)
# instantiate execjob_prologue hook
hook_event = "execjob_prologue"
hook_name = "prolo"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.prolo_hook_body3)
# instantiate execjob_launch hook
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.launch_hook_body2)
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
jid = self.create_and_submit_job('job5')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job5_iselect,
'Resource_List.site': self.job5_oselect,
'Resource_List.place': self.job5_place,
'schedselect': self.job5_ischedselect},
max_attempts=10, id=jid, attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# Job eventually launches, reflecting the values pruned back
# to the original select spec.
# max_attempts=60 is used since it can take up to 60 seconds for the
# primary mom to wait for the sisters to join
# (default $sister_join_job_alarm of 30 seconds) and to wait for the
# sisters' execjob_prologue hooks to finish (default $job_launch_delay
# value of 30 seconds)
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '5gb',
'Resource_List.ncpus': 7,
'Resource_List.nodect': 3,
'Resource_List.select': self.job5_select,
'Resource_List.place': self.job5_place,
'schedselect': self.job5_schedselect,
'exec_host': self.job5_exec_host,
'exec_vnode': self.job5_exec_vnode},
id=jid, interval=1, attrop=PTL_AND, max_attempts=60)
# Check various vnode status.
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status(
[self.nAv0, self.nAv1, self.nB, self.nBv0],
'job-busy', jobs_assn1, 1, '1048576kb')
self.match_vnode_status([self.nAv2, self.nBv2],
'job-busy', jobs_assn1, 1, '0kb')
# due to free placement, the job appears twice on this vnode as it
# was allocated twice: once for mem only and once for ncpus
jobs_assn2 = "%s/0, %s/0" % (jid, jid)
self.match_vnode_status([self.nBv1],
'job-busy', jobs_assn2, 1, '1048576kb')
self.match_vnode_status([self.nA, self.nAv3, self.nC, self.nD,
self.nE, self.nEv0, self.nEv1,
self.nEv2, self.nEv3, self.nBv3], 'free')
# Check server/queue counts.
self.server.expect(SERVER, {'resources_assigned.ncpus': 7,
'resources_assigned.mem': '5242880kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 7,
'resources_assigned.mem': '5242880kb'},
id='workq', attrop=PTL_AND)
self.assertTrue(
self.pbs_nodefile_match_exec_host(jid, self.job5_exec_host))
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+could not JOIN_JOB" % (
jid, self.hostD), n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+as job " % (jid, self.hostD) +
"is tolerant of node failures",
regexp=True, n=10)
# Check vnode_list[] parameter in execjob_prologue hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD]
for vn in vnode_list:
self.momA.log_match("Job;%s;prolo: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_launch hook
vnode_list_fail = [self.nD]
for vn in vnode_list_fail:
self.momA.log_match("Job;%s;launch: found vnode_list_fail[%s]" % (
jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job5_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job5_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job5_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job5_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job5_iexec_host_esc,
self.job5_iexec_vnode_esc, "10gb", 13, 5,
self.job5_place,
self.job5_isel_esc)
self.match_accounting_log('s', jid, self.job5_exec_host_esc,
self.job5_exec_vnode_esc,
"5gb", 7, 3,
self.job5_place,
self.job5_sel_esc)
def test_t18(self):
"""
Test: rerunning (i.e. qrerun) a node failure tolerant job while
it is still waiting for healthy nodes. Upon qrerun, the job
should get killed, requeued, and restarted.
"""
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_begin hook
hook_event = "execjob_begin"
hook_name = "begin"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.begin_hook_body)
# instantiate execjob_launch hook
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.launch_hook_body)
jid = self.create_and_submit_job('job1')
# The job's substate is 41 (PRERUN) since, being a node failure
# tolerant job, it is waiting for healthy nodes.
# With no prologue hook, the MS waits the default 30
# seconds for healthy nodes.
self.server.expect(JOB, {'job_state': 'R',
'substate': 41,
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'exec_host': self.job1_iexec_host,
'exec_vnode': self.job1_iexec_vnode,
'Resource_List.select': self.job1_iselect,
'Resource_List.site': self.job1_oselect,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_ischedselect},
id=jid, attrop=PTL_AND)
# Check various vnode status.
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status([self.nAv0, self.nAv1, self.nB, self.nBv0,
self.nE, self.nEv0],
'job-busy', jobs_assn1, 1, '1048576kb')
self.match_vnode_status([self.nAv2, self.nBv1],
'job-busy', jobs_assn1, 1, '0kb')
jobs_assn2 = "%s/0, %s/1" % (jid, jid)
self.match_vnode_status([self.nC], 'job-busy', jobs_assn2,
2, '2097152kb')
jobs_assn3 = "%s/0, %s/1, %s/2" % (jid, jid, jid)
self.match_vnode_status([self.nD], 'free', jobs_assn3,
3, '2097152kb')
self.match_vnode_status([self.nA, self.nAv3, self.nBv2, self.nBv3,
self.nEv1, self.nEv2, self.nEv3], 'free')
# check server/queue counts
self.server.expect(SERVER, {'resources_assigned.ncpus': 13,
'resources_assigned.mem': '10485760'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 13,
'resources_assigned.mem': '10485760'},
id='workq', attrop=PTL_AND)
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+could not JOIN_JOB" % (
jid, self.hostB), n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+as job " % (jid, self.hostB) +
"is tolerant of node failures",
regexp=True, n=10)
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
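# Rerun the job while it is still in PRERUN; it should get killed,
# requeued, and all of its assigned vnodes freed.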
self.server.rerunjob(jid)
self.server.expect(JOB, {'job_state': 'Q'}, id=jid)
self.match_vnode_status([self.nA, self.nAv0, self.nAv1, self.nAv2,
self.nAv3,
self.nB, self.nBv0, self.nBv1, self.nBv2,
self.nBv3, self.nC, self.nD, self.nE,
self.nEv0, self.nEv1, self.nEv2,
self.nEv3], 'free')
# check server/queue counts
self.server.expect(SERVER, {'resources_assigned.ncpus': 0,
'resources_assigned.mem': '0kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 0,
'resources_assigned.mem': '0kb'},
id='workq', attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
# Now job should start running again
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '6gb',
'Resource_List.ncpus': 8,
'Resource_List.nodect': 3,
'Resource_List.select': self.job1v2_select,
'Resource_List.place': self.job1_place,
'schedselect': self.job1v2_schedselect,
'exec_host': self.job1v2_exec_host,
'exec_vnode': self.job1v2_exec_vnode},
id=jid, interval=1, attrop=PTL_AND, max_attempts=70)
thisjob = self.server.status(JOB, id=jid)
if thisjob:
job_output_file = thisjob[0]['Output_Path'].split(':')[1]
# Check various vnode status.
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status([self.nAv0, self.nAv1],
'job-busy', jobs_assn1, 1, '1048576kb')
self.match_vnode_status([self.nAv2],
'job-busy', jobs_assn1, 1, '0kb')
jobs_assn2 = "%s/0, %s/1" % (jid, jid)
self.match_vnode_status([self.nC], 'job-busy', jobs_assn2,
2, '2097152kb')
jobs_assn3 = "%s/0, %s/1, %s/2" % (jid, jid, jid)
self.match_vnode_status([self.nD], 'free', jobs_assn3,
3, '2097152kb')
self.match_vnode_status([self.nA, self.nAv3, self.nB, self.nBv0,
self.nBv1, self.nBv2, self.nBv3, self.nE,
self.nEv0, self.nEv1, self.nEv2,
self.nEv3], 'free')
# check server/queue counts
self.server.expect(SERVER, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
id='workq', attrop=PTL_AND)
self.assertTrue(
self.pbs_nodefile_match_exec_host(jid, self.job1v2_exec_host))
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+could not JOIN_JOB" % (
jid, self.hostB), n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+as job " % (jid, self.hostB) +
"is tolerant of node failures",
regexp=True, n=10)
# Check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_launch hook
vnode_list_fail = [self.nB, self.nBv0, self.nBv1]
for vn in vnode_list_fail:
self.momA.log_match("Job;%s;launch: found vnode_list_fail[%s]" % (
jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job1v2_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job1v2_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job1_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job1v2_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job1_iexec_host_esc,
self.job1_iexec_vnode_esc, "10gb", 13, 5,
self.job1_place,
self.job1_isel_esc)
self.match_accounting_log('s', jid, self.job1v2_exec_host_esc,
self.job1v2_exec_vnode_esc,
"6gb", 8, 3,
self.job1_place,
self.job1v2_sel_esc)
self.momA.log_match("Job;%s;task.+started, hostname" % (jid,),
n=10, max_attempts=60, interval=2, regexp=True)
self.momA.log_match("Job;%s;copy file request received" % (jid,),
n=10, max_attempts=10, interval=2)
# validate output
expected_out = """/var/spool/pbs/aux/%s
%s
%s
%s
FIB TESTS
pbsdsh -n 1 fib 37
%d
pbsdsh -n 2 fib 37
%d
fib 37
%d
HOSTNAME TESTS
pbsdsh -n 0 hostname
%s
pbsdsh -n 1 hostname
%s
pbsdsh -n 2 hostname
%s
PBS_NODEFILE tests
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
""" % (jid, self.momA.hostname, self.momD.hostname, self.momC.hostname,
self.fib37_value, self.fib37_value, self.fib37_value,
self.momA.shortname, self.momD.shortname, self.momC.shortname,
self.momA.hostname, self.momA.hostname, self.momA.shortname,
self.momD.hostname, self.momD.hostname, self.momD.shortname,
self.momC.hostname, self.momC.hostname, self.momC.shortname)
job_out = ""
with open(job_output_file, 'r') as fd:
job_out = fd.read()
self.assertEqual(job_out, expected_out)
# Re-check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Re-check vnode_list_fail[] parameter in execjob_launch hook
vnode_list_fail = [self.nB, self.nBv0, self.nBv1]
for vn in vnode_list_fail:
self.momA.log_match("Job;%s;launch: found vnode_list_fail[%s]" % (
jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job1v2_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job1v2_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job1_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job1v2_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job1_iexec_host_esc,
self.job1_iexec_vnode_esc, "10gb", 13, 5,
self.job1_place,
self.job1_isel_esc)
self.match_accounting_log('s', jid, self.job1v2_exec_host_esc,
self.job1v2_exec_vnode_esc,
"6gb", 8, 3,
self.job1_place,
self.job1v2_sel_esc)
self.momA.log_match("Job;%s;task.+started, hostname" % (jid,),
n=10, max_attempts=60, interval=2, regexp=True)
self.momA.log_match("Job;%s;copy file request received" % (jid,),
n=10, max_attempts=10, interval=2)
# validate output
expected_out = """/var/spool/pbs/aux/%s
%s
%s
%s
FIB TESTS
pbsdsh -n 1 fib 37
%d
pbsdsh -n 2 fib 37
%d
fib 37
%d
HOSTNAME TESTS
pbsdsh -n 0 hostname
%s
pbsdsh -n 1 hostname
%s
pbsdsh -n 2 hostname
%s
PBS_NODEFILE tests
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
""" % (jid, self.momA.hostname, self.momD.hostname, self.momC.hostname,
self.fib37_value, self.fib37_value, self.fib37_value,
self.momA.shortname, self.momD.shortname, self.momC.shortname,
self.momA.hostname, self.momA.hostname, self.momA.shortname,
self.momD.hostname, self.momD.hostname, self.momD.shortname,
self.momC.hostname, self.momC.hostname, self.momC.shortname)
job_out = ""
with open(job_output_file, 'r') as fd:
job_out = fd.read()
self.assertEqual(job_out, expected_out)
def test_t19(self):
"""
Test: issuing a release nodes request against a node failure
tolerant job that is still waiting for healthy nodes. The call
to pbs_release_nodes should fail given that the job
is not fully running yet and is still determining which of
its assigned nodes are good.
"""
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_begin hook
hook_event = "execjob_begin"
hook_name = "begin"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.begin_hook_body)
# instantiate execjob_launch hook
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.launch_hook_body)
jid = self.create_and_submit_job('job1')
# The job's substate is 41 (PRERUN) since, being a node failure
# tolerant job, it is waiting for healthy nodes
self.server.expect(JOB, {'job_state': 'R',
'substate': 41,
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'exec_host': self.job1_iexec_host,
'exec_vnode': self.job1_iexec_vnode,
'Resource_List.select': self.job1_iselect,
'Resource_List.site': self.job1_oselect,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_ischedselect},
id=jid, attrop=PTL_AND)
# Verify mom_logs
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+could not JOIN_JOB" % (
jid, self.hostB), n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+as job " % (jid, self.hostB) +
"is tolerant of node failures",
regexp=True, n=10)
# Run pbs_release_nodes on a job whose state is running but
# substate is under PRERUN
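# The '-a' option asks to release all of the job's sister vnodes;
# this should be rejected while the job is still in PRERUN.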
pbs_release_nodes_cmd = os.path.join(
self.server.pbs_conf['PBS_EXEC'], 'bin', 'pbs_release_nodes')
cmd = [pbs_release_nodes_cmd, '-j', jid, '-a']
ret = self.server.du.run_cmd(self.server.hostname, cmd,
runas=TEST_USER)
self.assertNotEqual(ret['rc'], 0)
self.assertTrue(ret['err'][0].startswith(
'pbs_release_nodes: Request invalid for state of job'))
def test_t20(self):
"""
Test: a node failure tolerant job array, with multiple subjobs
starting at the same time, and each subjob's assigned resources
pruned to match the original select spec, this time using
an execjob_prologue hook.
"""
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_begin hook
hook_event = "execjob_begin"
hook_name = "begin"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.begin_hook_body5)
# instantiate execjob_prologue hook
hook_event = "execjob_prologue"
hook_name = "prolo"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.prolo_hook_body4)
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
jid = self.create_and_submit_job('jobA')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '5gb',
'Resource_List.ncpus': 5,
'Resource_List.nodect': 5,
'Resource_List.select': self.jobA_iselect,
'Resource_List.site': self.jobA_oselect,
'Resource_List.place': self.jobA_place,
'schedselect': self.jobA_ischedselect},
id=jid, attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
self.server.expect(JOB, {'job_state': 'B',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '5gb',
'Resource_List.ncpus': 5,
'Resource_List.nodect': 5,
'Resource_List.select': self.jobA_iselect,
'Resource_List.site': self.jobA_oselect,
'Resource_List.place': self.jobA_place,
'schedselect': self.jobA_ischedselect},
id=jid, attrop=PTL_AND)
self.server.expect(JOB, {'job_state=R': 3}, extend='t')
for idx in range(1, 4):
sjid = create_subjob_id(jid, idx)
if idx == 1:
iexec_host_esc = self.jobA_iexec_host1_esc
iexec_vnode = self.jobA_iexec_vnode1
iexec_vnode_esc = self.jobA_iexec_vnode1_esc
exec_host = self.jobA_exec_host1
exec_host_esc = self.jobA_exec_host1_esc
exec_vnode = self.jobA_exec_vnode1
exec_vnode_esc = self.jobA_exec_vnode1_esc
vnode_list = [self.nAv0, self.nB, self.nC,
self.nD, self.nE]
elif idx == 2:
iexec_host_esc = self.jobA_iexec_host2_esc
iexec_vnode = self.jobA_iexec_vnode2
iexec_vnode_esc = self.jobA_iexec_vnode2_esc
exec_host = self.jobA_exec_host2
exec_host_esc = self.jobA_exec_host2_esc
exec_vnode = self.jobA_exec_vnode2
exec_vnode_esc = self.jobA_exec_vnode2_esc
vnode_list = [self.nAv1, self.nBv0, self.nC,
self.nD, self.nEv0]
elif idx == 3:
iexec_host_esc = self.jobA_iexec_host3_esc
iexec_vnode = self.jobA_iexec_vnode3
iexec_vnode_esc = self.jobA_iexec_vnode3_esc
exec_host = self.jobA_exec_host3
exec_host_esc = self.jobA_exec_host3_esc
exec_vnode = self.jobA_exec_vnode3
exec_vnode_esc = self.jobA_exec_vnode3_esc
vnode_list = [self.nAv2, self.nBv1, self.nC,
self.nD, self.nE]
self.server.expect(JOB, {'job_state': 'R',
'substate': 41,
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '3gb',
'Resource_List.ncpus': 3,
'Resource_List.nodect': 3,
'exec_host': exec_host,
'exec_vnode': exec_vnode,
'Resource_List.select': self.jobA_select,
'Resource_List.site': self.jobA_oselect,
'Resource_List.place': self.jobA_place,
'schedselect': self.jobA_schedselect},
id=sjid, attrop=PTL_AND)
# Verify mom_logs
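# Escape the regex metacharacters ('[', ']', '(', ')', '+') in the
# subjob id so it can be matched literally inside the regexp
# log_match patterns below.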
sjid_esc = sjid.replace(
"[", r"\[").replace("]", r"\]").replace("(", r"\(").replace(
")", r"\)").replace("+", r"\+")
self.momA.log_match(
"Job;%s;job_start_error.+from node %s.+could not JOIN_JOB" % (
sjid_esc, self.hostC), n=10, regexp=True)
self.momA.log_match(
"Job;%s;ignoring error from %s.+as job " % (
sjid_esc, self.hostC) + "is tolerant of node failures",
regexp=True, n=10)
for vn in vnode_list:
self.momA.log_match("Job;%s;prolo: found vnode_list[%s]" % (
sjid, vn), n=10)
vnode_list_fail = [self.nC]
for vn in vnode_list_fail:
self.momA.log_match(
"Job;%s;prolo: found vnode_list_fail[%s]" % (
sjid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select)
# call
self.momA.log_match("Job;%s;prolo: job.exec_vnode=%s" % (
sjid, exec_vnode), n=10)
self.momA.log_match("Job;%s;prolo: job.schedselect=%s" % (
sjid, self.jobA_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
sjid, iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
sjid, exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', sjid_esc, iexec_host_esc,
iexec_vnode_esc, "5gb", 5, 5,
self.jobA_place,
self.jobA_isel_esc)
self.match_accounting_log('s', sjid_esc, exec_host_esc,
exec_vnode_esc,
"3gb", 3, 3,
self.jobA_place,
self.jobA_sel_esc)
@timeout(400)
def test_t21(self):
"""
Test: radio silent moms causing the primary mom to not get
any acks from the sister moms executing prologue hooks.
After 'job_launch_delay' time has passed, the primary
mom considers node hosts that have not acknowledged
the prologue hook execution as failed hosts, and will
not use their vnodes when pruning the job.
"""
job_launch_delay = 120
c = {'$job_launch_delay': job_launch_delay}
self.momA.add_config(c)
# instantiate queuejob hook
hook_event = "queuejob"
hook_name = "qjob"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_prologue hook
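# Give the prologue hook a 60-second alarm since its body sleeps
# (see the "sleeping for 30 secs" log matches below).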
hook_event = "execjob_prologue"
hook_name = "prolo"
a = {'event': hook_event, 'enabled': 'true', 'alarm': 60}
self.server.create_import_hook(hook_name, a, self.prolo_hook_body5)
# instantiate execjob_launch hook
hook_event = "execjob_launch"
hook_name = "launch"
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.launch_hook_body)
# First, turn off scheduling
a = {'scheduling': 'false'}
self.server.manager(MGR_CMD_SET, SERVER, a)
jid = self.create_and_submit_job('job1')
# Job gets queued and reflects the incremented values from queuejob
# hook
self.server.expect(JOB, {'job_state': 'Q',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '10gb',
'Resource_List.ncpus': 13,
'Resource_List.nodect': 5,
'Resource_List.select': self.job1_iselect,
'Resource_List.site': self.job1_oselect,
'Resource_List.place': self.job1_place,
'schedselect': self.job1_ischedselect},
id=jid, attrop=PTL_AND)
a = {'scheduling': 'true'}
self.server.manager(MGR_CMD_SET, SERVER, a)
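# The execjob_prologue hook on momE starts its sleep; confirm it in
# the mom log before suspending that mom.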
self.momE.log_match(
"Job;%s;sleeping for 30 secs" % (jid, ), n=10)
# temporarily suspend momE, simulating a radio silent mom.
self.momE.signal("-STOP")
self.momC.log_match(
"Job;%s;sleeping for 30 secs" % (jid, ), n=10)
# temporarily suspend momC, simulating a radio silent mom.
self.momC.signal("-STOP")
# sleep for as long as the primary mom waits for prologue hook
# acknowledgements from all of the sister moms
self.logger.info("sleeping for %d secs waiting for healthy nodes" % (
job_launch_delay,))
time.sleep(job_launch_delay)
# Job eventually launches, reflecting values pruned back
# to the original select spec.
# max_attempts=70 (with interval=1) is used since it can take up to
# 60 seconds for the primary mom to wait for the sisters to join
# (default $sister_join_job_alarm of 30 seconds) and to wait for the
# sisters to finish their execjob_prologue hooks (default
# $job_launch_delay value of 30 seconds)
self.server.expect(JOB, {'job_state': 'R',
'tolerate_node_failures': 'job_start',
'Resource_List.mem': '6gb',
'Resource_List.ncpus': 8,
'Resource_List.nodect': 3,
'Resource_List.select': self.job1v4_select,
'Resource_List.place': self.job1_place,
'schedselect': self.job1v4_schedselect,
'exec_host': self.job1v4_exec_host,
'exec_vnode': self.job1v4_exec_vnode},
id=jid, interval=1, attrop=PTL_AND, max_attempts=70)
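# Save the job's output file path for validating its contents later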
thisjob = self.server.status(JOB, id=jid)
if thisjob:
job_output_file = thisjob[0]['Output_Path'].split(':')[1]
# Check various vnode status.
jobs_assn1 = "%s/0" % (jid,)
self.match_vnode_status([self.nAv0, self.nAv1, self.nB, self.nBv0],
'job-busy', jobs_assn1, 1, '1048576kb')
jobs_assn2 = "%s/0, %s/1" % (jid, jid)
self.match_vnode_status([self.nD], 'free', jobs_assn2,
2, '2097152kb')
self.match_vnode_status([self.nAv2, self.nBv1],
'job-busy', jobs_assn1, 1, '0kb')
self.match_vnode_status([self.nA, self.nAv3, self.nBv2, self.nBv3,
self.nC, self.nD, self.nEv1, self.nEv2,
self.nEv3, self.nE, self.nEv0], 'free')
# check server/queue counts
self.server.expect(SERVER, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
attrop=PTL_AND)
self.server.expect(QUEUE, {'resources_assigned.ncpus': 8,
'resources_assigned.mem': '6291456kb'},
id='workq', attrop=PTL_AND)
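# PBS_NODEFILE on the primary host should list only the hosts in the
# pruned exec_host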
self.assertTrue(
self.pbs_nodefile_match_exec_host(jid, self.job1v4_exec_host))
# Check vnode_list[] parameter in execjob_launch hook
vnode_list = [self.nAv0, self.nAv1, self.nAv2,
self.nB, self.nBv0, self.nBv1,
self.nC, self.nD, self.nE, self.nEv0]
for vn in vnode_list:
self.momA.log_match("Job;%s;launch: found vnode_list[%s]" % (
jid, vn), n=10)
# Check vnode_list_fail[] parameter in execjob_launch hook
vnode_list_fail = [self.nC, self.nE, self.nEv0]
for vn in vnode_list_fail:
self.momA.log_match("Job;%s;launch: found vnode_list_fail[%s]" % (
jid, vn), n=10)
# Check result of pbs.event().job.release_nodes(keep_select) call
self.momA.log_match("Job;%s;launch: job.exec_vnode=%s" % (
jid, self.job1v4_exec_vnode), n=10)
self.momA.log_match("Job;%s;launch: job.schedselect=%s" % (
jid, self.job1v4_schedselect), n=10)
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, self.job1_iexec_vnode), n=10)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, self.job1v4_exec_vnode), n=10)
# Check accounting_logs
self.match_accounting_log('S', jid, self.job1_iexec_host_esc,
self.job1_iexec_vnode_esc, "10gb", 13, 5,
self.job1_place,
self.job1_isel_esc)
self.match_accounting_log('s', jid, self.job1v4_exec_host_esc,
self.job1v4_exec_vnode_esc,
"6gb", 8, 3,
self.job1_place,
self.job1v4_sel_esc)
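# Wait for the job's tasks to start and for the resulting output file
# to be copied back before validating it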
self.momA.log_match("Job;%s;task.+started, hostname" % (jid,),
n=10, max_attempts=60, interval=2, regexp=True)
self.momA.log_match("Job;%s;copy file request received" % (jid,),
n=10, max_attempts=10, interval=2)
# validate output
expected_out = """/var/spool/pbs/aux/%s
%s
%s
%s
FIB TESTS
pbsdsh -n 1 fib 37
%d
pbsdsh -n 2 fib 37
%d
fib 37
%d
HOSTNAME TESTS
pbsdsh -n 0 hostname
%s
pbsdsh -n 1 hostname
%s
pbsdsh -n 2 hostname
%s
PBS_NODEFILE tests
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
HOST=%s
pbs_tmrsh %s hostname
%s
""" % (jid, self.momA.hostname, self.momB.hostname, self.momD.hostname,
self.fib37_value, self.fib37_value, self.fib37_value,
self.momA.shortname, self.momB.shortname, self.momD.shortname,
self.momA.hostname, self.momA.hostname, self.momA.shortname,
self.momB.hostname, self.momB.hostname, self.momB.shortname,
self.momD.hostname, self.momD.hostname, self.momD.shortname)
job_out = ""
with open(job_output_file, 'r') as fd:
job_out = fd.read()
self.assertEqual(job_out, expected_out)