# coding: utf-8
# Copyright (C) 1994-2018 Altair Engineering, Inc.
# For more information, contact Altair at www.altair.com.
#
# This file is part of the PBS Professional ("PBS Pro") software.
#
# Open Source License Information:
#
# PBS Pro is free software. You can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# PBS Pro is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.
# See the GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Commercial License Information:
#
# For a copy of the commercial license terms and conditions,
# go to: (http://www.pbspro.com/UserArea/agreement.html)
# or contact the Altair Legal Department.
#
# Altair’s dual-license business model allows companies, individuals, and
# organizations to create proprietary derivative works of PBS Pro and
# distribute them - whether embedded or bundled with other software -
# under a commercial license agreement.
#
# Use of Altair’s trademarks, including but not limited to "PBS™",
# "PBS Professional®", and "PBS Pro™" and Altair’s logos is subject to Altair's
# trademark licensing policies.
from tests.functional import *
import glob
def have_swap():
"""
    Returns 1 if free swap space is non-zero, otherwise returns 0
"""
tt = 0
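    # Each /proc/meminfo line looks like 'SwapFree:  1234 kB'; a
    # nonzero value in the second field means swap is available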
with open(os.path.join(os.sep, 'proc', 'meminfo'), 'r') as fd:
for line in fd:
entry = line.split()
            if entry[0] == 'SwapFree:' and entry[1] != '0':
tt = 1
return tt
def is_memsw_enabled(mem_path):
"""
Check if system has swapcontrol enabled, then return true
else return false
"""
    # List all files and check whether any memory.memsw files exist
    for fname in os.listdir(mem_path):
        if 'memory.memsw' in fname:
return 'true'
return 'false'
def systemd_escape(buf):
"""
    Escape strings for use in systemd unit names.
    Some distros don't provide the systemd-escape command.
"""
if not isinstance(buf, basestring):
        raise ValueError('Not a string type')
ret = ''
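    # Mirror systemd-escape(1) rules: keep alphanumerics, '_' and '.',
    # map '/' to '-', hex-escape everything else as \xNN, and escape a
    # leading '.' so the unit name never starts with a dot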
for i, char in enumerate(buf):
if i < 1 and char == '.':
ret += '\\x' + char.encode('hex')
continue
if char.isalnum() or char in '_.':
ret += char
elif char == '/':
ret += '-'
else:
hexval = char.encode('hex')
for j in range(0, len(hexval), 2):
ret += '\\x' + hexval[j:j + 2]
return ret
@tags('mom', 'multi_node')
class TestCgroupsHook(TestFunctional):
"""
This test suite targets Linux Cgroups hook functionality.
"""
def setUp(self):
TestFunctional.setUp(self)
        # Some of the tests require two nodes; set default values here
        # and override them below based on how many MoMs were supplied
self.vntypenameA = 'no_cgroups'
self.vntypenameB = self.vntypenameA
self.iscray = False
self.noprefix = False
self.tempfile = []
if self.moms:
if len(self.moms) == 1:
self.momA = self.moms.values()[0]
self.momB = self.momA
if self.momA.is_cray():
self.iscray = True
self.hostA = self.momA.shortname
self.hostB = self.hostA
if self.iscray:
self.nodeA = self.get_hostname(self.momA.shortname)
self.nodeB = self.hostA
else:
self.nodeA = self.momA.shortname
self.nodeB = self.hostA
self.vntypenameA = self.get_vntype(self.hostA)
self.vntypenameB = self.vntypenameA
self.momA.delete_vnode_defs()
self.server.manager(MGR_CMD_DELETE, NODE, None, "")
self.server.manager(MGR_CMD_CREATE, NODE, id=self.hostA)
elif len(self.moms) == 2:
self.momA = self.moms.values()[0]
self.momB = self.moms.values()[1]
if self.momA.is_cray() or self.momB.is_cray():
self.iscray = True
self.hostA = self.momA.shortname
self.hostB = self.momB.shortname
if self.iscray:
self.nodeA = self.get_hostname(self.momA.shortname)
self.nodeB = self.get_hostname(self.momB.shortname)
else:
self.nodeA = self.momA.shortname
self.nodeB = self.momB.shortname
self.vntypenameA = self.get_vntype(self.hostA)
self.vntypenameB = self.get_vntype(self.hostB)
if self.momA.is_cray() or self.momB.is_cray():
self.iscray = True
self.momA.delete_vnode_defs()
self.momB.delete_vnode_defs()
self.server.manager(MGR_CMD_DELETE, NODE, None, "")
self.server.manager(MGR_CMD_CREATE, NODE, id=self.nodeA)
self.server.manager(MGR_CMD_CREATE, NODE, id=self.nodeB)
else:
            self.skipTest('Tests require one or two MoMs, '
                          'use -p moms=<mom1>:<mom2>')
self.serverA = self.servers.values()[0].name
self.paths = self.get_paths()
if not (self.paths['cpuset'] and self.paths['memory']):
self.skipTest('cpuset or memory cgroup subsystem not mounted')
self.swapctl = is_memsw_enabled(self.paths['memsw'])
self.server.set_op_mode(PTL_CLI)
self.server.cleanup_jobs(extend='force')
if not self.iscray:
self.remove_vntype()
self.eatmem_script = """
import sys
import time
MB = 2 ** 20
iterations = 1
chunkSizeMb = 1
sleeptime = 0
if (len(sys.argv) > 1):
iterations = int(sys.argv[1])
if (len(sys.argv) > 2):
chunkSizeMb = int(sys.argv[2])
if (len(sys.argv) > 3):
sleeptime = int(sys.argv[3])
if (iterations < 1):
print('Iteration count must be greater than zero.')
exit(1)
if (chunkSizeMb < 1):
print('Chunk size must be greater than zero.')
exit(1)
totalSizeMb = chunkSizeMb * iterations
print('Allocating %d chunk(s) of size %dMB. (%dMB total)' %
(iterations, chunkSizeMb, totalSizeMb))
buf = ''
for i in range(iterations):
print('allocating %dMB' % ((i + 1) * chunkSizeMb))
buf += ('#' * MB * chunkSizeMb)
if sleeptime > 0:
time.sleep(sleeptime)
"""
        self.eatmem_job1 = \
            '#PBS -joe\n' \
            '#PBS -S /bin/bash\n' \
            'sleep 4\n' \
            'python - 80 10 10 <<EOF\n%s\nEOF\n' % self.eatmem_script
        self.check_dirs_script = """#!/bin/bash
#PBS -joe
check_file_diff() {
for fname in `ls $1`; do
if [ -f $1/$fname ] && ! diff -q $1/$fname $2/$fname </dev/null ; then
echo "Disabled cgroup subsystems are populated with the job id"
fi
done
}
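# Strip the server suffix from $PBS_JOBID to get the numeric job id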
jobnum=${PBS_JOBID%%.*}
cpuset_base=`grep cgroup /proc/mounts | grep cpuset | cut -d' ' -f2`
if [ -d "$cpuset_base/propbs" ]; then
cpuset_job="$cpuset_base/propbs/$PBS_JOBID"
else
cpuset_job="$cpuset_base/propbs.slice/propbs-${jobnum}.*.slice"
fi
cpuacct_base=`grep cgroup /proc/mounts | grep cpuacct | cut -d' ' -f2`
if [ -d "$cpuacct_base/propbs" ]; then
cpuacct_job="$cpuacct_base/propbs/$PBS_JOBID"
else
cpuacct_job="$cpuacct_base/propbs.slice/propbs-${jobnum}.*.slice"
fi
memory_base=`grep cgroup /proc/mounts | grep memory | cut -d' ' -f2`
if [ -d "$memory_base/propbs" ]; then
memory_job="$memory_base/propbs/$PBS_JOBID"
else
memory_job="$memory_base/propbs.slice/propbs-${jobnum}.*.slice"
fi
devices_base=`grep cgroup /proc/mounts | grep devices | cut -d' ' -f2`
if [ -d "$devices_base/propbs" ]; then
devices_job="$devices_base/propbs/$PBS_JOBID"
else
devices_job="$devices_base/propbs.slice/propbs-${jobnum}.*.slice"
fi
echo ====
ls -l $devices_base
echo ====
ls -l $devices_job
echo ====
if [ -d $devices_job ]; then
device_list=`cat $devices_job/devices.list`
echo "${device_list}"
sysd=`systemctl --version | grep systemd | awk '{print $2}'`
if [ "$sysd" -ge 205 ]; then
if [ -d $cpuacct_job ]; then
check_file_diff $cpuacct_base/propbs.slice/ $cpuacct_job
fi
if [ -d $cpuset_job ]; then
check_file_diff $cpuset_base/propbs.slice/ $cpuset_job
fi
if [ -d $memory_job ] ; then
check_file_diff $memory_base/propbs.slice/ $memory_job
fi
else
if [ -d $cpuacct_job -o -d $cpuset_job -o -d $memory_job ]; then
echo "Disabled cgroup subsystems are populated with the job id"
fi
fi
else
echo "Devices directory should be populated"
fi
sleep 10
"""
self.check_gpu_script = """#!/bin/bash
#PBS -joe
jobnum=${PBS_JOBID%%.*}
devices_base=`grep cgroup /proc/mounts | grep devices | cut -d' ' -f2`
if [ -d "$devices_base/propbs" ]; then
devices_job="$devices_base/propbs/$PBS_JOBID"
else
devices_job="$devices_base/propbs.slice/propbs-${jobnum}.*.slice"
fi
device_list=`cat $devices_job/devices.list`
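# Character device major 195 is assigned to the NVIDIA GPU driver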
grep "195" $devices_job/devices.list
echo "There are `nvidia-smi -q -x | grep "GPU" | wc -l` GPUs"
sleep 10
"""
self.sleep15_job = """#!/bin/bash
#PBS -joe
sleep 15
"""
self.sleep5_job = """#!/bin/bash
#PBS -joe
sleep 5
"""
self.eat_cpu_script = """#!/bin/bash
#PBS -joe
for i in 1 2 3 4; do while : ; do : ; done & done
"""
self.job_scr2 = """#!/bin/bash
#PBS -l select=host=%s:ncpus=1+ncpus=4:mem=2gb
#PBS -l place=vscatter
#PBS -W umask=022
#PBS -koe
echo "$PBS_NODEFILE"
cat $PBS_NODEFILE
sleep 300
""" % self.hostB
self.job_scr3 = """#!/bin/bash
#PBS -l select=2:ncpus=4:mem=2gb
#PBS -l place=pack
#PBS -W umask=022
#PBS -W tolerate_node_failures=job_start
#PBS -koe
echo "$PBS_NODEFILE"
cat $PBS_NODEFILE
sleep 300
"""
self.cfg0 = """{
"cgroup_prefix" : "pbspro",
"exclude_hosts" : [],
"exclude_vntypes" : [],
"run_only_on_hosts" : [],
"periodic_resc_update" : false,
"vnode_per_numa_node" : false,
"online_offlined_nodes" : false,
"use_hyperthreads" : false,
"cgroup" : {
"cpuacct" : {
"enabled" : false
},
"cpuset" : {
"enabled" : false
},
"devices" : {
"enabled" : false
},
"hugetlb" : {
"enabled" : false
},
"memory" : {
"enabled" : false
},
"memsw" : {
"enabled" : false
}
}
}
"""
self.cfg1 = """{
"cgroup_prefix" : "pbspro",
"exclude_hosts" : [%s],
"exclude_vntypes" : [%s],
"run_only_on_hosts" : [%s],
"periodic_resc_update" : true,
"vnode_per_numa_node" : false,
"online_offlined_nodes" : true,
"use_hyperthreads" : false,
"cgroup":
{
"cpuacct":
{
"enabled" : true,
"exclude_hosts" : [],
"exclude_vntypes" : []
},
"cpuset":
{
"enabled" : true,
"exclude_hosts" : [%s],
"exclude_vntypes" : []
},
"devices":
{
"enabled" : false
},
"hugetlb":
{
"enabled" : false
},
"memory":
{
"enabled" : true,
"exclude_hosts" : [],
"exclude_vntypes" : [],
"soft_limit" : false,
"default" : "96MB",
"reserve_percent" : "0",
"reserve_amount" : "0MB"
},
"memsw":
{
"enabled" : %s,
"exclude_hosts" : [],
"exclude_vntypes" : [],
"default" : "96MB",
"reserve_percent" : "0",
"reserve_amount" : "128MB"
}
}
}
"""
self.cfg2 = """{
"cgroup_prefix" : "propbs",
"exclude_hosts" : [],
"exclude_vntypes" : [],
"run_only_on_hosts" : [],
"periodic_resc_update" : false,
"vnode_per_numa_node" : false,
"online_offlined_nodes" : false,
"use_hyperthreads" : false,
"cgroup":
{
"cpuacct":
{
"enabled" : false
},
"cpuset":
{
"enabled" : false
},
"devices":
{
"enabled" : true,
"exclude_hosts" : [],
"exclude_vntypes" : [],
"allow" : [
"b *:* rwm",
["console","rwm"],
["tty0","rwm", "*"],
"c 1:* rwm",
"c 10:* rwm"
]
},
"hugetlb":
{
"enabled" : false
},
"memory":
{
"enabled" : false
},
"memsw":
{
"enabled" : false
}
}
}
"""
self.cfg3 = """{
"cgroup_prefix" : "pbspro",
"exclude_hosts" : [],
"exclude_vntypes" : [%s],
"run_only_on_hosts" : [],
"periodic_resc_update" : true,
"vnode_per_numa_node" : false,
"online_offlined_nodes" : true,
"use_hyperthreads" : true,
"cgroup":
{
"cpuacct":
{
"enabled" : true,
"exclude_hosts" : [],
"exclude_vntypes" : []
},
"cpuset":
{
"enabled" : true,
"exclude_hosts" : [],
"exclude_vntypes" : [%s]
},
"devices":
{
"enabled" : false
},
"hugetlb":
{
"enabled" : false
},
"memory":
{
"enabled" : true,
"default" : "96MB",
"reserve_amount" : "50MB",
"exclude_hosts" : [],
"exclude_vntypes" : [%s]
},
"memsw":
{
"enabled" : %s,
"default" : "96MB",
"reserve_amount" : "45MB",
"exclude_hosts" : [],
"exclude_vntypes" : [%s]
}
}
}
"""
self.cfg4 = """{
"cgroup_prefix" : "pbspro",
"exclude_hosts" : [],
"exclude_vntypes" : ["no_cgroups"],
"run_only_on_hosts" : [],
"periodic_resc_update" : true,
"vnode_per_numa_node" : false,
"online_offlined_nodes" : true,
"use_hyperthreads" : false,
"cgroup":
{
"cpuacct":
{
"enabled" : true,
"exclude_hosts" : [],
"exclude_vntypes" : []
},
"cpuset":
{
"enabled" : true,
"exclude_hosts" : [],
"exclude_vntypes" : ["no_cgroups_cpus"]
},
"devices":
{
"enabled" : false
},
"hugetlb":
{
"enabled" : false
},
"memory":
{
"enabled" : true,
"default" : "96MB",
"reserve_amount" : "100MB",
"exclude_hosts" : [],
"exclude_vntypes" : ["no_cgroups_mem"]
},
"memsw":
{
"enabled" : %s,
"default" : "96MB",
"reserve_amount" : "90MB",
"exclude_hosts" : [],
"exclude_vntypes" : []
}
}
}
"""
self.cfg5 = """{
"vnode_per_numa_node" : %s,
"cgroup" : {
"cpuset" : {
"enabled" : true,
"exclude_cpus" : [%s],
"mem_fences" : %s,
"mem_hardwall" : %s,
"memory_spread_page" : %s
},
"memory" : {
"enabled" : true
},
"memsw" : {
"enabled" : %s
}
}
}
"""
self.cfg6 = """{
"vnode_per_numa_node" : false,
"cgroup" : {
"memory":
{
"enabled" : true,
"default" : "64MB",
"reserve_percent" : "0",
"reserve_amount" : "0MB"
},
"memsw":
{
"enabled" : %s,
"default" : "64MB",
"reserve_percent" : "0",
"reserve_amount" : "0MB"
}
}
}
"""
self.cfg7 = """{
"cgroup_prefix" : "pbspro",
"exclude_hosts" : [],
"exclude_vntypes" : [],
"run_only_on_hosts" : [],
"periodic_resc_update" : true,
"vnode_per_numa_node" : true,
"online_offlined_nodes" : true,
"use_hyperthreads" : false,
"cgroup" : {
"cpuacct" : {
"enabled" : true,
"exclude_hosts" : [],
"exclude_vntypes" : []
},
"cpuset" : {
"enabled" : true,
"exclude_cpus" : [],
"exclude_hosts" : [],
"exclude_vntypes" : []
},
"devices" : {
"enabled" : false
},
"hugetlb" : {
"enabled" : false
},
"memory" : {
"enabled" : true,
"exclude_hosts" : [],
"exclude_vntypes" : [],
"default" : "256MB",
"reserve_amount" : "64MB"
},
"memsw" : {
"enabled" : true,
"exclude_hosts" : [],
"exclude_vntypes" : [],
"default" : "256MB",
"reserve_amount" : "64MB"
}
}
}
"""
Job.dflt_attributes[ATTR_k] = 'oe'
# Increase the log level
a = {'log_events': '4095'}
self.server.manager(MGR_CMD_SET, SERVER, a, expect=True)
# Configure the scheduler to schedule using vmem
a = {'resources': 'ncpus,mem,vmem,host,vnode,ngpus,nmics'}
self.scheduler.set_sched_config(a)
# Configure the mom
c = {'$logevent': '0xffffffff', '$clienthost': self.server.name,
'$min_check_poll': 8, '$max_check_poll': 12}
self.momA.add_config(c)
if self.hostA != self.hostB:
self.momB.add_config(c)
# Create resource as root
attr = {'type': 'long', 'flag': 'nh'}
self.server.manager(MGR_CMD_CREATE, RSC, attr, id='nmics',
runas=ROOT_USER, logerr=False)
self.server.manager(MGR_CMD_CREATE, RSC, attr, id='ngpus',
runas=ROOT_USER, logerr=False)
# Import the hook
self.hook_name = 'pbs_cgroups'
self.hook_file = os.path.join(self.server.pbs_conf['PBS_EXEC'],
'lib',
'python',
'altair',
'pbs_hooks',
'pbs_cgroups.PY')
self.load_hook(self.hook_file)
events = '"execjob_begin,execjob_launch,execjob_attach,'
events += 'execjob_epilogue,execjob_end,exechost_startup,'
events += 'exechost_periodic,execjob_resize"'
# Enable the cgroups hook
conf = {'enabled': 'True', 'freq': 10, 'alarm': 30, 'event': events}
self.server.manager(MGR_CMD_SET, HOOK, conf, self.hook_name)
        # HUP the MoMs so the exechost_startup hook is run
self.momA.signal('-HUP')
if self.hostA != self.hostB:
self.momB.signal('-HUP')
self.logger.info('vntype set for %s is %s' %
(self.momA, self.vntypenameA))
self.logger.info('vntype set for %s is %s' %
(self.momB, self.vntypenameB))
# queuejob hook
self.qjob_hook_body = """
import pbs
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "queuejob hook executed")
# Save current select spec in resource 'site'
e.job.Resource_List["site"] = str(e.job.Resource_List["select"])
# Add 1 chunk to each chunk (except the first) in the job's select spec
new_select = e.job.Resource_List["select"].increment_chunks(1)
e.job.Resource_List["select"] = new_select
# Make job tolerate node failures that occur only during start.
e.job.tolerate_node_failures = "job_start"
"""
# launch hook
self.launch_hook_body = """
import pbs
import time
e=pbs.event()
pbs.logmsg(pbs.LOG_DEBUG, "Executing launch")
# print out the vnode_list[] values
for vn in e.vnode_list:
v = e.vnode_list[vn]
pbs.logjobmsg(e.job.id, "launch: found vnode_list[" + v.name + "]")
# print out the vnode_list_fail[] values:
for vn in e.vnode_list_fail:
v = e.vnode_list_fail[vn]
pbs.logjobmsg(e.job.id, "launch: found vnode_list_fail[" + v.name + "]")
if e.job.in_ms_mom():
pj = e.job.release_nodes(keep_select=%s)
if pj is None:
e.job.Hold_Types = pbs.hold_types("s")
e.job.rerun()
e.reject("unsuccessful at LAUNCH")
"""
# resize hook
self.resize_hook_body = """
import pbs
e=pbs.event()
if %s e.job.in_ms_mom():
e.reject("Cannot resize the job")
"""
def get_paths(self):
"""
Returns a dictionary containing the location where each cgroup
is mounted.
"""
paths = {'pids': None,
'blkio': None,
'systemd': None,
'cpuset': None,
'memory': None,
'memsw': None,
'cpuacct': None,
'devices': None}
# Loop through the mounts and collect the ones for cgroups
with open(os.path.join(os.sep, 'proc', 'mounts'), 'r') as fd:
for line in fd:
entries = line.split()
if entries[2] != 'cgroup':
continue
flags = entries[3].split(',')
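                # Mount options name the controllers attached to this
                # hierarchy; 'noprefix' (seen on some cpuset mounts)
                # drops the subsystem prefix from control file names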
if 'noprefix' in flags:
self.noprefix = True
subsys = os.path.basename(entries[1])
paths[subsys] = entries[1]
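                # The memsw knobs live inside the memory hierarchy, so
                # both keys point at the same mount point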
if 'memory' in flags:
paths['memsw'] = paths[subsys]
paths['memory'] = paths[subsys]
if 'cpuacct' in flags:
paths['cpuacct'] = paths[subsys]
if 'devices' in flags:
paths['devices'] = paths[subsys]
return paths
def is_dir(self, cpath, host):
"""
        Returns True if the directory exists on the host, retrying
        briefly; otherwise False
"""
for _ in range(5):
rv = self.du.isdir(hostname=host, path=cpath, sudo=True)
if rv:
return True
time.sleep(0.1)
return False
def is_file(self, cpath, host):
"""
        Returns True if the file exists on the host, retrying
        briefly; otherwise False
"""
for _ in range(5):
rv = self.du.isfile(hostname=host, path=cpath, sudo=True)
if rv:
return True
time.sleep(0.1)
return False
def get_cgroup_job_dir(self, subsys, jobid, host):
"""
        Returns the job's cgroup directory for the given subsystem,
        handling both the plain and systemd slice layouts
"""
basedir = self.paths[subsys]
if self.du.isdir(hostname=host, path=os.path.join(basedir, 'pbspro')):
return os.path.join(basedir, 'pbspro', jobid)
else:
return os.path.join(basedir, 'pbspro.slice',
'pbspro-%s.slice' % systemd_escape(jobid))
def load_hook(self, filename):
"""
        Import and enable the hook from the given file path.
"""
try:
with open(filename, 'r') as fd:
script = fd.read()
except IOError:
            self.fail('Failed to open hook file %s' % filename)
events = '"execjob_begin,execjob_launch,execjob_attach,'
events += 'execjob_epilogue,execjob_end,exechost_startup,'
events += 'exechost_periodic"'
a = {'enabled': 'True',
'freq': '10',
'event': events}
# Sometimes the deletion of the old hook is still pending
failed = True
for _ in range(5):
try:
self.server.create_import_hook(self.hook_name, a, script,
overwrite=True)
except Exception:
time.sleep(2)
else:
failed = False
break
if failed:
self.skipTest('pbs_cgroups_hook: failed to load hook')
# Add the configuration
self.load_config(self.cfg0)
def load_config(self, cfg):
"""
Create a hook configuration file with the provided contents.
"""
fn = self.du.create_temp_file(hostname=self.serverA, body=cfg)
self.tempfile.append(fn)
a = {'content-type': 'application/x-config',
'content-encoding': 'default',
'input-file': fn}
self.server.manager(MGR_CMD_IMPORT, HOOK, a, self.hook_name)
self.momA.log_match('pbs_cgroups.CF;copy hook-related '
'file request received',
starttime=self.server.ctime)
self.logger.info('Current config: %s' % cfg)
# Restart MoM to work around PP-993
self.momA.restart()
if self.hostA != self.hostB:
self.momB.restart()
def set_vntype(self, host, typestring='myvntype'):
"""
        Set the vnode type for the given mom.
"""
pbs_home = self.server.pbs_conf['PBS_HOME']
vntype_file = os.path.join(pbs_home, 'mom_priv', 'vntype')
self.logger.info('Setting vntype to %s in %s on mom %s' %
(typestring, vntype_file, host))
localhost = socket.gethostname()
fn = self.du.create_temp_file(hostname=localhost, body=typestring)
self.tempfile.append(fn)
ret = self.du.run_copy(hosts=host, src=fn,
dest=vntype_file, sudo=True, uid='root',
gid='root', mode=0644)
if ret['rc'] != 0:
self.skipTest('pbs_cgroups_hook: failed to set vntype')
def remove_vntype(self):
"""
        Unset the vnode type on the moms under test.
"""
pbs_home = self.server.pbs_conf['PBS_HOME']
vntype_file = os.path.join(pbs_home, 'mom_priv', 'vntype')
self.logger.info('Deleting vntype files from moms')
ret = self.du.rm(hostname=self.hostA, path=vntype_file,
force=True, sudo=True, logerr=False)
if not ret:
self.skipTest('pbs_cgroups_hook: failed to remove vntype')
if self.hostA != self.hostB:
ret = self.du.rm(hostname=self.hostB, path=vntype_file,
force=True, sudo=True, logerr=False)
if not ret:
self.skipTest('pbs_cgroups_hook: failed to remove vntype')
def get_vntype(self, host):
"""
        Get the vntype if one is set (for example on Cray);
        defaults to 'no_cgroups'
"""
vntype = 'no_cgroups'
pbs_home = self.server.pbs_conf['PBS_HOME']
vntype_f = os.path.join(pbs_home, 'mom_priv', 'vntype')
self.logger.info('Reading the vntype value for mom %s' % host)
if self.du.isfile(hostname=host, path=vntype_f):
output = self.du.cat(hostname=host, filename=vntype_f, sudo=True)
vntype = output['out'][0]
return vntype
def wait_and_read_file(self, host, filename=''):
"""
Make several attempts to read a file and return its contents
"""
self.logger.info('Reading file: %s on host: %s' % (filename, host))
if not filename:
raise ValueError('Invalid filename')
for _ in range(30):
if self.du.isfile(hostname=host, path=filename):
break
time.sleep(0.5)
self.assertTrue(self.du.isfile(hostname=host, path=filename),
'File %s not found on host %s' % (filename, host))
# Wait for output to flush
time.sleep(2)
output = self.du.cat(hostname=host, filename=filename, sudo=True)
return output['out']
def get_hostname(self, host):
"""
        Get the hostname of the mom.
        Needed because the cgroups hook logs the hostname, not the mom name
"""
cmd = 'hostname'
rv = self.du.run_cmd(hosts=host, cmd=cmd)
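        # Keep only the leading label of the possibly fully qualified name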
ret = rv['out'][0].split('.')[0]
return ret
def get_host_names(self, host):
"""
        Get the shortname and hostname of the mom. This is needed
        for some systems where the hostname and shortname differ.
"""
cmd1 = 'hostname -s'
rv1 = self.du.run_cmd(hosts=host, cmd=cmd1)
host2 = self.get_hostname(host)
hostlist = '"' + host2 + '"'
moms = [hostlist]
mlog = ["'" + host2 + "'"]
        # if the shortname and hostname differ, construct a list
        # including both to be passed to the cgroups hook
if (str(rv1['out'][0]) != host2):
moms.append('"' + str(rv1['out'][0]) + '"')
mlog.append("'" + str(rv1['out'][0]) + "'")
if len(moms) > 1:
mom1 = ','.join(moms)
log1 = ', '.join(mlog)
else:
mom1 = '"' + host2 + '"'
log1 = "'" + host2 + "'"
return mom1, log1
def test_cgroup_vntype_excluded(self):
"""
Test to verify that cgroups are not enforced on nodes
that have an exclude vntype file set
"""
# Test requires 2 nodes
if len(self.moms) != 2:
            self.skipTest('Test requires two Moms as input, '
                          'use -p moms=<mom1>:<mom2>')
name = 'CGROUP8'
if self.vntypenameA == 'no_cgroups':
self.logger.info('Adding vntype %s to mom %s ' %
(self.vntypenameA, self.momA))
self.set_vntype(typestring=self.vntypenameA, host=self.hostA)
self.load_config(self.cfg1 %
('', '"' + self.vntypenameA + '"',
'', '', self.swapctl))
a = {'Resource_List.select': '1:ncpus=1:mem=300mb:host=%s' %
self.hostA, ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, ATTR_o, jid)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
self.logger.info('memory subsystem is at location %s' %
self.paths['memory'])
cpath = self.get_cgroup_job_dir('memory', jid, self.hostA)
self.assertFalse(self.is_dir(cpath, self.hostA))
self.momA.log_match("%s is in the excluded vnode type list: ['%s']"
% (self.vntypenameA, self.vntypenameA),
starttime=self.server.ctime)
self.logger.info('vntypes on both hosts are: %s and %s'
% (self.vntypenameA, self.vntypenameB))
if self.vntypenameB == self.vntypenameA:
self.logger.info('Skipping the second part of this test '
'since hostB also has same vntype value')
return
a = {'Resource_List.select': '1:ncpus=1:mem=300mb:host=%s' %
self.hostB, ATTR_N: name}
j1 = Job(TEST_USER, attrs=a)
j1.create_script(self.sleep15_job)
jid2 = self.server.submit(j1)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid2)
self.server.status(JOB, ATTR_o, jid2)
o = j1.attributes[ATTR_o]
self.tempfile.append(o)
cpath = self.get_cgroup_job_dir('memory', jid2, self.hostB)
self.assertTrue(self.is_dir(cpath, self.hostB))
def test_cgroup_host_excluded(self):
"""
Test to verify that cgroups are not enforced on nodes
that have the exclude_hosts set
"""
# Test requires 2 nodes
if len(self.moms) != 2:
            self.skipTest('Test requires two Moms as input, '
                          'use -p moms=<mom1>:<mom2>')
name = 'CGROUP9'
mom, log = self.get_host_names(self.hostA)
self.load_config(self.cfg1 % ('%s' % mom, '', '', '', self.swapctl))
a = {'Resource_List.select': '1:ncpus=1:mem=300mb:host=%s' %
self.hostA, ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, ATTR_o, jid)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
cpath = self.get_cgroup_job_dir('memory', jid, self.hostA)
self.assertFalse(self.is_dir(cpath, self.hostA))
host = self.get_hostname(self.hostA)
self.momA.log_match('%s is in the excluded host list: [%s]' %
(host, log), starttime=self.server.ctime)
a = {'Resource_List.select': '1:ncpus=1:mem=300mb:host=%s' %
self.hostB, ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid2 = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid2)
self.server.status(JOB, ATTR_o, jid2)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
cpath = self.get_cgroup_job_dir('memory', jid2, self.hostB)
self.assertTrue(self.is_dir(cpath, self.hostB))
def test_cgroup_exclude_vntype_mem(self):
"""
Test to verify that cgroups are not enforced on nodes
that have an exclude vntype file set
"""
# Test requires 2 nodes
if len(self.moms) != 2:
            self.skipTest('Test requires two Moms as input, '
                          'use -p moms=<mom1>:<mom2>')
name = 'CGROUP12'
if self.vntypenameA == 'no_cgroups':
self.logger.info('Adding vntype %s to mom %s' %
(self.vntypenameA, self.momA))
self.set_vntype(typestring='no_cgroups', host=self.hostA)
self.load_config(self.cfg3 % ('', '', '"' + self.vntypenameA + '"',
self.swapctl,
'"' + self.vntypenameA + '"'))
a = {'Resource_List.select': '1:ncpus=1:mem=100mb:host=%s'
% self.hostA, ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, ATTR_o, jid)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
self.momA.log_match('cgroup excluded for subsystem memory '
'on vnode type %s' % self.vntypenameA,
starttime=self.server.ctime)
self.logger.info('vntype values for each hosts are: %s and %s'
% (self.vntypenameA, self.vntypenameB))
if self.vntypenameB == self.vntypenameA:
self.logger.info('Skipping the second part of this test '
'since hostB also has same vntype value')
return
a = {'Resource_List.select': '1:ncpus=1:mem=100mb:host=%s' %
self.hostB, ATTR_N: name}
j1 = Job(TEST_USER, attrs=a)
j1.create_script(self.sleep15_job)
jid2 = self.server.submit(j1)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid2)
self.server.status(JOB, ATTR_o, jid2)
o = j1.attributes[ATTR_o]
self.tempfile.append(o)
cpath = self.get_cgroup_job_dir('memory', jid2, self.hostB)
self.assertTrue(self.is_dir(cpath, self.hostB))
@timeout(300)
def test_cgroup_periodic_update_check_values(self):
"""
Test to verify that cgroups are reporting usage for cput and mem
"""
name = 'CGROUP13'
conf = {'freq': 2}
self.server.manager(MGR_CMD_SET, HOOK, conf, self.hook_name)
self.load_config(self.cfg3 % ('', '', '', self.swapctl, ''))
a = {'Resource_List.select': '1:ncpus=1:mem=500mb:host=%s' %
self.hostA, ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.eatmem_job3)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, ATTR_o, jid)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
# Scouring the logs for initial values takes too long
resc_list = ['resources_used.cput', 'resources_used.mem']
if self.swapctl == 'true':
resc_list.append('resources_used.vmem')
qstat = self.server.status(JOB, resc_list, id=jid)
cput = qstat[0]['resources_used.cput']
self.assertEqual(cput, '00:00:00')
mem = qstat[0]['resources_used.mem']
match = re.match(r'(\d+)kb', mem)
self.assertFalse(match is None)
usage = int(match.groups()[0])
self.assertGreater(30000, usage)
if self.swapctl == 'true':
vmem = qstat[0]['resources_used.vmem']
match = re.match(r'(\d+)kb', vmem)
self.assertFalse(match is None)
usage = int(match.groups()[0])
self.assertGreater(30000, usage)
err_msg = "Unexpected error in pbs_cgroups " + \
"handling exechost_periodic event: TypeError"
self.mom.log_match(err_msg, max_attempts=3,
interval=1, n=100, existence=False)
# Allow some time to pass for values to be updated
begin = int(time.time())
self.logger.info('Waiting for periodic hook to update usage data.')
time.sleep(15)
if self.paths['cpuacct']:
lines = self.momA.log_match(
'%s;update_job_usage: CPU usage:' %
jid, allmatch=True, starttime=begin)
usage = 0.0
for line in lines:
match = re.search(r'CPU usage: ([0-9.]+) secs', line[1])
if not match:
continue
usage = float(match.groups()[0])
if usage > 1.0:
break
self.assertGreater(usage, 1.0)
if self.paths['memory']:
lines = self.momA.log_match(
'%s;update_job_usage: Memory usage: mem=' % jid,
allmatch=True, max_attempts=5, starttime=begin)
usage = 0
for line in lines:
match = re.search(r'mem=(\d+)kb', line[1])
if not match:
continue
usage = int(match.groups()[0])
if usage > 400000:
break
self.assertGreater(usage, 400000, 'Max memory usage: %dkb' % usage)
if self.swapctl == 'true':
lines = self.momA.log_match(
'%s;update_job_usage: Memory usage: vmem=' % jid,
allmatch=True, max_attempts=5, starttime=begin)
usage = 0
for line in lines:
match = re.search(r'vmem=(\d+)kb', line[1])
if not match:
continue
usage = int(match.groups()[0])
if usage > 400000:
break
self.assertGreater(usage, 400000)
def test_cgroup_cpuset_and_memory(self):
"""
Test to verify that the job cgroup is created correctly
Check to see that cpuset.cpus=0, cpuset.mems=0 and that
memory.limit_in_bytes = 314572800
"""
name = 'CGROUP1'
self.load_config(self.cfg3 % ('', '', '', self.swapctl, ''))
a = {'Resource_List.select': '1:ncpus=1:mem=300mb',
ATTR_N: name, ATTR_k: 'oe'}
j = Job(TEST_USER, attrs=a)
j.create_script(self.cpuset_mem_script)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, [ATTR_o, 'exec_host'], jid)
filename = j.attributes[ATTR_o]
self.tempfile.append(filename)
ehost = j.attributes['exec_host']
tmp_file = filename.split(':')[1]
tmp_host = ehost.split('/')[0]
tmp_out = self.wait_and_read_file(filename=tmp_file, host=tmp_host)
self.logger.info("Job output is %s\n" % tmp_out)
self.assertTrue(jid in tmp_out)
self.logger.info('job dir check passed')
if self.paths['cpuacct']:
self.assertTrue('CpuIDs=0' in tmp_out)
self.logger.info('CpuIDs check passed')
if self.paths['memory']:
self.assertTrue('MemorySocket=0' in tmp_out)
self.logger.info('MemorySocket check passed')
if self.swapctl == 'true':
self.assertTrue('MemoryLimit=314572800' in tmp_out)
self.logger.info('MemoryLimit check passed')
def test_cgroup_cpuset_and_memsw(self):
"""
Test to verify that the job cgroup is created correctly
using the default memory and vmem
Check to see that cpuset.cpus=0, cpuset.mems=0 and that
memory.limit_in_bytes = 268435456
memory.memsw.limit_in_bytes = 268435456
"""
name = 'CGROUP2'
self.load_config(self.cfg3 % ('', '', '', self.swapctl, ''))
a = {'Resource_List.select': '1:ncpus=1:host=%s' %
self.hostA, ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.cpuset_mem_script)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, [ATTR_o, 'exec_host'], jid)
filename = j.attributes[ATTR_o]
self.tempfile.append(filename)
ehost = j.attributes['exec_host']
tmp_file = filename.split(':')[1]
tmp_host = ehost.split('/')[0]
tmp_out = self.wait_and_read_file(filename=tmp_file, host=tmp_host)
self.logger.info("Job output is %s\n" % tmp_out)
self.assertTrue(jid in tmp_out)
self.logger.info('job dir check passed')
if self.paths['cpuacct']:
self.assertTrue('CpuIDs=0' in tmp_out)
self.logger.info('CpuIDs check passed')
if self.paths['memory']:
self.assertTrue('MemorySocket=0' in tmp_out)
self.logger.info('MemorySocket check passed')
if self.swapctl == 'true':
self.assertTrue('MemoryLimit=100663296' in tmp_out)
self.assertTrue('MemswLimit=100663296' in tmp_out)
self.logger.info('MemoryLimit check passed')
def test_cgroup_prefix_and_devices(self):
"""
Test to verify that the cgroup prefix is set to pbs and that
only the devices subsystem is enabled with the correct devices
allowed
"""
if not self.paths['devices']:
self.skipTest('Skipping test since no devices subsystem defined')
name = 'CGROUP3'
self.load_config(self.cfg2)
a = {'Resource_List.select': '1:ncpus=1:mem=300mb', ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.check_dirs_script)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, [ATTR_o, 'exec_host'], jid)
filename = j.attributes[ATTR_o]
self.tempfile.append(filename)
ehost = j.attributes['exec_host']
tmp_file = filename.split(':')[1]
tmp_host = ehost.split('/')[0]
tmp_out = self.wait_and_read_file(filename=tmp_file, host=tmp_host)
check_devices = ['b *:* rwm',
'c 5:1 rwm',
'c 4:* rwm',
'c 1:* rwm',
'c 10:* rwm']
for device in check_devices:
self.assertTrue(device in tmp_out,
'"%s" not found in: %s' % (device, tmp_out))
self.logger.info('device_list check passed')
self.assertFalse('Disabled cgroup subsystems are populated '
'with the job id' in tmp_out,
'Found disabled cgroup subsystems populated')
self.logger.info('Disabled subsystems check passed')
def test_cgroup_cpuset(self):
"""
Test to verify that 2 jobs are not assigned the same cpus
"""
pcpus = 0
with open('/proc/cpuinfo', 'r') as desc:
for line in desc:
if re.match('^processor', line):
pcpus += 1
if pcpus < 2:
self.skipTest('Test requires at least two physical CPUs')
name = 'CGROUP4'
self.load_config(self.cfg3 % ('', '', '', self.swapctl, ''))
# Submit two jobs
a = {'Resource_List.select': '1:ncpus=1:mem=300mb:host=%s' %
self.hostA, ATTR_N: name + 'a'}
j1 = Job(TEST_USER, attrs=a)
j1.create_script(self.cpuset_mem_script)
jid1 = self.server.submit(j1)
b = {'Resource_List.select': '1:ncpus=1:mem=300mb:host=%s' %
self.hostA, ATTR_N: name + 'b'}
j2 = Job(TEST_USER, attrs=b)
j2.create_script(self.cpuset_mem_script)
jid2 = self.server.submit(j2)
a = {'job_state': 'R'}
# Make sure they are both running
self.server.expect(JOB, a, jid1)
self.server.expect(JOB, a, jid2)
# Status the jobs for their output files
attrib = [ATTR_o]
self.server.status(JOB, attrib, jid1)
filename1 = j1.attributes[ATTR_o]
self.logger.info('Job1 .o file: %s' % filename1)
self.tempfile.append(filename1)
self.server.status(JOB, attrib, jid2)
filename2 = j2.attributes[ATTR_o]
self.logger.info('Job2 .o file: %s' % filename2)
self.tempfile.append(filename2)
# Read the output files
tmp_file1 = filename1.split(':')[1]
tmp_out1 = self.wait_and_read_file(filename=tmp_file1, host=self.hostA)
self.logger.info("test output for job1: %s" % (tmp_out1))
self.assertTrue(
jid1 in tmp_out1, '%s not found in output on host %s'
% (jid1, self.hostA))
tmp_file2 = filename2.split(':')[1]
tmp_out2 = self.wait_and_read_file(filename=tmp_file2, host=self.hostA)
self.logger.info("test output for job2: %s" % (tmp_out2))
self.assertTrue(
jid2 in tmp_out2, '%s not found in output on host %s'
% (jid2, self.hostA))
self.logger.info('job dir check passed')
# Ensure the CPU ID for each job differs
cpuid1 = None
for kv in tmp_out1:
if 'CpuIDs=' in kv:
cpuid1 = kv
break
self.assertNotEqual(cpuid1, None, 'Could not read first CPU ID.')
cpuid2 = None
for kv in tmp_out2:
if 'CpuIDs=' in kv:
cpuid2 = kv
break
self.assertNotEqual(cpuid2, None, 'Could not read second CPU ID.')
self.logger.info("cpuid1 = %s and cpuid2 = %s" % (cpuid1, cpuid2))
self.assertNotEqual(cpuid1, cpuid2,
'Processes should be assigned to different CPUs')
self.logger.info('CpuIDs check passed')
def test_cgroup_enforce_memory(self):
"""
Test to verify that the job is killed when it tries to
        use more memory than it requested
"""
name = 'CGROUP5'
self.load_config(self.cfg3 % ('', '', '', self.swapctl, ''))
a = {'Resource_List.select': '1:ncpus=1:mem=300mb:host=%s' %
self.hostA, ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.eatmem_job1)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, ATTR_o, jid)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
# mem and vmem limit will both be set, and either could be detected
self.momA.log_match('%s;Cgroup mem(ory|sw) limit exceeded' % jid,
regexp=True,
max_attempts=20)
def test_cgroup_enforce_memsw(self):
"""
Test to verify that the job is killed when it tries to
        use more vmem than it requested
"""
# run the test if swap space is available
if have_swap() == 0:
self.skipTest('no swap space available on the local host')
fn = self.get_cgroup_job_dir('memory', '123.foo', self.hostA)
# Get the grandparent directory
fn = os.path.dirname(fn)
fn = os.path.dirname(fn)
fn = os.path.join(fn, 'memory.memsw.limit_in_bytes')
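        # Absence of the memsw limit file at the mount root means the
        # kernel has no swap accounting, so vmem cannot be enforced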
if not self.is_file(fn, self.hostA):
self.skipTest('vmem resource not present on node')
name = 'CGROUP6'
self.load_config(self.cfg3 % ('', '', '', self.swapctl, ''))
a = {
'Resource_List.select':
'1:ncpus=1:mem=300mb:vmem=320mb:host=%s' % self.hostA,
ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.eatmem_job1)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, [ATTR_o, 'exec_host'], jid)
filename = j.attributes[ATTR_o]
self.tempfile.append(filename)
ehost = j.attributes['exec_host']
tmp_file = filename.split(':')[1]
tmp_host = ehost.split('/')[0]
tmp_out = self.wait_and_read_file(filename=tmp_file, host=tmp_host)
self.assertTrue('MemoryError' in tmp_out,
'MemoryError not present in output')
@timeout(300)
def test_cgroup_offline_node(self):
"""
Test to verify that the node is offlined when it can't clean up
the cgroup and brought back online once the cgroup is cleaned up
"""
name = 'CGROUP7'
if 'freezer' not in self.paths:
self.skipTest('Freezer cgroup is not mounted')
fdir = self.get_cgroup_job_dir('freezer', '123.foo', self.hostA)
# Get the grandparent directory
fdir = os.path.dirname(fdir)
fdir = os.path.dirname(fdir)
if not self.is_dir(fdir, self.hostA):
self.skipTest('Freezer cgroup is not found')
# Configure the hook
self.load_config(self.cfg3 % ('', '', '', self.swapctl, ''))
a = {'Resource_List.select': '1:ncpus=1:mem=300mb:host=%s' %
self.hostA, 'Resource_List.walltime': 3, ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, ATTR_o, jid)
filename = j.attributes[ATTR_o]
self.tempfile.append(filename)
tmp_file = filename.split(':')[1]
# Query the pids in the cgroup
jdir = self.get_cgroup_job_dir('cpuset', jid, self.hostA)
tasks_file = os.path.join(jdir, 'tasks')
time.sleep(2)
ret = self.du.cat(self.hostA, tasks_file, sudo=True)
tasks = ret['out']
if len(tasks) < 2:
self.skipTest('pbs_cgroups_hook: only one task in cgroup')
self.logger.info('Tasks: %s' % tasks)
self.assertTrue(tasks, 'No tasks in cpuset cgroup for job')
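        # Freezing one of the job's tasks in a separate freezer cgroup
        # keeps it alive, so the hook cannot remove the job cgroup and
        # should offline the node until the task is thawed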
# Make dir in freezer subsystem
fdir = os.path.join(fdir, 'PtlPbs')
        if not self.du.isdir(hostname=self.hostA, path=fdir):
self.du.mkdir(hostname=self.hostA, path=fdir,
mode=0755, sudo=True)
self.tempfile.append(fdir)
# Write a PID into the tasks file for the freezer cgroup
task_file = os.path.join(fdir, 'tasks')
success = False
for pid in reversed(tasks[1:]):
fn = self.du.create_temp_file(hostname=self.hostA, body=pid)
self.tempfile.append(fn)
ret = self.du.run_copy(hosts=self.hostA, src=fn,
dest=task_file, sudo=True,
uid='root', gid='root',
mode=0644)
if ret['rc'] == 0:
success = True
break
self.logger.info('Failed to copy %s to %s on %s' %
(fn, task_file, self.hostA))
self.logger.info('rc = %d', ret['rc'])
self.logger.info('stdout = %s', ret['out'])
self.logger.info('stderr = %s', ret['err'])
if not success:
self.skipTest('pbs_cgroups_hook: Failed to copy freezer tasks')
# Freeze the cgroup
freezer_file = os.path.join(fdir, 'freezer.state')
state = 'FROZEN'
fn = self.du.create_temp_file(hostname=self.hostA, body=state)
self.tempfile.append(fn)
ret = self.du.run_copy(self.hostA, src=fn,
dest=freezer_file, sudo=True,
uid='root', gid='root',
mode=0644)
if ret['rc'] != 0:
self.skipTest('pbs_cgroups_hook: Failed to copy '
'freezer state FROZEN')
self.server.expect(NODE, {'state': (MATCH_RE, 'offline')},
id=self.nodeA, interval=3)
# Thaw the cgroup
state = 'THAWED'
fn = self.du.create_temp_file(hostname=self.hostA, body=state)
self.tempfile.append(fn)
ret = self.du.run_copy(self.hostA, src=fn,
dest=freezer_file, sudo=True,
uid='root', gid='root',
mode=0644)
if ret['rc'] != 0:
self.skipTest('pbs_cgroups_hook: Failed to copy '
'freezer state THAWED')
time.sleep(1)
self.du.rm(hostname=self.hostA, path=os.path.dirname(fdir),
force=True, recursive=True, sudo=True)
self.server.expect(NODE, {'state': 'free'},
id=self.nodeA, interval=3)
def test_cgroup_cpuset_host_excluded(self):
"""
Test to verify that cgroups subsystems are not enforced on nodes
that have the exclude_hosts set but are enforced on other systems
"""
# Test requires 2 nodes
if len(self.moms) != 2:
            self.skipTest('Test requires two Moms as input, '
                          'use -p moms=<mom1>:<mom2>')
name = 'CGROUP10'
mom, _ = self.get_host_names(self.hostA)
self.load_config(self.cfg1 % ('', '', '', '%s' % mom, self.swapctl))
a = {'Resource_List.select': '1:ncpus=1:mem=300mb:host=%s' %
self.hostA, ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, ATTR_o, jid)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
hostn = self.get_hostname(self.hostA)
self.momA.log_match('cgroup excluded for subsystem cpuset '
'on host %s' % hostn, starttime=self.server.ctime)
cpath = self.get_cgroup_job_dir('cpuset', jid, self.hostA)
self.assertFalse(self.is_dir(cpath, self.hostA))
# Now try a job on momB
a = {'Resource_List.select': '1:ncpus=1:mem=300mb:host=%s' %
self.hostB, ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid2 = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid2)
cpath = self.get_cgroup_job_dir('cpuset', jid2, self.hostB)
self.logger.info('Checking for %s on %s' % (cpath, self.momB))
self.assertTrue(self.is_dir(cpath, self.hostB))
def test_cgroup_run_on_host(self):
"""
Test to verify that the cgroup hook only runs on nodes
in the run_only_on_hosts
"""
# Test requires 2 nodes
if len(self.moms) != 2:
            self.skipTest('Test requires two Moms as input, '
                          'use -p moms=<mom1>:<mom2>')
name = 'CGROUP11'
mom, log = self.get_host_names(self.hostA)
self.load_config(self.cfg1 % ('', '', '%s' % mom, '', self.swapctl))
a = {'Resource_List.select': '1:ncpus=1:mem=300mb:host=%s' %
self.hostB, ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, ATTR_o, jid)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
time.sleep(1)
hostn = self.get_hostname(self.hostB)
self.momB.log_match('%s is not in the approved host list: [%s]' %
(hostn, log), starttime=self.server.ctime)
cpath = self.get_cgroup_job_dir('memory', jid, self.hostB)
self.assertFalse(self.is_dir(cpath, self.hostB))
a = {'Resource_List.select': '1:ncpus=1:mem=300mb:host=%s' %
self.hostA, ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid2 = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid2)
self.server.status(JOB, ATTR_o, jid2)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
cpath = self.get_cgroup_job_dir('memory', jid2, self.hostA)
self.assertTrue(self.is_dir(cpath, self.hostA))
def test_cgroup_qstat_resources(self):
"""
Test to verify that cgroups are reporting usage for
mem, and vmem in qstat
"""
name = 'CGROUP14'
self.load_config(self.cfg3 % ('', '', '', self.swapctl, ''))
a = {'Resource_List.select': '1:ncpus=1:mem=500mb', ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.eatmem_job2)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, [ATTR_o, 'exec_host'], jid)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
host = j.attributes['exec_host']
self.logger.info('OUTPUT: %s' % o)
resc_list = ['resources_used.cput']
resc_list += ['resources_used.mem']
resc_list += ['resources_used.vmem']
qstat1 = self.server.status(JOB, resc_list, id=jid)
for q in qstat1:
self.logger.info('Q1: %s' % q)
cput1 = qstat1[0]['resources_used.cput']
mem1 = qstat1[0]['resources_used.mem']
vmem1 = qstat1[0]['resources_used.vmem']
self.logger.info('Waiting 25 seconds for CPU time to accumulate')
time.sleep(25)
qstat2 = self.server.status(JOB, resc_list, id=jid)
for q in qstat2:
self.logger.info('Q2: %s' % q)
cput2 = qstat2[0]['resources_used.cput']
mem2 = qstat2[0]['resources_used.mem']
vmem2 = qstat2[0]['resources_used.vmem']
self.assertNotEqual(cput1, cput2)
self.assertNotEqual(mem1, mem2)
self.assertNotEqual(vmem1, vmem2)
@timeout(500)
def test_cgroup_reserve_mem(self):
"""
        Test to verify that the mom reserves memory for the OS
        when there is a reserve mem request in the config.
        Install cfg3 and then cfg4 and measure the difference
        between the amount of available memory and memsw.
For example, on a system with 1GB of physical memory
and 1GB of active swap. With cfg3 in place, we should
see 1GB - 50MB = 950MB of available memory and
2GB - (50MB + 45MB) = 1905MB of available vmem.
With cfg4 in place, we should see 1GB - 100MB = 900MB
of available memory and 2GB - (100MB + 90MB) = 1810MB
of available vmem. When we calculate the differences
we get:
mem: 950MB - 900MB = 50MB = 51200KB
vmem: 1905MB - 1810MB = 95MB = 97280KB
"""
self.load_config(self.cfg3 % ('', '', '', self.swapctl, ''))
self.server.expect(NODE, {'state': 'free'},
id=self.nodeA, interval=3, offset=10)
if self.swapctl == 'true':
vmem = self.server.status(NODE, 'resources_available.vmem',
id=self.nodeA)
self.logger.info('vmem: %s' % str(vmem))
vmem1 = PbsTypeSize(vmem[0]['resources_available.vmem'])
self.logger.info('Vmem-1: %s' % vmem1.value)
mem = self.server.status(NODE, 'resources_available.mem',
id=self.nodeA)
mem1 = PbsTypeSize(mem[0]['resources_available.mem'])
self.logger.info('Mem-1: %s' % mem1.value)
self.load_config(self.cfg4 % (self.swapctl))
self.server.expect(NODE, {'state': 'free'},
id=self.nodeA, interval=3, offset=10)
if self.swapctl == 'true':
vmem = self.server.status(NODE, 'resources_available.vmem',
id=self.nodeA)
vmem2 = PbsTypeSize(vmem[0]['resources_available.vmem'])
self.logger.info('Vmem-2: %s' % vmem2.value)
vmem_resv = vmem1 - vmem2
self.logger.info('Vmem resv: %s' % vmem_resv.value)
self.assertEqual(vmem_resv.value, 97280)
self.assertEqual(vmem_resv.unit, 'kb')
mem = self.server.status(NODE, 'resources_available.mem',
id=self.nodeA)
mem2 = PbsTypeSize(mem[0]['resources_available.mem'])
self.logger.info('Mem-2: %s' % mem2.value)
mem_resv = mem1 - mem2
self.logger.info('Mem resv: %s' % mem_resv.value)
self.assertEqual(mem_resv.value, 51200)
self.assertEqual(mem_resv.unit, 'kb')
def test_cgroup_multi_node(self):
"""
Test multi-node jobs with cgroups
"""
# Test requires 2 nodes
if len(self.moms) != 2:
            self.skipTest('Test requires two Moms as input, '
                          'use -p moms=<mom1>:<mom2>')
name = 'CGROUP16'
self.load_config(self.cfg6 % (self.swapctl))
a = {'Resource_List.select': '2:ncpus=1:mem=100mb',
'Resource_List.place': 'scatter', ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, 'exec_host', jid)
ehost = j.attributes['exec_host']
tmp_host = ehost.split('+')
ehost1 = tmp_host[0].split('/')[0]
ehjd1 = self.get_cgroup_job_dir('memory', jid, ehost1)
self.assertTrue(self.is_dir(ehjd1, ehost1),
'Missing memory subdirectory: %s' % ehjd1)
ehost2 = tmp_host[1].split('/')[0]
ehjd2 = self.get_cgroup_job_dir('memory', jid, ehost2)
self.assertTrue(self.is_dir(ehjd2, ehost2))
        # Wait for job to finish and make sure that cgroup directories
        # have been cleaned up by the hook
self.server.expect(JOB, 'queue', op=UNSET, offset=15, interval=1,
id=jid)
self.assertFalse(self.is_dir(ehjd1, ehost1),
'Directory still present: %s' % ehjd1)
self.assertFalse(self.is_dir(ehjd2, ehost2),
'Directory still present: %s' % ehjd2)
def test_cgroup_job_array(self):
"""
Test that cgroups are created for subjobs like a regular job
"""
name = 'CGROUP17'
self.load_config(self.cfg1 % ('', '', '', '', self.swapctl))
a = {'Resource_List.select': '1:ncpus=1:mem=300mb:host=%s' %
self.hostA, ATTR_N: name, ATTR_J: '1-4',
'Resource_List.place': 'pack:excl'}
j = Job(TEST_USER, attrs=a)
j.set_sleep_time(60)
jid = self.server.submit(j)
a = {'job_state': 'B'}
self.server.expect(JOB, a, jid)
# Get subjob ID
subj1 = jid.replace('[]', '[1]')
self.server.expect(JOB, {'job_state': 'R'}, subj1)
rv = self.server.status(JOB, ['exec_host'], subj1)
ehost = rv[0].get('exec_host')
ehost1 = ehost.split('/')[0]
        # Verify that cgroup files are created for subjobs
        # but not for the parent array job
cpath = self.get_cgroup_job_dir('memory', subj1, ehost1)
self.assertTrue(self.is_dir(cpath, ehost1))
cpath = self.get_cgroup_job_dir('memory', jid, ehost1)
self.assertFalse(self.is_dir(cpath, ehost1))
        # Verify that subjob4 is queued and that no cgroup
        # files are created for a queued subjob
subj4 = jid.replace('[]', '[4]')
self.server.expect(JOB, {'job_state': 'Q'}, id=subj4)
cpath = self.get_cgroup_job_dir('memory', subj4, ehost1)
self.assertFalse(self.is_dir(cpath, self.hostA))
# Delete subjob1 and verify that cgroups files are cleaned up
self.server.delete(id=subj1)
self.server.expect(JOB, {'job_state': 'X'}, subj1)
cpath = self.get_cgroup_job_dir('memory', subj1, ehost1)
self.assertFalse(self.is_dir(cpath, ehost1))
        # Verify that subjob2 is running
subj2 = jid.replace('[]', '[2]')
self.server.expect(JOB, {'job_state': 'R'}, id=subj2)
# Force delete the subjob and verify cgroups
# files are cleaned up
self.server.delete(id=subj2, extend='force')
self.server.expect(JOB, {'job_state': 'X'}, subj2)
        # Allow extra time for the cgroup files to be cleaned up
time.sleep(2)
cpath = self.get_cgroup_job_dir('memory', subj2, ehost1)
self.assertFalse(self.is_dir(cpath, ehost1))
def test_cgroup_cleanup(self):
"""
Test that cgroups files are cleaned up after qdel
"""
# Test requires 2 nodes
if len(self.moms) != 2:
            self.skipTest('Test requires two Moms as input, '
                          'use -p moms=<mom1>:<mom2>')
self.load_config(self.cfg1 % ('', '', '', '', self.swapctl))
a = {'Resource_List.select': '2:ncpus=1:mem=100mb',
'Resource_List.place': 'scatter'}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, ['exec_host'], jid)
ehost = j.attributes['exec_host']
tmp_host = ehost.split('+')
ehost1 = tmp_host[0].split('/')[0]
ehost2 = tmp_host[1].split('/')[0]
ehjd1 = self.get_cgroup_job_dir('cpuset', jid, ehost1)
self.assertTrue(self.is_dir(ehjd1, ehost1))
ehjd2 = self.get_cgroup_job_dir('cpuset', jid, ehost2)
self.assertTrue(self.is_dir(ehjd2, ehost2))
self.server.delete(id=jid, wait=True)
self.assertFalse(self.is_dir(ehjd1, ehost1))
self.assertFalse(self.is_dir(ehjd2, ehost2))
def test_cgroup_execjob_end_should_delete_cgroup(self):
"""
Test to verify that if execjob_epilogue hook failed to run or to
clean up cgroup files for a job, execjob_end hook should clean
them up
"""
self.load_config(self.cfg4 % (self.swapctl))
# remove epilogue and periodic from the list of events
attr = {'enabled': 'True',
'event': '"execjob_begin,execjob_launch,'
'execjob_attach,execjob_end,exechost_startup"'}
self.server.manager(MGR_CMD_SET, HOOK, attr, self.hook_name)
self.server.expect(NODE, {'state': 'free'}, id=self.nodeA)
j = Job(TEST_USER)
j.set_sleep_time(1)
jid = self.server.submit(j)
# wait for job to finish
self.server.expect(JOB, 'queue', id=jid, op=UNSET, max_attempts=20,
interval=1, offset=1)
        # verify that cgroup files for this job are gone even though
        # the epilogue and periodic events are disabled
for subsys, path in self.paths.items():
            # only check under subsystems that are enabled
            enabled_subsys = ['cpuacct', 'cpuset', 'memory', 'memsw']
            if not any([x in subsys for x in enabled_subsys]):
                continue
if path:
                jdir = os.path.join(path, 'pbspro', str(jid))
                self.logger.info('Checking that directory %s should not '
                                 'exist' % jdir)
                self.assertFalse(os.path.isdir(jdir))
@skipOnCray
def test_cgroup_assign_resources_mem_only_vnode(self):
"""
Test to verify that job requesting mem larger than any single vnode
works properly
"""
vn_attrs = {ATTR_rescavail + '.ncpus': 1,
ATTR_rescavail + '.mem': '500mb'}
self.load_config(self.cfg4 % (self.swapctl))
self.server.expect(NODE, {ATTR_NODE_state: 'free'}, id=self.nodeA)
self.server.create_vnodes('vnode', vn_attrs, 2, self.moms.values()[0])
self.server.expect(NODE, {ATTR_NODE_state: 'free'}, id=self.nodeA)
a = {'Resource_List.select': '1:ncpus=1:mem=500mb'}
j1 = Job(TEST_USER, attrs=a)
j1.create_script('date')
jid1 = self.server.submit(j1)
self.server.expect(JOB, 'queue', id=jid1, op=UNSET, max_attempts=20,
interval=1, offset=1)
a = {'Resource_List.select': '1:ncpus=1:mem=1000mb'}
j2 = Job(TEST_USER, attrs=a)
j2.create_script('date')
jid2 = self.server.submit(j2)
self.server.expect(JOB, 'queue', id=jid2, op=UNSET, max_attempts=30,
interval=1, offset=1)
a = {'Resource_List.select': '1:ncpus=1:mem=40gb'}
j3 = Job(TEST_USER, attrs=a)
j3.create_script('date')
jid3 = self.server.submit(j3)
a = {'job_state': 'Q',
'comment':
(MATCH_RE,
'.*Can Never Run: Insufficient amount of resource: mem.*')}
self.server.expect(JOB, a, attrop=PTL_AND, id=jid3, offset=10,
interval=1, max_attempts=30)
@timeout(500)
def test_cgroup_cpuset_exclude_cpu(self):
"""
Confirm that exclude_cpus reduces resources_available.ncpus
"""
# Fetch the unmodified value of resources_available.ncpus
self.load_config(self.cfg5 % ('false', '', 'false', 'false',
'false', self.swapctl))
self.server.expect(NODE, {'state': 'free'},
id=self.nodeA, interval=3, offset=10)
result = self.server.status(NODE, 'resources_available.ncpus',
id=self.nodeA)
orig_ncpus = int(result[0]['resources_available.ncpus'])
self.assertGreater(orig_ncpus, 0)
self.logger.info('Original value of ncpus: %d' % orig_ncpus)
if orig_ncpus < 2:
self.skipTest('Node must have at least two CPUs')
# Now exclude CPU zero
self.load_config(self.cfg5 % ('false', '0', 'false', 'false',
'false', self.swapctl))
self.server.expect(NODE, {'state': 'free'},
id=self.nodeA, interval=3, offset=10)
result = self.server.status(NODE, 'resources_available.ncpus',
id=self.nodeA)
new_ncpus = int(result[0]['resources_available.ncpus'])
self.assertGreater(new_ncpus, 0)
self.logger.info('New value with one CPU excluded: %d' % new_ncpus)
self.assertEqual((new_ncpus + 1), orig_ncpus)
# Repeat the process with vnode_per_numa_node set to true
vnode = '%s[0]' % self.nodeA
self.load_config(self.cfg5 % ('true', '', 'false', 'false',
'false', self.swapctl))
self.server.expect(NODE, {'state': 'free'},
id=vnode, interval=3, offset=10)
result = self.server.status(NODE, 'resources_available.ncpus',
id=vnode)
orig_ncpus = int(result[0]['resources_available.ncpus'])
self.assertGreater(orig_ncpus, 0)
self.logger.info('Original value of vnode ncpus: %d' % orig_ncpus)
# Exclude CPU zero again
self.load_config(self.cfg5 % ('true', '0', 'false', 'false',
'false', self.swapctl))
self.server.expect(NODE, {'state': 'free'},
id=vnode, interval=3, offset=10)
result = self.server.status(NODE, 'resources_available.ncpus',
id=vnode)
new_ncpus = int(result[0]['resources_available.ncpus'])
self.assertEqual((new_ncpus + 1), orig_ncpus)
def test_cgroup_cpuset_mem_fences(self):
"""
Confirm that mem_fences affects setting of cpuset.mems
"""
cpuset_base = self.get_cgroup_job_dir('cpuset', '123.foo', self.hostA)
# Get the grandparent directory
cpuset_base = os.path.dirname(cpuset_base)
cpuset_base = os.path.dirname(cpuset_base)
cpuset_mems = os.path.join(cpuset_base, 'cpuset.mems')
result = self.du.cat(hostname=self.hostA, filename=cpuset_mems,
sudo=True)
if result['rc'] != 0 or result['out'][0] == '0':
self.skipTest('Test requires two NUMA nodes')
# First try with mem_fences set to true (the default)
self.load_config(self.cfg5 % ('false', '', 'true', 'false',
'false', self.swapctl))
self.server.expect(NODE, {'state': 'free'},
id=self.nodeA, interval=3, offset=10)
a = {'Resource_List.select': '1:ncpus=1:mem=100mb:host=%s' %
self.hostA}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, ATTR_o, jid)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
fn = self.get_cgroup_job_dir('cpuset', jid, self.hostA)
fn = os.path.join(fn, 'cpuset.mems')
result = self.du.cat(hostname=self.hostA, filename=fn, sudo=True)
self.assertEqual(result['rc'], 0)
self.assertEqual(result['out'][0], '0')
# Now try with mem_fences set to false
self.load_config(self.cfg5 % ('false', '', 'false', 'false',
'false', self.swapctl))
self.server.expect(NODE, {'state': 'free'},
id=self.nodeA, interval=3, offset=10)
a = {'Resource_List.select': '1:ncpus=1:mem=100mb:host=%s' %
self.hostA}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, ATTR_o, jid)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
fn = self.get_cgroup_job_dir('cpuset', jid, self.hostA)
fn = os.path.join(fn, 'cpuset.mems')
result = self.du.cat(hostname=self.hostA, filename=fn, sudo=True)
self.assertEqual(result['rc'], 0)
self.assertNotEqual(result['out'][0], '0')
def test_cgroup_cpuset_mem_hardwall(self):
"""
Confirm that mem_hardwall affects setting of cpuset.mem_hardwall
"""
self.load_config(self.cfg5 % ('false', '', 'true', 'false',
'false', self.swapctl))
self.server.expect(NODE, {'state': 'free'},
id=self.nodeA, interval=3, offset=10)
a = {'Resource_List.select': '1:ncpus=1:mem=100mb:host=%s' %
self.hostA}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, ATTR_o, jid)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
memh_path = 'cpuset.mem_hardwall'
fn = self.get_cgroup_job_dir('cpuset', jid, self.hostA)
if self.noprefix:
memh_path = 'mem_hardwall'
fn = os.path.join(fn, memh_path)
result = self.du.cat(hostname=self.hostA, filename=fn, sudo=True)
self.assertEqual(result['rc'], 0)
self.assertEqual(result['out'][0], '0')
self.load_config(self.cfg5 % ('false', '', 'true', 'true',
'false', self.swapctl))
self.server.expect(NODE, {'state': 'free'},
id=self.nodeA, interval=3, offset=10)
a = {'Resource_List.select': '1:ncpus=1:mem=100mb:host=%s' %
self.hostA}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, ATTR_o, jid)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
fn = self.get_cgroup_job_dir('cpuset', jid, self.hostA)
fn = os.path.join(fn, memh_path)
result = self.du.cat(hostname=self.hostA, filename=fn, sudo=True)
self.assertEqual(result['rc'], 0)
self.assertEqual(result['out'][0], '1')
def test_cgroup_find_gpus(self):
"""
Confirm that the hook finds the correct number
of GPUs.
"""
if not self.paths.get('devices'):
self.skipTest('Skipping test since no devices subsystem defined')
name = 'CGROUP3'
self.load_config(self.cfg2)
cmd = ['nvidia-smi', '-L']
try:
rv = self.du.run_cmd(cmd=cmd)
except OSError:
rv = {'err': True}
if rv['err'] or not rv['out'] or 'GPU' not in rv['out'][0]:
self.skipTest('Skipping test since nvidia-smi not found')
gpus = len(rv['out'])
if gpus < 1:
self.skipTest('Skipping test since no gpus found')
self.server.expect(NODE, {'state': 'free'}, id=self.nodeA)
ngpus = self.server.status(NODE, 'resources_available.ngpus',
id=self.nodeA)[0]
ngpus = int(ngpus['resources_available.ngpus'])
self.assertEqual(gpus, ngpus, 'ngpus is incorrect')
a = {'Resource_List.select': '1:ngpus=1', ATTR_N: name}
j = Job(TEST_USER, attrs=a)
j.create_script(self.check_gpu_script)
jid = self.server.submit(j)
self.server.expect(JOB, {'job_state': 'R'}, jid)
self.server.status(JOB, [ATTR_o, 'exec_host'], jid)
filename = j.attributes[ATTR_o]
self.tempfile.append(filename)
ehost = j.attributes['exec_host']
tmp_file = filename.split(':')[1]
tmp_host = ehost.split('/')[0]
tmp_out = self.wait_and_read_file(filename=tmp_file, host=tmp_host)
self.logger.info(tmp_out)
self.assertIn('There are 1 GPUs', tmp_out, 'No gpus were assigned')
self.assertIn('c 195:255 rwm', tmp_out, 'Nvidia controller not found')
m = re.search(r'195:(?!255)', '\n'.join(tmp_out))
self.assertIsNotNone(m, 'No gpu assigned in cgroups')
def test_cgroup_cpuset_memory_spread_page(self):
"""
Confirm that mem_spread_page affects setting of
cpuset.memory_spread_page
"""
self.load_config(self.cfg5 % ('false', '', 'true', 'false',
'false', self.swapctl))
self.server.expect(NODE, {'state': 'free'},
id=self.nodeA, interval=3, offset=10)
a = {'Resource_List.select': '1:ncpus=1:mem=100mb:host=%s' %
self.hostA}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, ATTR_o, jid)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
spread_path = 'cpuset.memory_spread_page'
fn = self.get_cgroup_job_dir('cpuset', jid, self.hostA)
if self.noprefix:
spread_path = 'memory_spread_page'
fn = os.path.join(fn, spread_path)
result = self.du.cat(hostname=self.hostA, filename=fn, sudo=True)
self.assertEqual(result['rc'], 0)
self.assertEqual(result['out'][0], '0')
self.load_config(self.cfg5 % ('false', '', 'true', 'false',
'true', self.swapctl))
self.server.expect(NODE, {'state': 'free'},
id=self.nodeA, interval=3, offset=10)
a = {'Resource_List.select': '1:ncpus=1:mem=100mb:host=%s' %
self.hostA}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid)
self.server.status(JOB, ATTR_o, jid)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
fn = self.get_cgroup_job_dir('cpuset', jid, self.hostA)
fn = os.path.join(fn, spread_path)
result = self.du.cat(hostname=self.hostA, filename=fn, sudo=True)
self.assertEqual(result['rc'], 0)
self.assertEqual(result['out'][0], '1')
def test_cgroup_use_hierarchy(self):
"""
Test that memory.use_hierarchy is enabled by default
when PBS cgroups hook is instantiated
"""
now = int(time.time())
# Remove PBS directories from memory subsystem
if 'memory' in self.paths and self.paths['memory']:
cdir = self.paths['memory']
if os.path.isdir(cdir):
cpath = os.path.join(cdir, 'pbspro')
if not os.path.isdir(cpath):
cpath = os.path.join(cdir, 'pbspro.slice')
else:
self.skipTest(
"memory subsystem is not enabled for cgroups")
cmd = ["rmdir", cpath]
self.logger.info("Removing %s" % cpath)
self.du.run_cmd(cmd=cmd, sudo=True)
self.load_config(self.cfg6 % (self.swapctl))
self.momA.restart()
# Wait for exechost_startup hook to run
self.momA.log_match("Hook handler returned success for"
" exechost_startup event",
starttime=now)
# Verify that memory.use_hierarchy is enabled
fpath = os.path.join(cpath, "memory.use_hierarchy")
self.logger.info("looking for file %s" % fpath)
if os.path.isfile(fpath):
with open(fpath, 'r') as fd:
val = fd.read()
self.assertEqual(
val.rstrip(), "1", "%s is not equal to 1"
% val.rstrip())
self.logger.info("memory.use_hierarchy is enabled")
else:
self.fail("File %s not present" % fpath)
def test_cgroup_periodic_update_known_jobs(self):
"""
Verify that jobs known to mom are updated, not orphans
"""
conf = {'freq': 5, 'order': 100}
self.server.manager(MGR_CMD_SET, HOOK, conf, self.hook_name)
self.load_config(self.cfg3 % ('', '', '', self.swapctl, ''))
# Submit a short job and let it run to completion
a = {'Resource_List.select': '1:ncpus=1:mem=100mb:host=%s' %
self.hostA}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep5_job)
jid1 = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid1)
self.server.status(JOB, ATTR_o, jid1)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
err_msg = "Unexpected error in pbs_cgroups " + \
"handling exechost_periodic event: TypeError"
self.mom.log_match(err_msg, max_attempts=3,
interval=1, n=100, existence=False)
self.server.log_match(jid1 + ';Exit_status=0')
# Create a periodic hook that runs more frequently than the cgroup
# hook and re-adds jid1 to mom_priv/hooks/hook_data/cgroup_jobs
hookname = 'prependjob'
hookbody = """
import pbs
import os
import re
import traceback
event = pbs.event()
jid_to_prepend = '%s'
pbs_home = ''
pbs_mom_home = ''
if 'PBS_HOME' in os.environ:
pbs_home = os.environ['PBS_HOME']
if 'PBS_MOM_HOME' in os.environ:
pbs_mom_home = os.environ['PBS_MOM_HOME']
pbs_conf = pbs.get_pbs_conf()
if pbs_conf:
if not pbs_home and 'PBS_HOME' in pbs_conf:
pbs_home = pbs_conf['PBS_HOME']
if not pbs_mom_home and 'PBS_MOM_HOME' in pbs_conf:
pbs_mom_home = pbs_conf['PBS_MOM_HOME']
if not pbs_home or not pbs_mom_home:
if 'PBS_CONF_FILE' in os.environ:
pbs_conf_file = os.environ['PBS_CONF_FILE']
else:
pbs_conf_file = os.path.join(os.sep, 'etc', 'pbs.conf')
regex = re.compile(r'\\s*([^\\s]+)\\s*=\\s*([^\\s]+)\\s*')
try:
with open(pbs_conf_file, 'r') as desc:
for line in desc:
match = regex.match(line)
if match:
if not pbs_home and match.group(1) == 'PBS_HOME':
pbs_home = match.group(2)
if not pbs_mom_home and (match.group(1) ==
'PBS_MOM_HOME'):
pbs_mom_home = match.group(2)
except Exception:
pass
if not pbs_home:
pbs.logmsg(pbs.EVENT_DEBUG, 'Failed to locate PBS_HOME')
event.reject()
if not pbs_mom_home:
pbs_mom_home = pbs_home
jobsfile = os.path.join(pbs_mom_home, 'mom_priv', 'hooks',
'hook_data', 'cgroup_jobs')
try:
with open(jobsfile, 'r+') as desc:
joblist = desc.readline().split()
jobset = set(joblist)
if jid_to_prepend not in jobset:
jobset.add(jid_to_prepend)
desc.seek(0)
desc.write(' '.join(jobset))
desc.truncate()
except Exception as exc:
pbs.logmsg(pbs.EVENT_DEBUG, 'Failed to modify ' + jobsfile)
pbs.logmsg(pbs.EVENT_DEBUG,
str(traceback.format_exc().strip().splitlines()))
event.reject()
event.accept()
""" % jid1
events = '"execjob_begin,exechost_periodic"'
hookconf = {'enabled': 'True', 'freq': 2, 'alarm': 30, 'event': events}
self.server.create_import_hook(hookname, hookconf, hookbody,
overwrite=True)
# Submit a second job and verify that the following message
# does NOT appear in the mom log:
# _exechost_periodic_handler: Failed to update jid1
presubmit = int(time.time())
a = {'Resource_List.select': '1:ncpus=1:mem=100mb:host=%s' %
self.hostA}
j = Job(TEST_USER, attrs=a)
j.create_script(self.sleep15_job)
jid2 = self.server.submit(j)
a = {'job_state': 'R'}
self.server.expect(JOB, a, jid2)
self.server.status(JOB, ATTR_o, jid2)
o = j.attributes[ATTR_o]
self.tempfile.append(o)
err_msg = "Unexpected error in pbs_cgroups " + \
"handling exechost_periodic event: TypeError"
self.mom.log_match(err_msg, max_attempts=3,
interval=1, n=100, existence=False)
self.server.log_match(jid2 + ';Exit_status=0')
self.server.manager(MGR_CMD_DELETE, HOOK, None, hookname)
command = ['truncate', '-s0',
os.path.join(self.momA.pbs_conf['PBS_HOME'], 'mom_priv',
'hooks', 'hook_data', 'cgroup_jobs')]
self.du.run_cmd(cmd=command, hosts=self.hostA, sudo=True)
logmsg = '_exechost_periodic_handler: Failed to update %s' % jid1
self.momA.log_match(msg=logmsg, starttime=presubmit,
max_attempts=1, existence=False)
def check_req_rjs(self):
"""
Check the requirements for the reliable job startup tests.
MomA must have two free vnodes and MomB one free vnode.
Return 1 if the requirements are not satisfied, otherwise 0.
"""
# Check that momA has two free vnodes
attr = {'state': 'free'}
rv1 = True
try:
self.server.expect(VNODE, attr, id='%s[0]' % self.hostA,
max_attempts=3, interval=2)
except PtlExpectError as exc:
rv1 = exc.rv
rv2 = True
try:
self.server.expect(VNODE, attr, id='%s[1]' % self.hostA,
max_attempts=3, interval=2)
except PtlExpectError as exc:
rv2 = exc.rv
# Check that momB has one free vnode
rv3 = True
try:
self.server.expect(VNODE, attr, id='%s[0]' % self.hostB,
max_attempts=3, interval=2)
except PtlExpectError as exc:
rv3 = exc.rv
if not rv1 or not rv2 or not rv3:
return 1
return 0
def test_cgroup_release_nodes(self):
"""
Verify that exec_vnode values are trimmed
when the execjob_launch hook prunes the job via release_nodes(),
with tolerate_node_failures=job_start
"""
# Test requires 2 nodes
if len(self.moms) != 2:
self.skipTest('Test requires two Moms as input, '
'use -p moms=')
self.load_config(self.cfg7)
# Check that MomA has two free vnodes and MomB has a free vnode
if self.check_req_rjs() == 1:
self.skipTest(
'MomA must have two free vnodes and MomB one free vnode')
# instantiate queuejob hook
hook_event = 'queuejob'
hook_name = 'qjob'
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_launch hook
hook_event = 'execjob_launch'
hook_name = 'launch'
a = {'event': hook_event, 'enabled': 'true'}
self.keep_select = 'e.job.Resource_List["site"]'
self.server.create_import_hook(
hook_name, a, self.launch_hook_body % (self.keep_select))
# Submit a job that requires 2 nodes
j = Job(TEST_USER)
j.create_script(self.job_scr2)
jid = self.server.submit(j)
# Check the exec_vnode while in substate 41
self.server.expect(JOB, {ATTR_substate: '41'}, id=jid)
self.server.expect(JOB, 'exec_vnode', id=jid, op=SET)
job_stat = self.server.status(JOB, id=jid)
execvnode1 = job_stat[0]['exec_vnode']
self.logger.info("initial exec_vnode: %s" % execvnode1)
initial_vnodes = execvnode1.split('+')
# Check the exec_vnode after job is in substate 42
self.server.expect(JOB, {ATTR_substate: '42'}, id=jid)
self.server.expect(JOB, 'exec_vnode', id=jid, op=SET)
job_stat = self.server.status(JOB, id=jid)
execvnode2 = job_stat[0]['exec_vnode']
self.logger.info("pruned exec_vnode: %s" % execvnode2)
pruned_vnodes = execvnode2.split('+')
# Check that the pruned exec_vnode has one less than initial value
self.assertEqual(len(pruned_vnodes) + 1, len(initial_vnodes))
# Find the released vnode
rel_vn = None
for vn in initial_vnodes:
if vn not in pruned_vnodes:
rel_vn = vn
break
self.assertIsNotNone(rel_vn, 'Could not identify the released vnode')
vnodeB = rel_vn.split(':')[0].split('(')[1]
self.logger.info("released vnode: %s" % vnodeB)
# Submit a second job requesting the released vnode, job runs
j2 = Job(TEST_USER,
{ATTR_l + '.select': '1:ncpus=1:mem=2gb:vnode=%s' % vnodeB})
jid2 = self.server.submit(j2)
self.server.expect(JOB, {ATTR_state: 'R'}, id=jid2)
def test_cgroup_sismom_resize_fail(self):
"""
Verify that exec_vnode values are trimmed
when the execjob_launch hook prunes the job via release_nodes()
and execjob_resize fails in a sister mom,
with tolerate_node_failures=job_start
"""
# Test requires 2 nodes
if len(self.moms) != 2:
self.skipTest('Test requires two Moms as input, '
'use -p moms=')
self.load_config(self.cfg7)
# Check that MomA has two free vnodes and MomB has a free vnode
if self.check_req_rjs() == 1:
self.skipTest(
'MomA must have two free vnodes and MomB one free vnode')
# instantiate queuejob hook
hook_event = 'queuejob'
hook_name = 'qjob'
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_launch hook
hook_event = 'execjob_launch'
hook_name = 'launch'
a = {'event': hook_event, 'enabled': 'true'}
self.keep_select = 'e.job.Resource_List["site"]'
self.server.create_import_hook(
hook_name, a, self.launch_hook_body % (self.keep_select))
# instantiate execjob_resize hook
hook_event = 'execjob_resize'
hook_name = 'resize'
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(
hook_name, a, self.resize_hook_body % ('not'))
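# The %s in resize_hook_body presumably negates an in-mother-superior
# check: with 'not' interpolated the hook rejects on the sister mom
# (exercised here), while the empty string used in the mother
# superior variant below makes it reject there instead.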
# Submit a job that requires 2 nodes
j = Job(TEST_USER)
j.create_script(self.job_scr2)
stime = int(time.time())
jid = self.server.submit(j)
# Check the exec_vnode while in substate 41
self.server.expect(JOB, {ATTR_substate: '41'}, id=jid)
self.server.expect(JOB, 'exec_vnode', id=jid, op=SET)
job_stat = self.server.status(JOB, id=jid)
execvnode1 = job_stat[0]['exec_vnode']
self.logger.info("initial exec_vnode: %s" % execvnode1)
initial_vnodes = execvnode1.split('+')
# Check for the execjob_resize hook reject message in the sister mom logs
self.momA.log_match(
"Job;%s;Cannot resize the job" % (jid),
starttime=stime, interval=2)
# Check the exec_vnode after job is in substate 42
self.server.expect(JOB, {ATTR_substate: '42'}, id=jid)
# Check for the pruned exec_vnode due to release_nodes() in launch hook
self.server.expect(JOB, 'exec_vnode', id=jid, op=SET)
job_stat = self.server.status(JOB, id=jid)
execvnode2 = job_stat[0]['exec_vnode']
self.logger.info("pruned exec_vnode: %s" % execvnode2)
pruned_vnodes = execvnode2.split('+')
# Check that the pruned exec_vnode has one less than initial value
self.assertEqual(len(pruned_vnodes) + 1, len(initial_vnodes))
# Check that the exec_vnode got pruned
self.momB.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, execvnode1), starttime=stime)
self.momB.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, execvnode2), starttime=stime)
# Check that the sister mom failed to update the job
self.momB.log_match(
"Job;%s;sister node %s.* failed to update job" % (jid, self.hostA),
starttime=stime, interval=2, regexp=True)
# Because the resize hook rejected, mom failed to update the job.
# Check that the job got requeued.
self.server.log_match("Job;%s;Job requeued" % (jid), starttime=stime)
def test_cgroup_msmom_resize_fail(self):
"""
Verify that exec_vnode values are trimmed
when the execjob_launch hook prunes the job via release_nodes()
and execjob_resize fails in the mother superior mom,
with tolerate_node_failures=job_start
"""
# Test requires 2 nodes
if len(self.moms) != 2:
self.skipTest('Test requires two Moms as input, '
'use -p moms=')
self.load_config(self.cfg7)
# Check that MomA has two free vnodes and MomB has a free vnode
if self.check_req_rjs() == 1:
self.skipTest(
'MomA must have two free vnodes and MomB one free vnode')
# instantiate queuejob hook
hook_event = 'queuejob'
hook_name = 'qjob'
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_launch hook
hook_event = 'execjob_launch'
hook_name = 'launch'
a = {'event': hook_event, 'enabled': 'true'}
self.keep_select = 'e.job.Resource_List["site"]'
self.server.create_import_hook(
hook_name, a, self.launch_hook_body % (self.keep_select))
# instantiate execjob_resize hook
hook_event = 'execjob_resize'
hook_name = 'resize'
a = {'event': hook_event, 'enabled': 'true'}
self.server.create_import_hook(
hook_name, a, self.resize_hook_body % (''))
# Submit a job that requires 2 nodes
j = Job(TEST_USER)
j.create_script(self.job_scr2)
stime = int(time.time())
jid = self.server.submit(j)
# Check the exec_vnode while in substate 41
self.server.expect(JOB, {ATTR_substate: '41'}, id=jid)
self.server.expect(JOB, 'exec_vnode', id=jid, op=SET)
job_stat = self.server.status(JOB, id=jid)
execvnode1 = job_stat[0]['exec_vnode']
self.logger.info("initial exec_vnode: %s" % execvnode1)
initial_vnodes = execvnode1.split('+')
# Check for the execjob_resize hook reject message in the mother superior logs
self.momB.log_match(
"Job;%s;Cannot resize the job" % (jid),
starttime=stime, interval=2)
# Check the exec_vnode after job is in substate 42
self.server.expect(JOB, {ATTR_substate: '42'}, id=jid)
self.server.expect(JOB, 'exec_vnode', id=jid, op=SET)
job_stat = self.server.status(JOB, id=jid)
execvnode2 = job_stat[0]['exec_vnode']
self.logger.info("pruned exec_vnode: %s" % execvnode2)
pruned_vnodes = execvnode2.split('+')
# Check that the pruned exec_vnode has one less than initial value
self.assertEqual(len(pruned_vnodes) + 1, len(initial_vnodes))
# Check that the exec_vnode got pruned
self.momB.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, execvnode1), starttime=stime)
self.momB.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, execvnode2), starttime=stime)
# Because the resize hook rejected, mom failed to update the job.
# Check that the job got requeued.
self.server.log_match("Job;%s;Job requeued" % (jid), starttime=stime)
def test_cgroup_msmom_nodes_only(self):
"""
Verify that exec_vnode values are trimmed
when the execjob_launch hook prunes the job via release_nodes()
and the job uses only vnodes from the mother superior host,
with tolerate_node_failures=job_start
"""
# Test requires 2 nodes
if len(self.moms) != 2:
self.skipTest('Test requires two Moms as input, '
'use -p moms=')
self.load_config(self.cfg7)
# Check that MomA has two free vnodes and MomB has a free vnode
if self.check_req_rjs() == 1:
self.skipTest(
'MomA must have two free vnodes and MomB one free vnode')
# disable queuejob hook
hook_event = 'queuejob'
hook_name = 'qjob'
a = {'event': hook_event, 'enabled': 'false'}
self.server.create_import_hook(hook_name, a, self.qjob_hook_body)
# instantiate execjob_launch hook
hook_event = 'execjob_launch'
hook_name = 'launch'
a = {'event': hook_event, 'enabled': 'true'}
self.keep_select = '"ncpus=4:mem=2gb"'
self.server.create_import_hook(
hook_name, a, self.launch_hook_body % (self.keep_select))
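# keep_select is interpolated into the launch hook body; a fixed
# 'ncpus=4:mem=2gb' spec (rather than the queuejob-provided
# Resource_List["site"] used above) presumably makes release_nodes()
# keep only vnodes from the mother superior host.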
# disable execjob_resize hook
hook_event = 'execjob_resize'
hook_name = 'resize'
a = {'event': hook_event, 'enabled': 'false'}
self.server.create_import_hook(
hook_name, a, self.resize_hook_body % (''))
# Submit a job that requires two vnodes on one host
j = Job(TEST_USER)
j.create_script(self.job_scr3)
stime = int(time.time())
jid = self.server.submit(j)
# Check the exec_vnode while in substate 41
self.server.expect(JOB, {ATTR_substate: '41'}, id=jid)
self.server.expect(JOB, 'exec_vnode', id=jid, op=SET)
job_stat = self.server.status(JOB, id=jid)
execvnode1 = job_stat[0]['exec_vnode']
self.logger.info("initial exec_vnode: %s" % execvnode1)
initial_vnodes = execvnode1.split('+')
# Check the exec_vnode after job is in substate 42
self.server.expect(JOB, {ATTR_substate: '42'}, id=jid)
self.server.expect(JOB, 'exec_vnode', id=jid, op=SET)
job_stat = self.server.status(JOB, id=jid)
execvnode2 = job_stat[0]['exec_vnode']
self.logger.info("pruned exec_vnode: %s" % execvnode2)
pruned_vnodes = execvnode2.split('+')
# Check that the pruned exec_vnode has one less than initial value
self.assertEqual(len(pruned_vnodes) + 1, len(initial_vnodes))
# Check that the exec_vnode got pruned
self.momA.log_match("Job;%s;pruned from exec_vnode=%s" % (
jid, execvnode1), starttime=stime)
self.momA.log_match("Job;%s;pruned to exec_vnode=%s" % (
jid, execvnode2), starttime=stime)
# Find the released vnode
if initial_vnodes[0] == execvnode2:
execvnodeB = initial_vnodes[1]
else:
execvnodeB = initial_vnodes[0]
vnodeB = execvnodeB.split(':')[0].split('(')[1]
self.logger.info("released vnode: %s" % vnodeB)
# Submit job2 requesting the released vnode, job runs
j2 = Job(TEST_USER, {
ATTR_l + '.select': '1:ncpus=1:mem=2gb:vnode=%s' % vnodeB})
jid2 = self.server.submit(j2)
self.server.expect(JOB, {ATTR_state: 'R'}, id=jid2)
def tearDown(self):
TestFunctional.tearDown(self)
self.load_config(self.cfg0)
if not self.iscray:
self.remove_vntype()
self.momA.delete_vnode_defs()
if self.hostA != self.hostB:
self.momB.delete_vnode_defs()
events = '"execjob_begin,execjob_launch,execjob_attach,'
events += 'execjob_epilogue,execjob_end,exechost_startup,'
events += 'exechost_periodic,execjob_resize"'
# Disable the cgroups hook
conf = {'enabled': 'False', 'freq': 30, 'event': events}
self.server.manager(MGR_CMD_SET, HOOK, conf, self.hook_name)
# Cleanup any temp file created
self.logger.info('Deleting temporary files %s' % self.tempfile)
self.du.rm(hostname=self.serverA, path=self.tempfile, force=True,
recursive=True, sudo=True)
# Cleanup frozen jobs
if 'freezer' in self.paths and self.paths['freezer']:
self.logger.info('Cleaning up frozen jobs ****')
fdir = self.paths['freezer']
if os.path.isdir(fdir):
self.logger.info('freezer directory present')
fpath = os.path.join(fdir, 'PtlPbs')
if os.path.isdir(fpath):
jobdirs = glob.glob(os.path.join(fpath, '*', ''))
self.logger.info('found jobs %s' % jobdirs)
for jpath in jobdirs:
self.logger.info('*** found jobdir %s' % jpath)
freezer_file = os.path.join(jpath, 'freezer.state')
# Thaw the cgroup so its directory can be removed
state = 'THAWED'
fn = self.du.create_temp_file(
hostname=self.hostA, body=state)
self.du.run_copy(hosts=self.hostA, src=fn,
dest=freezer_file, sudo=True,
uid='root', gid='root',
mode=0o644)
self.du.rm(hostname=self.hostA, path=fn)
cmd = ['rmdir', jpath]
self.logger.info('deleting jobdir %s' % jpath)
self.du.run_cmd(cmd=cmd, sudo=True)
self.du.rm(hostname=self.hostA, path=fpath)
# Remove the jobdir if any under other cgroups
cgroup_subsys = ('cpuset', 'memory', 'blkio', 'devices', 'cpuacct',
'pids', 'systemd')
for subsys in cgroup_subsys:
if subsys in self.paths and self.paths[subsys]:
self.logger.info('Looking for orphaned jobdir in %s' % subsys)
cdir = self.paths[subsys]
if os.path.isdir(cdir):
cpath = os.path.join(cdir, 'pbspro')
if not os.path.isdir(cpath):
cpath = os.path.join(cdir, 'pbspro.slice')
if os.path.isdir(cpath):
for jdir in glob.glob(os.path.join(cpath, '*', '')):
if not os.path.isdir(jdir):
continue
self.logger.info('deleting jobdir %s' % jdir)
cmd2 = ['rmdir', jdir]
self.du.run_cmd(cmd=cmd2, sudo=True)